feat(queue): add pre_renew_streams callback

This commit is contained in:
Ilya Averyanov 2024-07-10 22:23:50 +03:00
parent a97a0d6400
commit b8e8f7c8e0
2 changed files with 10 additions and 1 deletions

View File

@ -624,7 +624,7 @@ handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_
%% `gc` and `renew_streams` methods may drop unsubscribed streams.
%% Shared subscription handler must have a chance to see unsubscribed streams
%% in the fully replayed state.
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0),
S2 = emqx_persistent_session_ds_subs:gc(S1),
S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),

View File

@ -46,6 +46,7 @@
on_streams_replay/2,
on_info/3,
pre_renew_streams/2,
renew_streams/2,
to_map/2
]).
@ -299,6 +300,14 @@ schedule_unsubscribe(
SharedSubS0#{scheduled_actions := ScheduledActions1}
end.
%%--------------------------------------------------------------------
%% pre_renew_streams
-spec pre_renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
pre_renew_streams(S, SharedSubS) ->
on_streams_replay(S, SharedSubS).
%%--------------------------------------------------------------------
%% renew_streams