diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 536c9bd37..93cd872c4 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -939,8 +939,8 @@ handle_deliver( clientinfo = ClientInfo } ) -> - % NOTE - % This is essentially part of `emqx_session_mem` logic, thus call it directly. + %% NOTE + %% This is essentially part of `emqx_session_mem` logic, thus call it directly. Delivers1 = maybe_nack(Delivers), Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session), NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), @@ -1072,10 +1072,10 @@ return_connack(AckPacket, Channel) -> }, {Packets, NChannel2} = do_deliver(Publishes, NChannel1), Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0], - % NOTE - % Session timers are not restored here, so there's a tiny chance that - % the session becomes stuck, when it already has no place to track new - % messages. + %% NOTE + %% Session timers are not restored here, so there's a tiny chance that + %% the session becomes stuck, when it already has no place to track new + %% messages. {ok, Replies ++ Outgoing, NChannel2} end. @@ -1172,8 +1172,8 @@ handle_call( conninfo = #{clientid := ClientId} } ) -> - % NOTE - % This is essentially part of `emqx_session_mem` logic, thus call it directly. + %% NOTE + %% This is essentially part of `emqx_session_mem` logic, thus call it directly. ok = emqx_session_mem:takeover(Session), %% TODO: Should not drain deliver here (side effect) Delivers = emqx_utils:drain_deliver(), diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 6060aa4a4..f37eb7710 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -104,12 +104,12 @@ create(#{clientid := ClientID}, _ConnInfo, Conf) -> -spec open(clientinfo(), conninfo(), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | {_IsPresent :: false, session()}. open(#{clientid := ClientID}, _ConnInfo, Conf) -> - % NOTE - % The fact that we need to concern about discarding all live channels here - % is essentially a consequence of the in-memory session design, where we - % have disconnected channels holding onto session state. Ideally, we should - % somehow isolate those idling not-yet-expired sessions into a separate process - % space, and move this call back into `emqx_cm` where it belongs. + %% NOTE + %% The fact that we need to concern about discarding all live channels here + %% is essentially a consequence of the in-memory session design, where we + %% have disconnected channels holding onto session state. Ideally, we should + %% somehow isolate those idling not-yet-expired sessions into a separate process + %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), {IsNew, Session} = open_session(ClientID, Conf), IsPresent = not IsNew, diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 586196d36..578a4fb68 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -185,8 +185,8 @@ get_mqtt_conf(Zone, Key, Default) -> -spec destroy(session() | clientinfo()) -> ok. destroy(_Session) -> - % NOTE - % This is a stub. This session impl has no backing store, thus always `ok`. + %% NOTE + %% This is a stub. This session impl has no backing store, thus always `ok`. ok. %%-------------------------------------------------------------------- @@ -674,20 +674,20 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec replay(emqx_types:clientinfo(), replayctx(), session()) -> {ok, replies(), session()}. replay(ClientInfo, Pendings, Session) -> - % NOTE - % Here, `Pendings` is a list messages that were pending delivery in the remote - % session, see `clean_session/3`. It's a replay context that gets passed back - % here after the remote session is taken over by `open/2`. When we have a set - % of remote deliveries and a set of local deliveries, some publishes might actually - % be in both sets, because there's a tiny amount of time when both remote and local - % sessions were subscribed to the same set of topics simultaneously (i.e. after - % local session calls `resume/2` but before remote session calls `takeover/1` - % through `emqx_channel:handle_call({takeover, 'end'}, Channel)`). - % We basically need to: - % 1. Combine and deduplicate remote and local pending messages, so that no message - % is delivered twice. - % 2. Replay deliveries of the inflight messages, this time to the new channel. - % 3. Deliver the combined pending messages, following the same logic as `deliver/3`. + %% NOTE + %% Here, `Pendings` is a list messages that were pending delivery in the remote + %% session, see `clean_session/3`. It's a replay context that gets passed back + %% here after the remote session is taken over by `open/2`. When we have a set + %% of remote deliveries and a set of local deliveries, some publishes might actually + %% be in both sets, because there's a tiny amount of time when both remote and local + %% sessions were subscribed to the same set of topics simultaneously (i.e. after + %% local session calls `resume/2` but before remote session calls `takeover/1` + %% through `emqx_channel:handle_call({takeover, 'end'}, Channel)`). + %% We basically need to: + %% 1. Combine and deduplicate remote and local pending messages, so that no message + %% is delivered twice. + %% 2. Replay deliveries of the inflight messages, this time to the new channel. + %% 3. Deliver the combined pending messages, following the same logic as `deliver/3`. PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session), {ok, PubsResendQueued, Session1} = replay(ClientInfo, Session), {ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1),