perf(sessds): rotate through streams with iterators when fetching

This avoids expensive shuffling of the whole list of fetchable streams,
which can be quite long.
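
In short: instead of collecting and shuffling every fetchable stream on each fetch, the session remembers the key of the last stream it fetched from and resumes a single, bounded round-robin pass from just after that key, wrapping around once. Below is a minimal standalone sketch of that rotation idea (module and function names are hypothetical; the actual implementation uses the session-state iterators shown in the diff that follows):

-module(rotate_sketch).
-export([next_fetchable/3]).

%% Streams is an ordered list of {Key, Stream} pairs; LastVisited is the
%% continuation point remembered from the previous fetch (or 'beginning');
%% Filter decides whether a stream is currently eligible for fetching.
-spec next_fetchable([{term(), term()}], beginning | term(), fun((term(), term()) -> boolean())) ->
    {term(), term()} | none.
next_fetchable(Streams, LastVisited, Filter) ->
    %% Split the list so the scan starts right after the last visited key
    %% and wraps around to the beginning, visiting each stream at most once.
    {Before, After} =
        case LastVisited of
            beginning -> {[], Streams};
            _ -> lists:splitwith(fun({K, _}) -> K =< LastVisited end, Streams)
        end,
    first_match(After ++ Before, Filter).

first_match([{Key, Stream} | Rest], Filter) ->
    case Filter(Key, Stream) of
        true -> {Key, Stream};
        false -> first_match(Rest, Filter)
    end;
first_match([], _Filter) ->
    none.

Consecutive fetches pick up where the previous one stopped, so every stream gets a fair turn without the O(n log n) shuffle of the old approach.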
Andrew Mayorov 2024-07-02 15:42:33 +02:00
parent 9a4f3f88e3
commit a57917b66b
2 changed files with 81 additions and 43 deletions

emqx_persistent_session_ds.erl

@@ -182,6 +182,9 @@
     shared_sub_s := shared_sub_state(),
     %% Buffer:
     inflight := emqx_persistent_session_ds_inflight:t(),
+    %% Last fetched stream:
+    %% Used as a continuation point for fair stream scheduling.
+    last_fetched_stream => emqx_persistent_session_ds_state:stream_key(),
     %% In-progress replay:
     %% List of stream replay states to be added to the inflight buffer.
     replay => [{_StreamKey, stream_state()}, ...],
@@ -984,26 +987,32 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
 %%--------------------------------------------------------------------

 fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
-    Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S0),
-    Session1 = fetch_new_messages(Streams, Session0, ClientInfo),
+    LFS = maps:get(last_fetched_stream, Session0, beginning),
+    ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
+    BatchSize = get_config(ClientInfo, [batch_size]),
+    Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
     #{s := S1, shared_sub_s := SharedSubS0} = Session1,
     {S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
     Session1#{s => S2, shared_sub_s => SharedSubS1}.

-fetch_new_messages([], Session, _ClientInfo) ->
-    Session;
-fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
-    BatchSize = get_config(ClientInfo, [batch_size]),
+fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
+    #{inflight := Inflight} = Session0,
     case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
         true ->
             %% Buffer is full:
             Session0;
         false ->
-            Session = new_batch(I, BatchSize, Session0, ClientInfo),
-            fetch_new_messages(Streams, Session, ClientInfo)
+            case emqx_persistent_session_ds_stream_scheduler:next_stream(ItStream0) of
+                {StreamKey, Srs, ItStream} ->
+                    Session1 = new_batch(StreamKey, Srs, BatchSize, Session0, ClientInfo),
+                    Session = Session1#{last_fetched_stream => StreamKey},
+                    fetch_new_messages(ItStream, BatchSize, Session, ClientInfo);
+                none ->
+                    Session0
+            end
     end.

-new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
+new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
     SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
     SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0),
     Srs1 = Srs0#srs{

emqx_persistent_session_ds_stream_scheduler.erl

@@ -16,7 +16,8 @@
 -module(emqx_persistent_session_ds_stream_scheduler).

 %% API:
--export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]).
+-export([iter_next_streams/2, next_stream/1]).
+-export([find_replay_streams/1, is_fully_acked/2]).
 -export([renew_streams/1, on_unsubscribe/2]).

 %% behavior callbacks:
@@ -35,6 +36,29 @@
 %% Type declarations
 %%================================================================================

+-type stream_key() :: emqx_persistent_session_ds_state:stream_key().
+-type stream_state() :: emqx_persistent_session_ds:stream_state().
+
+%% Restartable iterator with a filter and an iteration limit.
+-record(iter, {
+    limit :: non_neg_integer(),
+    filter,
+    it,
+    it_cont
+}).
+
+-type iter(K, V, IterInner) :: #iter{
+    filter :: fun((K, V) -> boolean()),
+    it :: IterInner,
+    it_cont :: IterInner
+}.
+
+-type iter_stream() :: iter(
+    stream_key(),
+    stream_state(),
+    emqx_persistent_session_ds_state:iter(stream_key(), stream_state())
+).
+
 %%================================================================================
 %% API functions
 %%================================================================================
@@ -70,9 +94,9 @@ find_replay_streams(S) ->
 %%
 %% This function is non-detereministic: it randomizes the order of
 %% streams to ensure fair replay of different topics.
--spec find_new_streams(emqx_persistent_session_ds_state:t()) ->
-    [{emqx_persistent_session_ds_state:stream_key(), emqx_persistent_session_ds:stream_state()}].
-find_new_streams(S) ->
+-spec iter_next_streams(_LastVisited :: stream_key(), emqx_persistent_session_ds_state:t()) ->
+    iter_stream().
+iter_next_streams(LastVisited, S) ->
     %% FIXME: this function is currently very sensitive to the
     %% consistency of the packet IDs on both broker and client side.
     %%
@@ -87,23 +111,41 @@
     %% after timeout?)
     Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
     Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
-    shuffle(
-        emqx_persistent_session_ds_state:fold_streams(
-            fun
-                (_Key, #srs{it_end = end_of_stream}, Acc) ->
-                    Acc;
-                (Key, Stream, Acc) ->
-                    case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
-                        true ->
-                            [{Key, Stream} | Acc];
-                        false ->
-                            Acc
-                    end
-            end,
-            [],
-            S
-        )
-    ).
+    Filter = fun(_Key, Stream) -> is_fetchable(Comm1, Comm2, Stream) end,
+    #iter{
+        %% Limit the iteration to one round over all streams:
+        limit = emqx_persistent_session_ds_state:n_streams(S),
+        %% Filter out the streams not eligible for fetching:
+        filter = Filter,
+        %% Start the iteration right after the last visited stream:
+        it = emqx_persistent_session_ds_state:iter_streams(LastVisited, S),
+        %% Restart the iteration from the beginning:
+        it_cont = emqx_persistent_session_ds_state:iter_streams(beginning, S)
+    }.
+
+-spec next_stream(iter_stream()) -> {stream_key(), stream_state(), iter_stream()} | none.
+next_stream(#iter{limit = 0}) ->
+    none;
+next_stream(ItStream0 = #iter{limit = N, filter = Filter, it = It0, it_cont = ItCont}) ->
+    case emqx_persistent_session_ds_state:iterate(It0) of
+        {Key, Stream, It} ->
+            ItStream = ItStream0#iter{it = It, limit = N - 1},
+            case Filter(Key, Stream) of
+                true ->
+                    {Key, Stream, ItStream};
+                false ->
+                    next_stream(ItStream)
+            end;
+        none ->
+            %% Restart the iteration from the beginning:
+            ItStream = ItStream0#iter{it = ItCont},
+            next_stream(ItStream)
+    end.
+
+is_fetchable(_Comm1, _Comm2, #srs{it_end = end_of_stream}) ->
+    false;
+is_fetchable(Comm1, Comm2, #srs{unsubscribed = Unsubscribed} = Stream) ->
+    is_fully_acked(Comm1, Comm2, Stream) andalso not Unsubscribed.

 %% @doc This function makes the session aware of the new streams.
 %%
@@ -410,19 +452,6 @@ is_fully_acked(_, _, #srs{
 is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
     (Comm1 >= S1) andalso (Comm2 >= S2).

--spec shuffle([A]) -> [A].
-shuffle(L0) ->
-    L1 = lists:map(
-        fun(A) ->
-            %% maybe topic/stream prioritization could be introduced here?
-            {rand:uniform(), A}
-        end,
-        L0
-    ),
-    L2 = lists:sort(L1),
-    {_, L} = lists:unzip(L2),
-    L.
-
 fold_proper_subscriptions(Fun, Acc, S) ->
     emqx_persistent_session_ds_state:fold_subscriptions(
         fun