chore: ensure comments follow code style consistently
This commit is contained in:
parent
8670efbfa0
commit
98706cd215
|
@ -939,8 +939,8 @@ handle_deliver(
|
||||||
clientinfo = ClientInfo
|
clientinfo = ClientInfo
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
% NOTE
|
%% NOTE
|
||||||
% This is essentially part of `emqx_session_mem` logic, thus call it directly.
|
%% This is essentially part of `emqx_session_mem` logic, thus call it directly.
|
||||||
Delivers1 = maybe_nack(Delivers),
|
Delivers1 = maybe_nack(Delivers),
|
||||||
Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
|
Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session),
|
||||||
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
|
NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session),
|
||||||
|
@ -1072,10 +1072,10 @@ return_connack(AckPacket, Channel) ->
|
||||||
},
|
},
|
||||||
{Packets, NChannel2} = do_deliver(Publishes, NChannel1),
|
{Packets, NChannel2} = do_deliver(Publishes, NChannel1),
|
||||||
Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0],
|
Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0],
|
||||||
% NOTE
|
%% NOTE
|
||||||
% Session timers are not restored here, so there's a tiny chance that
|
%% Session timers are not restored here, so there's a tiny chance that
|
||||||
% the session becomes stuck, when it already has no place to track new
|
%% the session becomes stuck, when it already has no place to track new
|
||||||
% messages.
|
%% messages.
|
||||||
{ok, Replies ++ Outgoing, NChannel2}
|
{ok, Replies ++ Outgoing, NChannel2}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -1172,8 +1172,8 @@ handle_call(
|
||||||
conninfo = #{clientid := ClientId}
|
conninfo = #{clientid := ClientId}
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
% NOTE
|
%% NOTE
|
||||||
% This is essentially part of `emqx_session_mem` logic, thus call it directly.
|
%% This is essentially part of `emqx_session_mem` logic, thus call it directly.
|
||||||
ok = emqx_session_mem:takeover(Session),
|
ok = emqx_session_mem:takeover(Session),
|
||||||
%% TODO: Should not drain deliver here (side effect)
|
%% TODO: Should not drain deliver here (side effect)
|
||||||
Delivers = emqx_utils:drain_deliver(),
|
Delivers = emqx_utils:drain_deliver(),
|
||||||
|
|
|
@ -104,12 +104,12 @@ create(#{clientid := ClientID}, _ConnInfo, Conf) ->
|
||||||
-spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
|
-spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
|
||||||
{_IsPresent :: true, session(), []} | {_IsPresent :: false, session()}.
|
{_IsPresent :: true, session(), []} | {_IsPresent :: false, session()}.
|
||||||
open(#{clientid := ClientID}, _ConnInfo, Conf) ->
|
open(#{clientid := ClientID}, _ConnInfo, Conf) ->
|
||||||
% NOTE
|
%% NOTE
|
||||||
% The fact that we need to concern about discarding all live channels here
|
%% The fact that we need to concern about discarding all live channels here
|
||||||
% is essentially a consequence of the in-memory session design, where we
|
%% is essentially a consequence of the in-memory session design, where we
|
||||||
% have disconnected channels holding onto session state. Ideally, we should
|
%% have disconnected channels holding onto session state. Ideally, we should
|
||||||
% somehow isolate those idling not-yet-expired sessions into a separate process
|
%% somehow isolate those idling not-yet-expired sessions into a separate process
|
||||||
% space, and move this call back into `emqx_cm` where it belongs.
|
%% space, and move this call back into `emqx_cm` where it belongs.
|
||||||
ok = emqx_cm:discard_session(ClientID),
|
ok = emqx_cm:discard_session(ClientID),
|
||||||
{IsNew, Session} = open_session(ClientID, Conf),
|
{IsNew, Session} = open_session(ClientID, Conf),
|
||||||
IsPresent = not IsNew,
|
IsPresent = not IsNew,
|
||||||
|
|
|
@ -185,8 +185,8 @@ get_mqtt_conf(Zone, Key, Default) ->
|
||||||
|
|
||||||
-spec destroy(session() | clientinfo()) -> ok.
|
-spec destroy(session() | clientinfo()) -> ok.
|
||||||
destroy(_Session) ->
|
destroy(_Session) ->
|
||||||
% NOTE
|
%% NOTE
|
||||||
% This is a stub. This session impl has no backing store, thus always `ok`.
|
%% This is a stub. This session impl has no backing store, thus always `ok`.
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -674,20 +674,20 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
|
||||||
-spec replay(emqx_types:clientinfo(), replayctx(), session()) ->
|
-spec replay(emqx_types:clientinfo(), replayctx(), session()) ->
|
||||||
{ok, replies(), session()}.
|
{ok, replies(), session()}.
|
||||||
replay(ClientInfo, Pendings, Session) ->
|
replay(ClientInfo, Pendings, Session) ->
|
||||||
% NOTE
|
%% NOTE
|
||||||
% Here, `Pendings` is a list messages that were pending delivery in the remote
|
%% Here, `Pendings` is a list messages that were pending delivery in the remote
|
||||||
% session, see `clean_session/3`. It's a replay context that gets passed back
|
%% session, see `clean_session/3`. It's a replay context that gets passed back
|
||||||
% here after the remote session is taken over by `open/2`. When we have a set
|
%% here after the remote session is taken over by `open/2`. When we have a set
|
||||||
% of remote deliveries and a set of local deliveries, some publishes might actually
|
%% of remote deliveries and a set of local deliveries, some publishes might actually
|
||||||
% be in both sets, because there's a tiny amount of time when both remote and local
|
%% be in both sets, because there's a tiny amount of time when both remote and local
|
||||||
% sessions were subscribed to the same set of topics simultaneously (i.e. after
|
%% sessions were subscribed to the same set of topics simultaneously (i.e. after
|
||||||
% local session calls `resume/2` but before remote session calls `takeover/1`
|
%% local session calls `resume/2` but before remote session calls `takeover/1`
|
||||||
% through `emqx_channel:handle_call({takeover, 'end'}, Channel)`).
|
%% through `emqx_channel:handle_call({takeover, 'end'}, Channel)`).
|
||||||
% We basically need to:
|
%% We basically need to:
|
||||||
% 1. Combine and deduplicate remote and local pending messages, so that no message
|
%% 1. Combine and deduplicate remote and local pending messages, so that no message
|
||||||
% is delivered twice.
|
%% is delivered twice.
|
||||||
% 2. Replay deliveries of the inflight messages, this time to the new channel.
|
%% 2. Replay deliveries of the inflight messages, this time to the new channel.
|
||||||
% 3. Deliver the combined pending messages, following the same logic as `deliver/3`.
|
%% 3. Deliver the combined pending messages, following the same logic as `deliver/3`.
|
||||||
PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session),
|
PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session),
|
||||||
{ok, PubsResendQueued, Session1} = replay(ClientInfo, Session),
|
{ok, PubsResendQueued, Session1} = replay(ClientInfo, Session),
|
||||||
{ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1),
|
{ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1),
|
||||||
|
|
Loading…
Reference in New Issue