From b8e8f7c8e06911d0990e0e6ec90894f1aed2b199 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 10 Jul 2024 22:23:50 +0300 Subject: [PATCH] feat(queue): add pre_renew_streams callback --- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- .../emqx_persistent_session_ds_shared_subs.erl | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 124b1919a..0984f9de8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -624,7 +624,7 @@ handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_ %% `gc` and `renew_streams` methods may drop unsubscribed streams. %% Shared subscription handler must have a chance to see unsubscribed streams %% in the fully replayed state. - {S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0), + {S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0), S2 = emqx_persistent_session_ds_subs:gc(S1), S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2), {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1), diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 506114f35..1b7a1b420 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -46,6 +46,7 @@ on_streams_replay/2, on_info/3, + pre_renew_streams/2, renew_streams/2, to_map/2 ]). @@ -299,6 +300,14 @@ schedule_unsubscribe( SharedSubS0#{scheduled_actions := ScheduledActions1} end. +%%-------------------------------------------------------------------- +%% pre_renew_streams + +-spec pre_renew_streams(emqx_persistent_session_ds_state:t(), t()) -> + {emqx_persistent_session_ds_state:t(), t()}. +pre_renew_streams(S, SharedSubS) -> + on_streams_replay(S, SharedSubS). + %%-------------------------------------------------------------------- %% renew_streams