diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 3ba2fdeee..494730346 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -185,10 +185,10 @@ poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSi {[], Inflight0}; true -> %% TODO: Wrap this in `mria:async_dirty/2`? - Streams = shuffle(get_streams(SessionId)), Checkpoints = find_checkpoints(Inflight0#inflight.offset_ranges), + StreamGroups = group_streams(get_streams(SessionId)), {Publihes, Inflight} = - fetch(PreprocFun, SessionId, Inflight0, Checkpoints, Streams, FreeSpace, []), + fetch(PreprocFun, SessionId, Inflight0, Checkpoints, StreamGroups, FreeSpace, []), %% Discard now irrelevant QoS0-only ranges, if any. {Publihes, discard_committed(SessionId, Inflight)} end. @@ -257,14 +257,13 @@ get_ranges(SessionId) -> ), mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read). -fetch(PreprocFun, SessionId, Inflight0, CPs, [Stream | Streams], N, Acc) when N > 0 -> +fetch(PreprocFun, SessionId, Inflight0, CPs, Groups, N, Acc) when N > 0, Groups =/= [] -> #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0, - ItBegin = get_last_iterator(Stream, CPs), - {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), - case Messages of + {Stream, Groups2} = get_the_first_stream(Groups), + case get_next_n_messages_from_stream(Stream, CPs, N) of [] -> - fetch(PreprocFun, SessionId, Inflight0, CPs, Streams, N, Acc); - _ -> + fetch(PreprocFun, SessionId, Inflight0, CPs, Groups2, N, Acc); + {ItBegin, ItEnd, Messages} -> %% We need to preserve the iterator pointing to the beginning of the %% range, so that we can replay it if needed. 
{Publishes, UntilSeqno} = publish_fetch(PreprocFun, FirstSeqno, Messages), @@ -285,9 +284,9 @@ fetch(PreprocFun, SessionId, Inflight0, CPs, [Stream | Streams], N, Acc) when N next_seqno = UntilSeqno, offset_ranges = Ranges ++ [Range] }, - fetch(PreprocFun, SessionId, Inflight, CPs, Streams, N - Size, [Publishes | Acc]) + fetch(PreprocFun, SessionId, Inflight, CPs, Groups2, N - Size, [Publishes | Acc]) end; -fetch(_PreprocFun, _SessionId, Inflight, _CPs, _Streams, _N, Acc) -> +fetch(_ReplyFun, _SessionId, Inflight, _CPs, _Groups, _N, Acc) -> Publishes = lists:append(lists:reverse(Acc)), {Publishes, Inflight}. @@ -607,10 +606,23 @@ range_size(FirstSeqno, UntilSeqno) -> Size = UntilSeqno - FirstSeqno, Size + (FirstSeqno bsr 16) - (UntilSeqno bsr 16). +%%================================================================================ +%% stream scheduler + +%% group streams by the first position in the rank +-spec group_streams(list(ds_stream())) -> list(list(ds_stream())). +group_streams(Streams) -> + Groups = maps:groups_from_list( + fun(#ds_stream{rank = {RankX, _}}) -> RankX end, + Streams + ), + shuffle(maps:values(Groups)). + -spec shuffle([A]) -> [A]. shuffle(L0) -> L1 = lists:map( fun(A) -> + %% maybe topic/stream prioritization could be introduced here? {rand:uniform(), A} end, L0 @@ -619,6 +631,47 @@ shuffle(L0) -> {_, L} = lists:unzip(L2), L. +get_the_first_stream([Group | Groups]) -> + case get_next_stream_from_group(Group) of + {Stream, {sorted, []}} -> + {Stream, Groups}; + {Stream, Group2} -> + {Stream, [Group2 | Groups]}; + undefined -> + get_the_first_stream(Groups) + end; +get_the_first_stream([]) -> + %% How is this possible? + throw(#{reason => no_valid_stream}).
+ +%% The scheduler is simple: try to get messages from the same shard, but it's okay to take turns. +get_next_stream_from_group({sorted, [H | T]}) -> + {H, {sorted, T}}; +get_next_stream_from_group({sorted, []}) -> + undefined; +get_next_stream_from_group(Streams) -> + [Stream | T] = lists:sort( + fun(#ds_stream{rank = {_, RankA}}, #ds_stream{rank = {_, RankB}}) -> + RankA < RankB + end, + Streams + ), + {Stream, {sorted, T}}. + +get_next_n_messages_from_stream(Stream, CPs, N) -> + ItBegin = get_last_iterator(Stream, CPs), + case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N) of + {ok, _ItEnd, []} -> + []; + {ok, ItEnd, Messages} -> + {ItBegin, ItEnd, Messages}; + {ok, end_of_stream} -> + %% TODO: decide how to skip this closed stream, or whether a lower-level layer should take it over + [] + end. + +%%================================================================================ + -spec flatmapfoldl(fun((X, Acc) -> {Y | [Y], Acc}), Acc, [X]) -> {[Y], Acc}. flatmapfoldl(_Fun, Acc, []) -> {[], Acc};