From e7e8771277a759731f179680dd38137991a51d8a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 7 Mar 2024 12:45:10 +0100 Subject: [PATCH] fix(sessds): set replay retry timer if initial `replay/3` fails --- apps/emqx/src/emqx_persistent_session_ds.erl | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5c5fe5b82..4757e8670 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -478,14 +478,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> - Session1 = replay_streams(Session0, ClientInfo), - Session = - case ?IS_REPLAY_ONGOING(Session1) of - true -> - emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1); - false -> - Session1 - end, + Session = replay_streams(Session0, ClientInfo), {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), @@ -533,13 +526,15 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); {error, recoverable, Reason} -> + RetryTimeout = ?TIMEOUT_RETRY_REPLAY, ?SLOG(warning, #{ msg => "failed_to_fetch_replay_batch", stream => Srs0, reason => Reason, - class => recoverable + class => recoverable, + retry_in_ms => RetryTimeout }), - Session0 + emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0) %% TODO: Handle unrecoverable errors. end; replay_streams(Session0 = #{replay := []}, _ClientInfo) ->