chore(session): try to describe what happens after session takeover
This commit is contained in:
parent
45d44df11d
commit
abeff0bc4f
|
@ -123,6 +123,7 @@
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type session() :: #session{}.
|
-type session() :: #session{}.
|
||||||
|
-type replayctx() :: [emqx_types:message()].
|
||||||
|
|
||||||
-type clientinfo() :: emqx_types:clientinfo().
|
-type clientinfo() :: emqx_types:clientinfo().
|
||||||
-type conninfo() :: emqx_session:conninfo().
|
-type conninfo() :: emqx_session:conninfo().
|
||||||
|
@ -193,7 +194,7 @@ destroy(_Session) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec open(clientinfo(), emqx_types:conninfo()) ->
|
-spec open(clientinfo(), emqx_types:conninfo()) ->
|
||||||
{true, session(), _ReplayContext :: [emqx_types:message()]} | false.
|
{true, session(), replayctx()} | false.
|
||||||
open(ClientInfo = #{clientid := ClientId}, _ConnInfo) ->
|
open(ClientInfo = #{clientid := ClientId}, _ConnInfo) ->
|
||||||
case
|
case
|
||||||
emqx_cm:takeover_channel_session(
|
emqx_cm:takeover_channel_session(
|
||||||
|
@ -667,9 +668,23 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
|
||||||
ok = emqx_hooks:run('session.resumed', [ClientInfo, emqx_session:info(Session)]),
|
ok = emqx_hooks:run('session.resumed', [ClientInfo, emqx_session:info(Session)]),
|
||||||
Session.
|
Session.
|
||||||
|
|
||||||
-spec replay(emqx_types:clientinfo(), [emqx_types:message()], session()) ->
|
-spec replay(emqx_types:clientinfo(), replayctx(), session()) ->
|
||||||
{ok, replies(), session()}.
|
{ok, replies(), session()}.
|
||||||
replay(ClientInfo, Pendings, Session) ->
|
replay(ClientInfo, Pendings, Session) ->
|
||||||
|
% NOTE
|
||||||
|
% Here, `Pendings` is a list messages that were pending delivery in the remote
|
||||||
|
% session, see `clean_session/3`. It's a replay context that gets passed back
|
||||||
|
% here after the remote session is taken over by `open/2`. When we have a set
|
||||||
|
% of remote deliveries and a set of local deliveries, some publishes might actually
|
||||||
|
% be in both sets, because there's a tiny amount of time when both remote and local
|
||||||
|
% sessions were subscribed to the same set of topics simultaneously (i.e. after
|
||||||
|
% local session calls `resume/2` but before remote session calls `takeover/1`
|
||||||
|
% through `emqx_channel:handle_call({takeover, 'end'}, Channel)`).
|
||||||
|
% We basically need to:
|
||||||
|
% 1. Combine and deduplicate remote and local pending messages, so that no message
|
||||||
|
% is delivered twice.
|
||||||
|
% 2. Replay deliveries of the inflight messages, this time to the new channel.
|
||||||
|
% 3. Deliver the combined pending messages, following the same logic as `deliver/3`.
|
||||||
PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session),
|
PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session),
|
||||||
{ok, PubsResendQueued, Session1} = replay(ClientInfo, Session),
|
{ok, PubsResendQueued, Session1} = replay(ClientInfo, Session),
|
||||||
{ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1),
|
{ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1),
|
||||||
|
|
|
@ -243,6 +243,12 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) ->
|
||||||
node => node()
|
node => node()
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
% NOTE
|
||||||
|
% Here we aggregate and deduplicate remote and local pending deliveries,
|
||||||
|
% throwing away any local deliveries that are part of some shared
|
||||||
|
% subscription. Remote deliviries pertaining to shared subscriptions should
|
||||||
|
% already have been thrown away by `emqx_channel:handle_deliver/2`.
|
||||||
|
% See also: `emqx_channel:maybe_resume_session/1`, `emqx_session_mem:replay/3`.
|
||||||
DeliversLocal = emqx_channel:maybe_nack(emqx_utils:drain_deliver()),
|
DeliversLocal = emqx_channel:maybe_nack(emqx_utils:drain_deliver()),
|
||||||
PendingsAll = emqx_session_mem:dedup(ClientInfo, Pendings, DeliversLocal, Session),
|
PendingsAll = emqx_session_mem:dedup(ClientInfo, Pendings, DeliversLocal, Session),
|
||||||
NSession = emqx_session_mem:enqueue(ClientInfo, PendingsAll, Session),
|
NSession = emqx_session_mem:enqueue(ClientInfo, PendingsAll, Session),
|
||||||
|
|
Loading…
Reference in New Issue