From 45d44df11d1f6d4c8481215555d4c222abedd1a8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 15 Sep 2023 14:34:11 +0400 Subject: [PATCH] refactor(session): update eviction channel session logic The changes partially reflect `emqx_channel` changes with respect to in-memory session specific logic. The difference is that eviction channel does not replay post-takeover, instead enqueues messages. --- apps/emqx/src/emqx_channel.erl | 2 +- apps/emqx/src/emqx_session_mem.erl | 28 ++++++++++--------- .../src/emqx_eviction_agent_channel.erl | 21 +++++++------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 2b89170b9..73f307d43 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1177,7 +1177,7 @@ handle_call( ok = emqx_session_mem:takeover(Session), %% TODO: Should not drain deliver here (side effect) Delivers = emqx_utils:drain_deliver(), - AllPendings = lists:append(Delivers, Pendings), + AllPendings = lists:append(Pendings, maybe_nack(Delivers)), ?tp( debug, emqx_channel_takeover_end, diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index a2d3ca91b..9322a9f28 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -100,7 +100,8 @@ resume/2, enqueue/3, dequeue/2, - replay/2 + replay/2, + dedup/4 ]). %% Export for CT @@ -669,19 +670,10 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec replay(emqx_types:clientinfo(), [emqx_types:message()], session()) -> {ok, replies(), session()}. replay(ClientInfo, Pendings, Session) -> - PendingsLocal = emqx_session:enrich_delivers( - ClientInfo, - emqx_utils:drain_deliver(), - Session - ), - PendingsLocal1 = lists:filter( - fun(Msg) -> not lists:keymember(Msg#message.id, #message.id, Pendings) end, - PendingsLocal - ), + PendingsAll = dedup(ClientInfo, Pendings, emqx_utils:drain_deliver(), Session), {ok, PubsResendQueued, Session1} = replay(ClientInfo, Session), - {ok, Pubs1, Session2} = deliver(ClientInfo, Pendings, Session1), - {ok, Pubs2, Session3} = deliver(ClientInfo, PendingsLocal1, Session2), - {ok, append(append(PubsResendQueued, Pubs1), Pubs2), Session3}. + {ok, PubsPending, Session2} = deliver(ClientInfo, PendingsAll, Session1), + {ok, append(PubsResendQueued, PubsPending), Session2}. -spec replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}. @@ -698,6 +690,16 @@ replay(ClientInfo, Session) -> {ok, More, Session1} = dequeue(ClientInfo, Session), {ok, append(PubsResend, More), Session1}. +-spec dedup(clientinfo(), [emqx_types:message()], [emqx_types:deliver()], session()) -> + [emqx_types:message()]. +dedup(ClientInfo, Pendings, DeliversLocal, Session) -> + PendingsLocal1 = emqx_session:enrich_delivers(ClientInfo, DeliversLocal, Session), + PendingsLocal2 = lists:filter( + fun(Msg) -> not lists:keymember(Msg#message.id, #message.id, Pendings) end, + PendingsLocal1 + ), + append(Pendings, PendingsLocal2). + append(L1, []) -> L1; append(L1, L2) -> L1 ++ L2. diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index f6ad11167..d7b35458c 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -7,7 +7,6 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_channel.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/types.hrl"). @@ -122,7 +121,9 @@ handle_call( pendings := Pendings } = Channel ) -> - ok = emqx_session:takeover(Session), + % NOTE + % This is essentially part of `emqx_session_mem` logic, thus call it directly. + ok = emqx_session_mem:takeover(Session), %% TODO: Should not drain deliver here (side effect) Delivers = emqx_utils:drain_deliver(), AllPendings = lists:append(Delivers, Pendings), @@ -196,8 +197,11 @@ handle_deliver( clientinfo := ClientInfo } = Channel ) -> + % NOTE + % This is essentially part of `emqx_session_mem` logic, thus call it directly. Delivers1 = emqx_channel:maybe_nack(Delivers), - NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session), + Messages = emqx_session:enrich_delivers(ClientInfo, Delivers1, Session), + NSession = emqx_session_mem:enqueue(ClientInfo, Messages, Session), Channel#{session := NSession}. cancel_expiry_timer(#{expiry_timer := TRef}) when is_reference(TRef) -> @@ -230,7 +234,7 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> } ), {error, no_session}; - {ok, #{session := Session, present := true, pendings := Pendings0}} -> + {ok, #{session := Session, present := true, replay := Pendings}} -> ?SLOG( info, #{ @@ -239,12 +243,9 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> node => node() } ), - Pendings1 = lists:usort(lists:append(Pendings0, emqx_utils:drain_deliver())), - NSession = emqx_session:enqueue( - ClientInfo, - emqx_channel:maybe_nack(Pendings1), - Session - ), + DeliversLocal = emqx_channel:maybe_nack(emqx_utils:drain_deliver()), + PendingsAll = emqx_session_mem:dedup(ClientInfo, Pendings, DeliversLocal, Session), + NSession = emqx_session_mem:enqueue(ClientInfo, PendingsAll, Session), NChannel = Channel#{session => NSession}, ok = emqx_cm:insert_channel_info(ClientId, info(NChannel), stats(NChannel)), ?SLOG(