From bab526be242490d3cc98f82c2880e8e581b76deb Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 10 Jul 2024 23:05:40 +0300 Subject: [PATCH] feat(queue): self-revoke all shared streams on session open --- .../emqx_persistent_session_ds_shared_subs.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 1b7a1b420..bbaf3fd10 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -118,19 +118,20 @@ new(Opts) -> -spec open(emqx_persistent_session_ds_state:t(), opts()) -> {ok, emqx_persistent_session_ds_state:t(), t()}. -open(S, Opts) -> +open(S0, Opts) -> SharedSubscriptions = fold_shared_subs( fun(#share{} = ShareTopicFilter, Sub, Acc) -> - [{ShareTopicFilter, to_agent_subscription(S, Sub)} | Acc] + [{ShareTopicFilter, to_agent_subscription(S0, Sub)} | Acc] end, [], - S + S0 ), Agent = emqx_persistent_session_ds_shared_subs_agent:open( SharedSubscriptions, agent_opts(Opts) ), SharedSubS = #{agent => Agent, scheduled_actions => #{}}, - {ok, S, SharedSubS}. + S1 = revoke_all_streams(S0), + {ok, S1, SharedSubS}. %%-------------------------------------------------------------------- %% on_subscribe