feat(ds): implemented the replayer scheduler
This commit is contained in:
parent
abeb5e985f
commit
d278d3afb5
|
@ -185,10 +185,10 @@ poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSi
|
|||
{[], Inflight0};
|
||||
true ->
|
||||
%% TODO: Wrap this in `mria:async_dirty/2`?
|
||||
Streams = shuffle(get_streams(SessionId)),
|
||||
Checkpoints = find_checkpoints(Inflight0#inflight.offset_ranges),
|
||||
StreamGroups = group_streams(get_streams(SessionId)),
|
||||
{Publihes, Inflight} =
|
||||
fetch(PreprocFun, SessionId, Inflight0, Checkpoints, Streams, FreeSpace, []),
|
||||
fetch(PreprocFun, SessionId, Inflight0, Checkpoints, StreamGroups, FreeSpace, []),
|
||||
%% Discard now irrelevant QoS0-only ranges, if any.
|
||||
{Publihes, discard_committed(SessionId, Inflight)}
|
||||
end.
|
||||
|
@ -257,14 +257,13 @@ get_ranges(SessionId) ->
|
|||
),
|
||||
mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read).
|
||||
|
||||
fetch(PreprocFun, SessionId, Inflight0, CPs, [Stream | Streams], N, Acc) when N > 0 ->
|
||||
fetch(PreprocFun, SessionId, Inflight0, CPs, Groups, N, Acc) when N > 0, Groups =/= [] ->
|
||||
#inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0,
|
||||
ItBegin = get_last_iterator(Stream, CPs),
|
||||
{ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
|
||||
case Messages of
|
||||
{Stream, Groups2} = get_the_first_stream(Groups),
|
||||
case get_next_n_messages_from_stream(Stream, CPs, N) of
|
||||
[] ->
|
||||
fetch(PreprocFun, SessionId, Inflight0, CPs, Streams, N, Acc);
|
||||
_ ->
|
||||
fetch(PreprocFun, SessionId, Inflight0, CPs, Groups2, N, Acc);
|
||||
{ItBegin, ItEnd, Messages} ->
|
||||
%% We need to preserve the iterator pointing to the beginning of the
|
||||
%% range, so that we can replay it if needed.
|
||||
{Publishes, UntilSeqno} = publish_fetch(PreprocFun, FirstSeqno, Messages),
|
||||
|
@ -285,9 +284,9 @@ fetch(PreprocFun, SessionId, Inflight0, CPs, [Stream | Streams], N, Acc) when N
|
|||
next_seqno = UntilSeqno,
|
||||
offset_ranges = Ranges ++ [Range]
|
||||
},
|
||||
fetch(PreprocFun, SessionId, Inflight, CPs, Streams, N - Size, [Publishes | Acc])
|
||||
fetch(PreprocFun, SessionId, Inflight, CPs, Groups2, N - Size, [Publishes | Acc])
|
||||
end;
|
||||
fetch(_PreprocFun, _SessionId, Inflight, _CPs, _Streams, _N, Acc) ->
|
||||
fetch(_ReplyFun, _SessionId, Inflight, _CPs, _Groups, _N, Acc) ->
|
||||
Publishes = lists:append(lists:reverse(Acc)),
|
||||
{Publishes, Inflight}.
|
||||
|
||||
|
@ -607,10 +606,23 @@ range_size(FirstSeqno, UntilSeqno) ->
|
|||
Size = UntilSeqno - FirstSeqno,
|
||||
Size + (FirstSeqno bsr 16) - (UntilSeqno bsr 16).
|
||||
|
||||
%%================================================================================
|
||||
%% stream scheduler
|
||||
|
||||
%% Group streams by the first element of their `rank' tuple.
%%
%% Every stream whose rank is `{RankX, _}' lands in the group keyed by `RankX'.
%% The keys themselves are dropped; only the list of groups is returned, after
%% being passed through `shuffle/1'.
-spec group_streams(list(ds_stream())) -> list(list(ds_stream())).
group_streams(Streams) ->
    RankXOf = fun(#ds_stream{rank = {RankX, _RankY}}) -> RankX end,
    ByRankX = maps:groups_from_list(RankXOf, Streams),
    shuffle(maps:values(ByRankX)).
|
||||
|
||||
-spec shuffle([A]) -> [A].
|
||||
shuffle(L0) ->
|
||||
L1 = lists:map(
|
||||
fun(A) ->
|
||||
%% maybe topic/stream prioritization could be introduced here?
|
||||
{rand:uniform(), A}
|
||||
end,
|
||||
L0
|
||||
|
@ -619,6 +631,47 @@ shuffle(L0) ->
|
|||
{_, L} = lists:unzip(L2),
|
||||
L.
|
||||
|
||||
%% Pick the next stream to read from, always serving the head group first.
%%
%% Returns `{Stream, RemainingGroups}'. A group is removed from the list as
%% soon as its last stream has been handed out; otherwise its remainder is put
%% back at the head. Groups that yield no stream are skipped.
%%
%% Throws `#{reason => no_valid_stream}' when no group can provide a stream.
get_the_first_stream([]) ->
    %% How is this possible? Presumably unreachable in practice -- the visible
    %% caller checks for a non-empty group list first; TODO confirm.
    throw(#{reason => no_valid_stream});
get_the_first_stream([Head | Tail]) ->
    case get_next_stream_from_group(Head) of
        undefined ->
            get_the_first_stream(Tail);
        {Stream, {sorted, []}} ->
            %% That was the last stream of the head group: drop the group.
            {Stream, Tail};
        {Stream, Remaining} ->
            {Stream, [Remaining | Tail]}
    end.
|
||||
|
||||
%% Take the next stream out of a single group.
%%
%% Author's note on the policy: the scheduler is simple -- it tries to keep
%% getting messages from the same shard, but it's okay to take turns.
%%
%% A group is either a plain list of streams (not yet ordered) or a
%% `{sorted, Streams}' tuple produced by a previous call. On first use the
%% plain list is sorted by the second element of each stream's `rank' tuple.
%%
%% Returns `{Stream, {sorted, Rest}}', or `undefined' for an exhausted
%% `{sorted, []}' group.
get_next_stream_from_group({sorted, [Stream | Rest]}) ->
    {Stream, {sorted, Rest}};
get_next_stream_from_group({sorted, []}) ->
    undefined;
get_next_stream_from_group(Streams) ->
    [Stream | Rest] = lists:sort(
        fun(#ds_stream{rank = {_, RankA}}, #ds_stream{rank = {_, RankB}}) ->
            %% lists:sort/2 requires a "less than or equal" ordering fun;
            %% a strict `<' makes the sort unstable for equal ranks.
            RankA =< RankB
        end,
        Streams
    ),
    {Stream, {sorted, Rest}}.
|
||||
|
||||
%% Read up to `N' messages from `Stream', starting at the iterator that
%% `get_last_iterator/2' returns for it given the checkpoints `CPs'.
%%
%% Returns `{ItBegin, ItEnd, Messages}' when the read produced messages, and
%% `[]' when it produced none or the stream reported `end_of_stream'.
get_next_n_messages_from_stream(Stream, CPs, N) ->
    ItBegin = get_last_iterator(Stream, CPs),
    Result = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
    case Result of
        {ok, end_of_stream} ->
            %% TODO: how to skip this closed stream, or should it be taken
            %% over by a lower-level layer?
            [];
        {ok, _ItEnd, []} ->
            [];
        {ok, ItEnd, Messages} ->
            {ItBegin, ItEnd, Messages}
    end.
|
||||
|
||||
%%================================================================================
|
||||
|
||||
-spec flatmapfoldl(fun((X, Acc) -> {Y | [Y], Acc}), Acc, [X]) -> {[Y], Acc}.
|
||||
flatmapfoldl(_Fun, Acc, []) ->
|
||||
{[], Acc};
|
||||
|
|
Loading…
Reference in New Issue