From a20d2623278755b61ce19cb9b3e391a45745a8d8 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 28 Jun 2024 13:41:10 +0300 Subject: [PATCH] feat(queue): send progress before fetching new messages --- apps/emqx/src/emqx_persistent_session_ds.erl | 9 ++++----- .../emqx_persistent_session_ds_shared_subs.erl | 6 +++--- .../emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl | 4 ++-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index dc4a74b43..62e6bdd26 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -987,14 +987,13 @@ do_ensure_all_iterators_closed(_DSSessionID) -> %% Normal replay: %%-------------------------------------------------------------------- -fetch_new_messages(Session0 = #{s := S0}, ClientInfo) -> +fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) -> + {S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0), LFS = maps:get(last_fetched_stream, Session0, beginning), - ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0), + ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1), BatchSize = get_config(ClientInfo, [batch_size]), Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo), - #{s := S1, shared_sub_s := SharedSubS0} = Session1, - {S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0), - Session1#{s => S2, shared_sub_s => SharedSubS1}. + Session1#{shared_sub_s => SharedSubS1}. fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) -> #{inflight := Inflight} = Session0, diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 0274b9b9e..616414112 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -17,7 +17,7 @@ on_unsubscribe/4, on_disconnect/2, - on_streams_replayed/2, + on_streams_replay/2, on_info/3, renew_streams/2, @@ -114,11 +114,11 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> SharedSubS1 = SharedSubS0#{agent => Agent1}, {S1, SharedSubS1}. --spec on_streams_replayed( +-spec on_streams_replay( emqx_persistent_session_ds_state:t(), t() ) -> {emqx_persistent_session_ds_state:t(), t()}. -on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) -> +on_streams_replay(S, #{agent := Agent0} = SharedSubS0) -> Progresses = stream_progresses(S), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( Agent0, Progresses diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 2bbdc67d8..d563c0115 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -336,8 +336,8 @@ select_streams_for_revoke( ) -> %% TODO %% Some intellectual logic should be used regarding: - %% * shard ids (better spread shards across different streams); - %% * stream stats (how much data was replayed from stream, + %% * shard ids (better do not mix shards in the same agent); + %% * stream stats (how much data was replayed from stream), %% heavy streams should be distributed across different agents); %% * data locality (agents better preserve streams with data available on the agent's node) lists:sublist(shuffle(Streams), RevokeCount).