wip(sessds): handle recoverable errors in stream scheduler

This commit is contained in:
Andrew Mayorov 2024-03-26 18:23:08 +01:00
parent c6085a6ab0
commit 393a1b1391
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 19 additions and 11 deletions

View File

@ -569,7 +569,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo)
RetryTimeout = ?TIMEOUT_RETRY_REPLAY, RetryTimeout = ?TIMEOUT_RETRY_REPLAY,
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "failed_to_fetch_replay_batch", msg => "failed_to_fetch_replay_batch",
stream => Srs0, iterator => Srs0#srs.it_end,
reason => Reason, reason => Reason,
class => recoverable, class => recoverable,
retry_in_ms => RetryTimeout retry_in_ms => RetryTimeout

View File

@ -208,16 +208,24 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => new_stream, key => Key, stream => Stream msg => new_stream, key => Key, stream => Stream
}), }),
{ok, Iterator} = emqx_ds:make_iterator( case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime {ok, Iterator} ->
), NewStreamState = #srs{
NewStreamState = #srs{ rank_x = RankX,
rank_x = RankX, rank_y = RankY,
rank_y = RankY, it_begin = Iterator,
it_begin = Iterator, it_end = Iterator
it_end = Iterator },
}, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); {error, recoverable, Reason} ->
?SLOG(warning, #{
msg => "failed_to_initialize_stream_iterator",
stream => {Key, Stream},
class => recoverable,
reason => Reason
}),
S
end;
#srs{} -> #srs{} ->
S S
end. end.