From 2a6d72878fe2d6831a21dfcbb1c6c522927ebe4c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Dec 2023 18:06:54 -0300 Subject: [PATCH] chore(ds): return DS message key along with batch --- apps/emqx/src/emqx_persistent_message_ds_replayer.erl | 4 ++-- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 4 ++-- apps/emqx_durable_storage/src/emqx_ds.erl | 5 ++++- .../src/emqx_ds_storage_bitfield_lts.erl | 2 +- apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl | 2 +- apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl | 2 +- apps/emqx_durable_storage/test/emqx_ds_SUITE.erl | 4 ++-- .../test/emqx_ds_storage_bitfield_lts_SUITE.erl | 5 +++-- 8 files changed, 16 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 723f02a01..39eb60d95 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -427,7 +427,7 @@ get_commit_next(comp, #inflight{commits = Commits}) -> publish_fetch(PreprocFun, FirstSeqno, Messages) -> flatmapfoldl( - fun(MessageIn, Acc) -> + fun({_DSKey, MessageIn}, Acc) -> Message = PreprocFun(MessageIn), publish_fetch(Message, Acc) end, @@ -446,7 +446,7 @@ publish_fetch(Messages, Seqno) -> publish_replay(PreprocFun, Commits, FirstSeqno, Messages) -> #{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits, flatmapfoldl( - fun(MessageIn, Acc) -> + fun({_DSKey, MessageIn}, Acc) -> Message = PreprocFun(MessageIn), publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc) end, diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index ff0f2733f..418c77095 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -343,8 +343,8 @@ consume(It) -> case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of {ok, _NIt, _Msgs = []} -> []; - {ok, NIt, Msgs} -> - Msgs ++ consume(NIt); + {ok, NIt, MsgsAndKeys} -> + [Msg || {_DSKey, Msg} <- MsgsAndKeys] ++ consume(NIt); {ok, end_of_stream} -> [] end. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 649341eb5..d9e4e4b5a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -43,6 +43,7 @@ stream_rank/0, iterator/0, message_id/0, + message_key/0, next_result/1, next_result/0, store_batch_result/0, make_iterator_result/1, make_iterator_result/0, @@ -74,6 +75,8 @@ -type ds_specific_stream() :: term(). +-type message_key() :: binary(). + -type store_batch_result() :: ok | {error, _}. -type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}. @@ -81,7 +84,7 @@ -type make_iterator_result() :: make_iterator_result(iterator()). -type next_result(Iterator) :: - {ok, Iterator, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. + {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | {error, _}. -type next_result() :: next_result(iterator()). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index c2f533673..b4422083a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -329,7 +329,7 @@ traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) -> Msg = deserialize(Val), case check_message(Cutoff, It, Msg) of true -> - Acc = [Msg | Acc0], + Acc = [{Key, Msg} | Acc0], traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1); false -> traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N); diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 6676faf88..37719a38f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -125,7 +125,7 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of true -> - do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [Msg | Acc]); + do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]); false -> do_next(TopicFilter, StartTime, IT, next, NLeft, Key, Acc) end; diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 3b7c36082..243ce230e 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -65,7 +65,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_storage_layer:iterator(), pos_integer() ) -> - {ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]} + {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]} | {ok, end_of_stream} | {error, _}. next(Node, DB, Shard, Iter, BatchSize) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 8a46804b0..c5af38def 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -101,7 +101,7 @@ t_03_smoke_iterate(_Config) -> [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), {ok, Iter, Batch} = iterate(DB, Iter0, 1), - ?assertEqual(Msgs, Batch, {Iter0, Iter}). + ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). %% Verify that iterators survive restart of the application. This is %% an important property, since the lifetime of the iterators is tied @@ -128,7 +128,7 @@ t_04_restart(_Config) -> ok = emqx_ds:open_db(DB, opts()), %% The old iterator should be still operational: {ok, Iter, Batch} = iterate(DB, Iter0, 1), - ?assertEqual(Msgs, Batch, {Iter0, Iter}). + ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}). message(Topic, Payload, PublishedAt) -> #message{ diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 7b733406d..fc6049669 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -64,7 +64,8 @@ t_iterate(_Config) -> begin [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), - {ok, NextIt, Messages} = emqx_ds_storage_layer:next(?SHARD, It, 100), + {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100), + Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys], ?assertEqual( lists:map(fun integer_to_binary/1, Timestamps), payloads(Messages) @@ -249,7 +250,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> {ok, _NextIt, []} -> []; {ok, NextIt, Batch} -> - Batch ++ F(NextIt, N - 1) + [Msg || {_DSKey, Msg} <- Batch] ++ F(NextIt, N - 1) end end, MaxIterations = 1000000,