From 393a1b1391b81d16ef69255579fbaa1758b07719 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 26 Mar 2024 18:23:08 +0100 Subject: [PATCH] wip(sessds): handle recoverable errors in stream scheduler --- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- ...persistent_session_ds_stream_scheduler.erl | 28 ++++++++++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 1c727f6c9..744f6ca3f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -569,7 +569,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) RetryTimeout = ?TIMEOUT_RETRY_REPLAY, ?SLOG(warning, #{ msg => "failed_to_fetch_replay_batch", - stream => Srs0, + iterator => Srs0#srs.it_end, reason => Reason, class => recoverable, retry_in_ms => RetryTimeout diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 286d32ef4..5514aeaa9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -208,16 +208,24 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> ?SLOG(debug, #{ msg => new_stream, key => Key, stream => Stream }), - {ok, Iterator} = emqx_ds:make_iterator( - ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime - ), - NewStreamState = #srs{ - rank_x = RankX, - rank_y = RankY, - it_begin = Iterator, - it_end = Iterator - }, - emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of + {ok, Iterator} -> + NewStreamState = #srs{ + rank_x = RankX, + rank_y = RankY, + it_begin = Iterator, + it_end = Iterator + }, + emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + {error, recoverable, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_initialize_stream_iterator", + stream => {Key, Stream}, + class => recoverable, + reason => Reason + }), + S + end; #srs{} -> S end.