From 0cd23511341de3819835ff060bf40958ed2fe7c3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 19 Jul 2023 14:11:10 +0200 Subject: [PATCH 1/2] refactor(session): hide `no_local` logic behind enqueue / deliver This is a part of effort to minimize `emqx_session` module interface to simplify adding alternative session implementations. --- apps/emqx/src/emqx_channel.erl | 24 +++------- apps/emqx/src/emqx_session.erl | 47 +++++++++---------- .../emqx_persistent_session.erl | 16 +++---- apps/emqx/test/emqx_channel_SUITE.erl | 13 +++-- .../src/emqx_eviction_agent_channel.erl | 21 ++------- 5 files changed, 47 insertions(+), 74 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d879e5a2d..9f47c8f81 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -924,18 +924,13 @@ handle_deliver( Delivers, Channel = #channel{ takeover = true, - pendings = Pendings, - session = Session, - clientinfo = #{clientid := ClientId} = ClientInfo + pendings = Pendings } ) -> %% NOTE: Order is important here. While the takeover is in %% progress, the session cannot enqueue messages, since it already %% passed on the queue to the new connection in the session state. - NPendings = lists:append( - Pendings, - emqx_session:ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session) - ), + NPendings = lists:append(Pendings, maybe_nack(Delivers)), {ok, Channel#channel{pendings = NPendings}}; handle_deliver( Delivers, @@ -943,12 +938,11 @@ handle_deliver( conn_state = disconnected, takeover = false, session = Session, - clientinfo = #{clientid := ClientId} = ClientInfo + clientinfo = ClientInfo } ) -> Delivers1 = maybe_nack(Delivers), - Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session), - NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session), + NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session), NChannel = Channel#channel{session = NSession}, {ok, NChannel}; handle_deliver( @@ -956,16 +950,10 @@ handle_deliver( Channel = #channel{ session = Session, takeover = false, - clientinfo = #{clientid := ClientId} = ClientInfo + clientinfo = ClientInfo } ) -> - case - emqx_session:deliver( - ClientInfo, - emqx_session:ignore_local(ClientInfo, Delivers, ClientId, Session), - Session - ) - of + case emqx_session:deliver(ClientInfo, Delivers, Session) of {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index db0059709..b15fcd2ed 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -309,27 +309,6 @@ info(created_at, #session{created_at = CreatedAt}) -> -spec stats(session()) -> emqx_types:stats(). stats(Session) -> info(?STATS_KEYS, Session). -%%-------------------------------------------------------------------- -%% Ignore local messages -%%-------------------------------------------------------------------- - -ignore_local(ClientInfo, Delivers, Subscriber, Session) -> - Subs = info(subscriptions, Session), - lists:filter( - fun({deliver, Topic, #message{from = Publisher} = Msg}) -> - case maps:find(Topic, Subs) of - {ok, #{nl := 1}} when Subscriber =:= Publisher -> - ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]), - ok = emqx_metrics:inc('delivery.dropped'), - ok = emqx_metrics:inc('delivery.dropped.no_local'), - false; - _ -> - true - end - end, - Delivers - ). - %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE %%-------------------------------------------------------------------- @@ -610,7 +589,10 @@ deliver_msg( MarkedMsg = mark_begin_deliver(Msg), Inflight1 = emqx_inflight:insert(PacketId, with_ts(MarkedMsg), Inflight), {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})} - end. + end; +deliver_msg(ClientInfo, {drop, Msg, Reason}, Session) -> + handle_dropped(ClientInfo, Msg, Reason, Session), + {ok, Session}. -spec enqueue( emqx_types:clientinfo(), @@ -629,7 +611,10 @@ enqueue(ClientInfo, Delivers, Session) when is_list(Delivers) -> enqueue(ClientInfo, #message{} = Msg, Session = #session{mqueue = Q}) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), (Dropped =/= undefined) andalso handle_dropped(ClientInfo, Dropped, Session), - Session#session{mqueue = NewQ}. + Session#session{mqueue = NewQ}; +enqueue(ClientInfo, {drop, Msg, Reason}, Session) -> + handle_dropped(ClientInfo, Msg, Reason, Session), + Session. handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mqueue = Q}) -> Payload = emqx_message:to_log_map(Msg), @@ -666,8 +651,18 @@ handle_dropped(ClientInfo, Msg = #message{qos = QoS, topic = Topic}, #session{mq ) end. +handle_dropped(ClientInfo, Msg, Reason, _Session) -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, Reason]), + ok = emqx_metrics:inc('delivery.dropped'), + ok = emqx_metrics:inc('delivery.dropped.no_local'). + enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs}) -> - enrich_subopts(get_subopts(Topic, Subs), Msg, Session). + enrich_deliver(Msg, maps:find(Topic, Subs), Session). + +enrich_deliver(Msg = #message{from = ClientId}, {ok, #{nl := 1}}, #session{clientid = ClientId}) -> + {drop, Msg, no_local}; +enrich_deliver(Msg, SubOpts, Session) -> + enrich_subopts(mk_subopts(SubOpts), Msg, Session). maybe_ack(Msg) -> emqx_shared_sub:maybe_ack(Msg). @@ -675,8 +670,8 @@ maybe_ack(Msg) -> maybe_nack(Msg) -> emqx_shared_sub:maybe_nack_dropped(Msg). -get_subopts(Topic, SubMap) -> - case maps:find(Topic, SubMap) of +mk_subopts(SubOpts) -> + case SubOpts of {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}]; {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl index bfda233e1..111154571 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -272,7 +272,7 @@ remove_subscription(_TopicFilter, _SessionID, false = _IsPersistent) -> %% Must be called inside a emqx_cm_locker transaction. -spec resume(emqx_types:clientinfo(), emqx_types:conninfo(), emqx_session:session()) -> {emqx_session:session(), [emqx_types:deliver()]}. -resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> +resume(ClientInfo, ConnInfo, Session) -> SessionID = emqx_session:info(id, Session), ?tp(ps_resuming, #{from => db, sid => SessionID}), @@ -281,7 +281,6 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> %% 1. Get pending messages from DB. ?tp(ps_initial_pendings, #{sid => SessionID}), Pendings1 = pending(SessionID), - Pendings2 = emqx_session:ignore_local(ClientInfo, Pendings1, ClientID, Session), ?tp(ps_got_initial_pendings, #{ sid => SessionID, msgs => Pendings1 @@ -290,11 +289,11 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> %% 2. Enqueue messages to mimic that the process was alive %% when the messages were delivered. ?tp(ps_persist_pendings, #{sid => SessionID}), - Session1 = emqx_session:enqueue(ClientInfo, Pendings2, Session), + Session1 = emqx_session:enqueue(ClientInfo, Pendings1, Session), Session2 = persist(ClientInfo, ConnInfo, Session1), - mark_as_delivered(SessionID, Pendings2), + mark_as_delivered(SessionID, Pendings1), ?tp(ps_persist_pendings_msgs, #{ - msgs => Pendings2, + msgs => Pendings1, sid => SessionID }), @@ -312,11 +311,10 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> %% 5. Get pending messages from DB until we find all markers. ?tp(ps_marker_pendings, #{sid => SessionID}), MarkerIDs = [Marker || {_, Marker} <- NodeMarkers], - Pendings3 = pending(SessionID, MarkerIDs), - Pendings4 = emqx_session:ignore_local(ClientInfo, Pendings3, ClientID, Session), + Pendings2 = pending(SessionID, MarkerIDs), ?tp(ps_marker_pendings_msgs, #{ sid => SessionID, - msgs => Pendings4 + msgs => Pendings2 }), %% 6. Get pending messages from writers. @@ -329,7 +327,7 @@ resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> %% 7. Drain the inbox and usort the messages %% with the pending messages. (Should be done by caller.) - {Session2, Pendings4 ++ WriterPendings}. + {Session2, Pendings2 ++ WriterPendings}. resume_begin(Nodes, SessionID) -> Res = emqx_persistent_session_proto_v1:resume_begin(Nodes, self(), SessionID), diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index f266dbcfa..3cd0d411f 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -584,7 +584,7 @@ t_handle_deliver(_) -> t_handle_deliver_nl(_) -> ClientInfo = clientinfo(#{clientid => <<"clientid">>}), - Session = session(#{subscriptions => #{<<"t1">> => #{nl => 1}}}), + Session = session(ClientInfo, #{subscriptions => #{<<"t1">> => #{nl => 1}}}), Channel = channel(#{clientinfo => ClientInfo, session => Session}), Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>), NMsg = emqx_message:set_flag(nl, Msg), @@ -1071,11 +1071,14 @@ connpkt(Props) -> password = <<"passwd">> }. -session() -> session(#{}). -session(InitFields) when is_map(InitFields) -> +session() -> session(#{zone => default, clientid => <<"fake-test">>}, #{}). +session(InitFields) -> session(#{zone => default, clientid => <<"fake-test">>}, InitFields). +session(ClientInfo, InitFields) when is_map(InitFields) -> Conf = emqx_cm:get_session_confs( - #{zone => default, clientid => <<"fake-test">>}, #{ - receive_maximum => 0, expiry_interval => 0 + ClientInfo, + #{ + receive_maximum => 0, + expiry_interval => 0 } ), Session = emqx_session:init(Conf), diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index 7d0bc7528..f6ad11167 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -180,30 +180,24 @@ handle_deliver( Delivers, #{ takeover := true, - pendings := Pendings, - session := Session, - clientinfo := #{clientid := ClientId} = ClientInfo + pendings := Pendings } = Channel ) -> %% NOTE: Order is important here. While the takeover is in %% progress, the session cannot enqueue messages, since it already %% passed on the queue to the new connection in the session state. - NPendings = lists:append( - Pendings, - emqx_session:ignore_local(ClientInfo, emqx_channel:maybe_nack(Delivers), ClientId, Session) - ), + NPendings = lists:append(Pendings, emqx_channel:maybe_nack(Delivers)), Channel#{pendings => NPendings}; handle_deliver( Delivers, #{ takeover := false, session := Session, - clientinfo := #{clientid := ClientId} = ClientInfo + clientinfo := ClientInfo } = Channel ) -> Delivers1 = emqx_channel:maybe_nack(Delivers), - Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session), - NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session), + NSession = emqx_session:enqueue(ClientInfo, Delivers1, Session), Channel#{session := NSession}. cancel_expiry_timer(#{expiry_timer := TRef}) when is_reference(TRef) -> @@ -248,12 +242,7 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> Pendings1 = lists:usort(lists:append(Pendings0, emqx_utils:drain_deliver())), NSession = emqx_session:enqueue( ClientInfo, - emqx_session:ignore_local( - ClientInfo, - emqx_channel:maybe_nack(Pendings1), - ClientId, - Session - ), + emqx_channel:maybe_nack(Pendings1), Session ), NChannel = Channel#{session => NSession}, From dd31487b4a9e5a74a98a79393870037b6c4c4865 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 19 Jul 2023 14:12:56 +0200 Subject: [PATCH 2/2] refactor(session): drop `is_session/1` helper as useless This is a part of effort to minimize `emqx_session` module interface to simplify adding alternative session implementations. --- apps/emqx/src/emqx_session.erl | 5 ----- 1 file changed, 5 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index b15fcd2ed..3036887de 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -64,7 +64,6 @@ -export([ info/1, info/2, - is_session/1, stats/1, obtain_next_pkt_id/1, get_mqueue/1 @@ -88,7 +87,6 @@ enqueue/3, dequeue/2, filter_queue/2, - ignore_local/4, retry/2, terminate/3 ]). @@ -252,9 +250,6 @@ unpersist(Session) -> %% Info, Stats %%-------------------------------------------------------------------- -is_session(#session{}) -> true; -is_session(_) -> false. - %% @doc Get infos of the session. -spec info(session()) -> emqx_types:infos(). info(Session) ->