From ede72468827ad3eb1f1086b1d923291add4c156b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 19 Apr 2024 13:39:04 +0200 Subject: [PATCH] fix(sessds): Avoid double-enriching transient messages --- apps/emqx/src/emqx_persistent_session_ds.erl | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 20c382934..4bfefe5b6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -1066,7 +1066,9 @@ process_batch( %% Transient messages %%-------------------------------------------------------------------- -enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) -> +enqueue_transient( + _ClientInfo, Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0} +) -> %% TODO: Such messages won't be retransmitted, should the session %% reconnect before transient messages are acked. %% @@ -1076,16 +1078,6 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos : %% queued messages. Since streams in this DB are exclusive to the %% session, messages from the queue can be dropped as soon as they %% are acked. - case emqx_persistent_session_ds_state:get_subscription(Msg0#message.topic, S) of - #{current_state := CS} -> - #{subopts := SubOpts} = emqx_persistent_session_ds_state:get_subscription_state(CS, S); - undefined -> - SubOpts = undefined - end, - Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS), - lists:foldl(fun do_enqueue_transient/2, Session, Msgs). - -do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) -> case Qos of ?QOS_0 -> S = S0,