diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 00cafb1c2..32c291eec 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -182,6 +182,9 @@ shared_sub_s := shared_sub_state(), %% Buffer: inflight := emqx_persistent_session_ds_inflight:t(), + %% Last fetched stream: + %% Used as a continuation point for fair stream scheduling. + last_fetched_stream => emqx_persistent_session_ds_state:stream_key(), %% In-progress replay: %% List of stream replay states to be added to the inflight buffer. replay => [{_StreamKey, stream_state()}, ...], @@ -984,26 +987,32 @@ do_ensure_all_iterators_closed(_DSSessionID) -> %%-------------------------------------------------------------------- fetch_new_messages(Session0 = #{s := S0}, ClientInfo) -> - Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S0), - Session1 = fetch_new_messages(Streams, Session0, ClientInfo), + LFS = maps:get(last_fetched_stream, Session0, beginning), + ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0), + BatchSize = get_config(ClientInfo, [batch_size]), + Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo), #{s := S1, shared_sub_s := SharedSubS0} = Session1, {S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0), Session1#{s => S2, shared_sub_s => SharedSubS1}. -fetch_new_messages([], Session, _ClientInfo) -> - Session; -fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) -> - BatchSize = get_config(ClientInfo, [batch_size]), +fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) -> + #{inflight := Inflight} = Session0, case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of true -> %% Buffer is full: Session0; false -> - Session = new_batch(I, BatchSize, Session0, ClientInfo), - fetch_new_messages(Streams, Session, ClientInfo) + case emqx_persistent_session_ds_stream_scheduler:next_stream(ItStream0) of + {StreamKey, Srs, ItStream} -> + Session1 = new_batch(StreamKey, Srs, BatchSize, Session0, ClientInfo), + Session = Session1#{last_fetched_stream => StreamKey}, + fetch_new_messages(ItStream, BatchSize, Session, ClientInfo); + none -> + Session0 + end end. -new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> +new_batch(StreamKey, Srs0, BatchSize, Session0 = #{s := S0}, ClientInfo) -> SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0), SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0), Srs1 = Srs0#srs{ diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl index 95f6ee375..1d60250ea 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl @@ -39,7 +39,7 @@ -export([get_peername/1, set_peername/2]). -export([get_protocol/1, set_protocol/2]). -export([new_id/1]). --export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). +-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, iter_streams/2, n_streams/1]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([ @@ -66,11 +66,14 @@ n_awaiting_rel/1 ]). +-export([iter_next/1]). + -export([make_session_iterator/0, session_iterator_next/2]). -export_type([ t/0, metadata/0, + iter/2, seqno_type/0, stream_key/0, rank_key/0, @@ -89,6 +92,8 @@ -type message() :: emqx_types:message(). +-opaque iter(K, V) :: gb_trees:iter(K, V). + -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. %% Generic key-value wrapper that is used for exporting arbitrary @@ -113,7 +118,7 @@ -type pmap(K, V) :: #pmap{ table :: atom(), - cache :: #{K => V}, + cache :: #{K => V} | gb_trees:tree(K, V), dirty :: #{K => dirty | del} }. @@ -476,6 +481,14 @@ del_stream(Key, Rec) -> fold_streams(Fun, Acc, Rec) -> gen_fold(?streams, Fun, Acc, Rec). +-spec iter_streams(_StartAfter :: stream_key() | beginning, t()) -> + iter(stream_key(), emqx_persistent_session_ds:stream_state()). +iter_streams(After, Rec) -> + %% NOTE + %% No special handling for `beginning', as it always compares less + %% than any `stream_key()'. + gen_iter_after(?streams, After, Rec). + -spec n_streams(t()) -> non_neg_integer(). n_streams(Rec) -> gen_size(?streams, Rec). @@ -534,6 +547,12 @@ n_awaiting_rel(Rec) -> %% +-spec iter_next(iter(K, V)) -> {K, V, iter(K, V)} | none. +iter_next(It0) -> + gen_iter_next(It0). + +%% + -spec make_session_iterator() -> session_iterator(). make_session_iterator() -> mnesia:dirty_first(?session_tab). @@ -601,6 +620,14 @@ gen_size(Field, Rec) -> check_sequence(Rec), pmap_size(maps:get(Field, Rec)). +gen_iter_after(Field, After, Rec) -> + check_sequence(Rec), + pmap_iter_after(After, maps:get(Field, Rec)). + +gen_iter_next(It) -> + %% NOTE: Currently, gbt iterators is the only type of iterators. + gbt_iter_next(It). + -spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map(). update_pmaps(Fun, Map) -> lists:foldl( @@ -619,7 +646,7 @@ update_pmaps(Fun, Map) -> %% This functtion should be ran in a transaction. -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V). pmap_open(Table, SessionId) -> - Clean = maps:from_list(kv_pmap_restore(Table, SessionId)), + Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)), #pmap{ table = Table, cache = Clean, @@ -627,29 +654,29 @@ pmap_open(Table, SessionId) -> }. -spec pmap_get(K, pmap(K, V)) -> V | undefined. -pmap_get(K, #pmap{cache = Cache}) -> - maps:get(K, Cache, undefined). +pmap_get(K, #pmap{table = Table, cache = Cache}) -> + cache_get(Table, K, Cache). -spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V). -pmap_put(K, V, Pmap = #pmap{dirty = Dirty, cache = Cache}) -> +pmap_put(K, V, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) -> Pmap#pmap{ - cache = maps:put(K, V, Cache), + cache = cache_put(Table, K, V, Cache), dirty = Dirty#{K => dirty} }. -spec pmap_del(K, pmap(K, V)) -> pmap(K, V). pmap_del( Key, - Pmap = #pmap{dirty = Dirty, cache = Cache} + Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache} ) -> Pmap#pmap{ - cache = maps:remove(Key, Cache), + cache = cache_remove(Table, Key, Cache), dirty = Dirty#{Key => del} }. -spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A. -pmap_fold(Fun, Acc, #pmap{cache = Cache}) -> - maps:fold(Fun, Acc, Cache). +pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) -> + cache_fold(Table, Fun, Acc, Cache). -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V). pmap_commit( @@ -660,7 +687,7 @@ pmap_commit( (K, del) -> kv_pmap_delete(Tab, SessionId, K); (K, dirty) -> - V = maps:get(K, Cache), + V = cache_get(Tab, K, Cache), kv_pmap_persist(Tab, SessionId, K, V) end, Dirty @@ -670,13 +697,110 @@ pmap_commit( }. -spec pmap_format(pmap(_K, _V)) -> map(). -pmap_format(#pmap{cache = Cache}) -> - Cache. +pmap_format(#pmap{table = Table, cache = Cache}) -> + cache_format(Table, Cache). -spec pmap_size(pmap(_K, _V)) -> non_neg_integer(). -pmap_size(#pmap{cache = Cache}) -> +pmap_size(#pmap{table = Table, cache = Cache}) -> + cache_size(Table, Cache). + +pmap_iter_after(After, #pmap{table = Table, cache = Cache}) -> + %% NOTE: Only valid for gbt-backed PMAPs. + gbt = cache_data_type(Table), + gbt_iter_after(After, Cache). + +%% + +cache_data_type(?stream_tab) -> gbt; +cache_data_type(_Table) -> map. + +cache_from_list(?stream_tab, L) -> + gbt_from_list(L); +cache_from_list(_Table, L) -> + maps:from_list(L). + +cache_get(?stream_tab, K, Cache) -> + gbt_get(K, Cache, undefined); +cache_get(_Table, K, Cache) -> + maps:get(K, Cache, undefined). + +cache_put(?stream_tab, K, V, Cache) -> + gbt_put(K, V, Cache); +cache_put(_Table, K, V, Cache) -> + maps:put(K, V, Cache). + +cache_remove(?stream_tab, K, Cache) -> + gbt_remove(K, Cache); +cache_remove(_Table, K, Cache) -> + maps:remove(K, Cache). + +cache_fold(?stream_tab, Fun, Acc, Cache) -> + gbt_fold(Fun, Acc, Cache); +cache_fold(_Table, Fun, Acc, Cache) -> + maps:fold(Fun, Acc, Cache). + +cache_format(?stream_tab, Cache) -> + gbt_format(Cache); +cache_format(_Table, Cache) -> + Cache. + +cache_size(?stream_tab, Cache) -> + gbt_size(Cache); +cache_size(_Table, Cache) -> maps:size(Cache). +%% PMAP Cache implementation backed by `gb_trees'. +%% Supports iteration starting from specific key. + +gbt_from_list(L) -> + lists:foldl( + fun({K, V}, Acc) -> gb_trees:insert(K, V, Acc) end, + gb_trees:empty(), + L + ). + +gbt_get(K, Cache, undefined) -> + case gb_trees:lookup(K, Cache) of + none -> undefined; + {_, V} -> V + end. + +gbt_put(K, V, Cache) -> + gb_trees:enter(K, V, Cache). + +gbt_remove(K, Cache) -> + gb_trees:delete_any(K, Cache). + +gbt_format(Cache) -> + gb_trees:to_list(Cache). + +gbt_fold(Fun, Acc, Cache) -> + It = gb_trees:iterator(Cache), + gbt_fold_iter(Fun, Acc, It). + +gbt_fold_iter(Fun, Acc, It0) -> + case gb_trees:next(It0) of + {K, V, It} -> + gbt_fold_iter(Fun, Fun(K, V, Acc), It); + _ -> + Acc + end. + +gbt_size(Cache) -> + gb_trees:size(Cache). + +gbt_iter_after(After, Cache) -> + It0 = gb_trees:iterator_from(After, Cache), + case gb_trees:next(It0) of + {After, _, It} -> + It; + _ -> + It0 + end. + +gbt_iter_next(It) -> + gb_trees:next(It). + %% Functions dealing with set tables: kv_persist(Tab, SessionId, Val0) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl index 15d43c7cd..80dee0e4d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl @@ -16,7 +16,8 @@ -module(emqx_persistent_session_ds_stream_scheduler). %% API: --export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]). +-export([iter_next_streams/2, next_stream/1]). +-export([find_replay_streams/1, is_fully_acked/2]). -export([renew_streams/1, on_unsubscribe/2]). %% behavior callbacks: @@ -35,6 +36,29 @@ %% Type declarations %%================================================================================ +-type stream_key() :: emqx_persistent_session_ds_state:stream_key(). +-type stream_state() :: emqx_persistent_session_ds:stream_state(). + +%% Restartable iterator with a filter and an iteration limit. +-record(iter, { + limit :: non_neg_integer(), + filter, + it, + it_cont +}). + +-type iter(K, V, IterInner) :: #iter{ + filter :: fun((K, V) -> boolean()), + it :: IterInner, + it_cont :: IterInner +}. + +-type iter_stream() :: iter( + stream_key(), + stream_state(), + emqx_persistent_session_ds_state:iter(stream_key(), stream_state()) +). + %%================================================================================ %% API functions %%================================================================================ @@ -70,9 +94,9 @@ find_replay_streams(S) -> %% %% This function is non-detereministic: it randomizes the order of %% streams to ensure fair replay of different topics. --spec find_new_streams(emqx_persistent_session_ds_state:t()) -> - [{emqx_persistent_session_ds_state:stream_key(), emqx_persistent_session_ds:stream_state()}]. -find_new_streams(S) -> +-spec iter_next_streams(_LastVisited :: stream_key(), emqx_persistent_session_ds_state:t()) -> + iter_stream(). +iter_next_streams(LastVisited, S) -> %% FIXME: this function is currently very sensitive to the %% consistency of the packet IDs on both broker and client side. %% @@ -87,23 +111,44 @@ find_new_streams(S) -> %% after timeout?) Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), - shuffle( - emqx_persistent_session_ds_state:fold_streams( - fun - (_Key, #srs{it_end = end_of_stream}, Acc) -> - Acc; - (Key, Stream, Acc) -> - case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of - true -> - [{Key, Stream} | Acc]; - false -> - Acc - end - end, - [], - S - ) - ). + Filter = fun(_Key, Stream) -> is_fetchable(Comm1, Comm2, Stream) end, + #iter{ + %% Limit the iteration to one round over all streams: + limit = emqx_persistent_session_ds_state:n_streams(S), + %% Filter out the streams not eligible for fetching: + filter = Filter, + %% Start the iteration right after the last visited stream: + it = emqx_persistent_session_ds_state:iter_streams(LastVisited, S), + %% Restart the iteration from the beginning: + it_cont = emqx_persistent_session_ds_state:iter_streams(beginning, S) + }. + +-spec next_stream(iter_stream()) -> {stream_key(), stream_state(), iter_stream()} | none. +next_stream(#iter{limit = 0}) -> + none; +next_stream(ItStream0 = #iter{limit = N, filter = Filter, it = It0, it_cont = ItCont}) -> + case emqx_persistent_session_ds_state:iter_next(It0) of + {Key, Stream, It} -> + ItStream = ItStream0#iter{it = It, limit = N - 1}, + case Filter(Key, Stream) of + true -> + {Key, Stream, ItStream}; + false -> + next_stream(ItStream) + end; + none when It0 =/= ItCont -> + %% Restart the iteration from the beginning: + ItStream = ItStream0#iter{it = ItCont}, + next_stream(ItStream); + none -> + %% No point in restarting the iteration, `ItCont` is empty: + none + end. + +is_fetchable(_Comm1, _Comm2, #srs{it_end = end_of_stream}) -> + false; +is_fetchable(Comm1, Comm2, #srs{unsubscribed = Unsubscribed} = Stream) -> + is_fully_acked(Comm1, Comm2, Stream) andalso not Unsubscribed. %% @doc This function makes the session aware of the new streams. %% @@ -410,19 +455,6 @@ is_fully_acked(_, _, #srs{ is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> (Comm1 >= S1) andalso (Comm2 >= S2). --spec shuffle([A]) -> [A]. -shuffle(L0) -> - L1 = lists:map( - fun(A) -> - %% maybe topic/stream prioritization could be introduced here? - {rand:uniform(), A} - end, - L0 - ), - L2 = lists:sort(L1), - {_, L} = lists:unzip(L2), - L. - fold_proper_subscriptions(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions( fun