diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index c1ed6aabd..83ed5d465 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -571,7 +571,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> Session = replay_streams(Session0#{replay => Streams}, ClientInfo), {ok, [], Session}. -replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) -> +replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) -> case replay_batch(Srs0, Session0, ClientInfo) of Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); @@ -579,7 +579,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) RetryTimeout = ?TIMEOUT_RETRY_REPLAY, ?SLOG(warning, #{ msg => "failed_to_fetch_replay_batch", - stream => Srs0, + stream => StreamKey, reason => Reason, class => recoverable, retry_in_ms => RetryTimeout @@ -867,7 +867,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> %% TODO: Handle unrecoverable error. ?SLOG(info, #{ msg => "failed_to_fetch_batch", - stream => Srs1, + stream => StreamKey, reason => Reason, class => Class }), diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 286d32ef4..154f59b44 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -208,16 +208,24 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> ?SLOG(debug, #{ msg => new_stream, key => Key, stream => Stream }), - {ok, Iterator} = emqx_ds:make_iterator( - ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime - ), - NewStreamState = #srs{ - rank_x = RankX, - rank_y = RankY, - it_begin = Iterator, - it_end = Iterator - }, - emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of + {ok, Iterator} -> + NewStreamState = #srs{ + rank_x = RankX, + rank_y = RankY, + it_begin = Iterator, + it_end = Iterator + }, + emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + {error, recoverable, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_initialize_stream_iterator", + stream => Stream, + class => recoverable, + reason => Reason + }), + S + end; #srs{} -> S end.