fix(sessds): set replay retry timer if initial `replay/3` fails

This commit is contained in:
Andrew Mayorov 2024-03-07 12:45:10 +01:00
parent 09905d78cd
commit e7e8771277
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 5 additions and 10 deletions

View File

@ -478,14 +478,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
{ok, Publishes, Session};
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session1 = replay_streams(Session0, ClientInfo),
Session =
case ?IS_REPLAY_ONGOING(Session1) of
true ->
emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1);
false ->
Session1
end,
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0),
@ -533,13 +526,15 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo)
Session = #{} ->
replay_streams(Session#{replay := Rest}, ClientInfo);
{error, recoverable, Reason} ->
RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
?SLOG(warning, #{
msg => "failed_to_fetch_replay_batch",
stream => Srs0,
reason => Reason,
class => recoverable
class => recoverable,
retry_in_ms => RetryTimeout
}),
Session0
emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0)
%% TODO: Handle unrecoverable errors.
end;
replay_streams(Session0 = #{replay := []}, _ClientInfo) ->