feat(queue): self-revoke all shared streams on session open

This commit is contained in:
Ilya Averyanov 2024-07-10 23:05:40 +03:00
parent 9307a82004
commit bab526be24
1 changed files with 5 additions and 4 deletions

View File

@ -118,19 +118,20 @@ new(Opts) ->
-spec open(emqx_persistent_session_ds_state:t(), opts()) -> -spec open(emqx_persistent_session_ds_state:t(), opts()) ->
{ok, emqx_persistent_session_ds_state:t(), t()}. {ok, emqx_persistent_session_ds_state:t(), t()}.
open(S, Opts) -> open(S0, Opts) ->
SharedSubscriptions = fold_shared_subs( SharedSubscriptions = fold_shared_subs(
fun(#share{} = ShareTopicFilter, Sub, Acc) -> fun(#share{} = ShareTopicFilter, Sub, Acc) ->
[{ShareTopicFilter, to_agent_subscription(S, Sub)} | Acc] [{ShareTopicFilter, to_agent_subscription(S0, Sub)} | Acc]
end, end,
[], [],
S S0
), ),
Agent = emqx_persistent_session_ds_shared_subs_agent:open( Agent = emqx_persistent_session_ds_shared_subs_agent:open(
SharedSubscriptions, agent_opts(Opts) SharedSubscriptions, agent_opts(Opts)
), ),
SharedSubS = #{agent => Agent, scheduled_actions => #{}}, SharedSubS = #{agent => Agent, scheduled_actions => #{}},
{ok, S, SharedSubS}. S1 = revoke_all_streams(S0),
{ok, S1, SharedSubS}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% on_subscribe %% on_subscribe