fix(sessds): Avoid double-enriching transient messages

This commit is contained in:
ieQu1 2024-04-19 13:39:04 +02:00
parent f1e6565ddd
commit ede7246882
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 3 additions and 11 deletions

View File

@ -1066,7 +1066,9 @@ process_batch(
%% Transient messages
%%--------------------------------------------------------------------
enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) ->
enqueue_transient(
_ClientInfo, Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}
) ->
%% TODO: Such messages won't be retransmitted, should the session
%% reconnect before transient messages are acked.
%%
@ -1076,16 +1078,6 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos :
%% queued messages. Since streams in this DB are exclusive to the
%% session, messages from the queue can be dropped as soon as they
%% are acked.
case emqx_persistent_session_ds_state:get_subscription(Msg0#message.topic, S) of
#{current_state := CS} ->
#{subopts := SubOpts} = emqx_persistent_session_ds_state:get_subscription_state(CS, S);
undefined ->
SubOpts = undefined
end,
Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS),
lists:foldl(fun do_enqueue_transient/2, Session, Msgs).
do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) ->
case Qos of
?QOS_0 ->
S = S0,