chore(ds): return DS message key along with batch

This commit is contained in:
Thales Macedo Garitezi 2023-12-06 18:06:54 -03:00
parent b7e7141e77
commit 2a6d72878f
8 changed files with 16 additions and 12 deletions

View File

@ -427,7 +427,7 @@ get_commit_next(comp, #inflight{commits = Commits}) ->
publish_fetch(PreprocFun, FirstSeqno, Messages) -> publish_fetch(PreprocFun, FirstSeqno, Messages) ->
flatmapfoldl( flatmapfoldl(
fun(MessageIn, Acc) -> fun({_DSKey, MessageIn}, Acc) ->
Message = PreprocFun(MessageIn), Message = PreprocFun(MessageIn),
publish_fetch(Message, Acc) publish_fetch(Message, Acc)
end, end,
@ -446,7 +446,7 @@ publish_fetch(Messages, Seqno) ->
publish_replay(PreprocFun, Commits, FirstSeqno, Messages) -> publish_replay(PreprocFun, Commits, FirstSeqno, Messages) ->
#{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits, #{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits,
flatmapfoldl( flatmapfoldl(
fun(MessageIn, Acc) -> fun({_DSKey, MessageIn}, Acc) ->
Message = PreprocFun(MessageIn), Message = PreprocFun(MessageIn),
publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc) publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc)
end, end,

View File

@ -343,8 +343,8 @@ consume(It) ->
case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of
{ok, _NIt, _Msgs = []} -> {ok, _NIt, _Msgs = []} ->
[]; [];
{ok, NIt, Msgs} -> {ok, NIt, MsgsAndKeys} ->
Msgs ++ consume(NIt); [Msg || {_DSKey, Msg} <- MsgsAndKeys] ++ consume(NIt);
{ok, end_of_stream} -> {ok, end_of_stream} ->
[] []
end. end.

View File

@ -43,6 +43,7 @@
stream_rank/0, stream_rank/0,
iterator/0, iterator/0,
message_id/0, message_id/0,
message_key/0,
next_result/1, next_result/0, next_result/1, next_result/0,
store_batch_result/0, store_batch_result/0,
make_iterator_result/1, make_iterator_result/0, make_iterator_result/1, make_iterator_result/0,
@ -74,6 +75,8 @@
-type ds_specific_stream() :: term(). -type ds_specific_stream() :: term().
-type message_key() :: binary().
-type store_batch_result() :: ok | {error, _}. -type store_batch_result() :: ok | {error, _}.
-type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}. -type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}.
@ -81,7 +84,7 @@
-type make_iterator_result() :: make_iterator_result(iterator()). -type make_iterator_result() :: make_iterator_result(iterator()).
-type next_result(Iterator) :: -type next_result(Iterator) ::
{ok, Iterator, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. {ok, Iterator, [{message_key(), emqx_types:message()}]} | {ok, end_of_stream} | {error, _}.
-type next_result() :: next_result(iterator()). -type next_result() :: next_result(iterator()).

View File

@ -329,7 +329,7 @@ traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
Msg = deserialize(Val), Msg = deserialize(Val),
case check_message(Cutoff, It, Msg) of case check_message(Cutoff, It, Msg) of
true -> true ->
Acc = [Msg | Acc0], Acc = [{Key, Msg} | Acc0],
traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1); traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1);
false -> false ->
traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N); traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N);

View File

@ -125,7 +125,7 @@ do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of
true -> true ->
do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [Msg | Acc]); do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [{Key, Msg} | Acc]);
false -> false ->
do_next(TopicFilter, StartTime, IT, next, NLeft, Key, Acc) do_next(TopicFilter, StartTime, IT, next, NLeft, Key, Acc)
end; end;

View File

@ -65,7 +65,7 @@ make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:iterator(), emqx_ds_storage_layer:iterator(),
pos_integer() pos_integer()
) -> ) ->
{ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]} {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), emqx_types:messages()}]}
| {ok, end_of_stream} | {ok, end_of_stream}
| {error, _}. | {error, _}.
next(Node, DB, Shard, Iter, BatchSize) -> next(Node, DB, Shard, Iter, BatchSize) ->

View File

@ -101,7 +101,7 @@ t_03_smoke_iterate(_Config) ->
[{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
{ok, Iter, Batch} = iterate(DB, Iter0, 1), {ok, Iter, Batch} = iterate(DB, Iter0, 1),
?assertEqual(Msgs, Batch, {Iter0, Iter}). ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}).
%% Verify that iterators survive restart of the application. This is %% Verify that iterators survive restart of the application. This is
%% an important property, since the lifetime of the iterators is tied %% an important property, since the lifetime of the iterators is tied
@ -128,7 +128,7 @@ t_04_restart(_Config) ->
ok = emqx_ds:open_db(DB, opts()), ok = emqx_ds:open_db(DB, opts()),
%% The old iterator should be still operational: %% The old iterator should be still operational:
{ok, Iter, Batch} = iterate(DB, Iter0, 1), {ok, Iter, Batch} = iterate(DB, Iter0, 1),
?assertEqual(Msgs, Batch, {Iter0, Iter}). ?assertEqual(Msgs, [Msg || {_Key, Msg} <- Batch], {Iter0, Iter}).
message(Topic, Payload, PublishedAt) -> message(Topic, Payload, PublishedAt) ->
#message{ #message{

View File

@ -64,7 +64,8 @@ t_iterate(_Config) ->
begin begin
[{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0),
{ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0),
{ok, NextIt, Messages} = emqx_ds_storage_layer:next(?SHARD, It, 100), {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100),
Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys],
?assertEqual( ?assertEqual(
lists:map(fun integer_to_binary/1, Timestamps), lists:map(fun integer_to_binary/1, Timestamps),
payloads(Messages) payloads(Messages)
@ -249,7 +250,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) ->
{ok, _NextIt, []} -> {ok, _NextIt, []} ->
[]; [];
{ok, NextIt, Batch} -> {ok, NextIt, Batch} ->
Batch ++ F(NextIt, N - 1) [Msg || {_DSKey, Msg} <- Batch] ++ F(NextIt, N - 1)
end end
end, end,
MaxIterations = 1000000, MaxIterations = 1000000,