From abeff0bc4f4241c22b42b51a83f970f01c1eb2da Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 15 Sep 2023 14:49:29 +0400 Subject: [PATCH] chore(session): try to describe what happens after session takeover --- apps/emqx/src/emqx_session_mem.erl | 19 +++++++++++++++++-- .../src/emqx_eviction_agent_channel.erl | 6 ++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 9322a9f28..e5e76ed31 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -123,6 +123,7 @@ }). -type session() :: #session{}. +-type replayctx() :: [emqx_types:message()]. -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). @@ -193,7 +194,7 @@ destroy(_Session) -> %%-------------------------------------------------------------------- -spec open(clientinfo(), emqx_types:conninfo()) -> - {true, session(), _ReplayContext :: [emqx_types:message()]} | false. + {true, session(), replayctx()} | false. open(ClientInfo = #{clientid := ClientId}, _ConnInfo) -> case emqx_cm:takeover_channel_session( @@ -667,9 +668,23 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = ok = emqx_hooks:run('session.resumed', [ClientInfo, emqx_session:info(Session)]), Session. --spec replay(emqx_types:clientinfo(), [emqx_types:message()], session()) -> +-spec replay(emqx_types:clientinfo(), replayctx(), session()) -> {ok, replies(), session()}. replay(ClientInfo, Pendings, Session) -> + % NOTE + % Here, `Pendings` is a list of messages that were pending delivery in the remote + % session, see `clean_session/3`. It's a replay context that gets passed back + % here after the remote session is taken over by `open/2`. 
When we have a set + % of remote deliveries and a set of local deliveries, some publishes might actually + % be in both sets, because there's a tiny amount of time when both remote and local + % sessions were subscribed to the same set of topics simultaneously (i.e. after + % local session calls `resume/2` but before remote session calls `takeover/1` + % through `emqx_channel:handle_call({takeover, 'end'}, Channel)`). + % We basically need to: + % 1. Combine and deduplicate remote and local pending messages, so that no message + % is delivered twice. + % 2. Replay deliveries of the inflight messages, this time to the new channel. + % 3. Deliver the combined pending messages, following the same logic as `deliver/3`. PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session), {ok, PubsResendQueued, Session1} = replay(ClientInfo, Session), {ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1), diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index d7b35458c..9c4b01699 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -243,6 +243,12 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> node => node() } ), + % NOTE + % Here we aggregate and deduplicate remote and local pending deliveries, + % throwing away any local deliveries that are part of some shared + % subscription. Remote deliveries pertaining to shared subscriptions should + % already have been thrown away by `emqx_channel:handle_deliver/2`. + % See also: `emqx_channel:maybe_resume_session/1`, `emqx_session_mem:replay/3`. DeliversLocal = emqx_channel:maybe_nack(emqx_utils:drain_deliver()), PendingsAll = emqx_session_mem:dedup(ClientInfo, Pendings, DeliversLocal, Session), NSession = emqx_session_mem:enqueue(ClientInfo, PendingsAll, Session),