diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5c5fe5b82..4757e8670 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -478,14 +478,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> - Session1 = replay_streams(Session0, ClientInfo), - Session = - case ?IS_REPLAY_ONGOING(Session1) of - true -> - emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1); - false -> - Session1 - end, + Session = replay_streams(Session0, ClientInfo), {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), @@ -533,13 +526,15 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); {error, recoverable, Reason} -> + RetryTimeout = ?TIMEOUT_RETRY_REPLAY, ?SLOG(warning, #{ msg => "failed_to_fetch_replay_batch", stream => Srs0, reason => Reason, - class => recoverable + class => recoverable, + retry_in_ms => RetryTimeout }), - Session0 + emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0) %% TODO: Handle unrecoverable errors. end; replay_streams(Session0 = #{replay := []}, _ClientInfo) ->