From 07774ab060795c921cdb3797073349a340755fef Mon Sep 17 00:00:00 2001
From: ieQu1 <99872536+ieQu1@users.noreply.github.com>
Date: Tue, 21 May 2024 21:52:19 +0200
Subject: [PATCH] feat(sessds): Handle unrecoverable errors

---
 apps/emqx/src/emqx_persistent_session_ds.erl | 48 +++++++++++++++++---
 1 file changed, 42 insertions(+), 6 deletions(-)

diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl
index 923e17aaf..4bc6b4183 100644
--- a/apps/emqx/src/emqx_persistent_session_ds.erl
+++ b/apps/emqx/src/emqx_persistent_session_ds.erl
@@ -629,8 +629,10 @@ replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) -
                 class => recoverable,
                 retry_in_ms => RetryTimeout
             }),
-            emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0)
-            %% TODO: Handle unrecoverable errors.
+            emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, RetryTimeout, Session0);
+        {error, unrecoverable, Reason} ->
+            Session1 = skip_batch(StreamKey, Srs0, Session0, ClientInfo, Reason),
+            replay_streams(Session1#{replay := Rest}, ClientInfo)
     end;
 replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
     Session = maps:remove(replay, Session0),
@@ -655,6 +657,39 @@ replay_batch(Srs0, Session0, ClientInfo) ->
             Error
     end.
 
+%% Handle `{error, unrecoverable, _}' returned by `enqueue_batch': log it,
+%% report every QoS1/QoS2 seqno in the SRS range as an `expired' message, and
+%% mark the stream as ended. Presumably the generation holding the data is gone.
+-spec skip_batch(_StreamKey, stream_state(), session(), clientinfo(), _Reason) -> session().
+skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
+    ?SLOG(info, #{
+        msg => "session_ds_replay_unrecoverable_error",
+        reason => Reason,
+        srs => SRS0
+    }),
+    GenEvents = fun
+        F(QoS, SeqNo, LastSeqNo) when SeqNo < LastSeqNo ->
+            FakeMsg = #message{
+                id = <<>>,
+                qos = QoS,
+                payload = <<>>,
+                topic = <<>>,
+                timestamp = 0
+            },
+            _ = emqx_session_events:handle_event(ClientInfo, {expired, FakeMsg}),
+            F(QoS, inc_seqno(QoS, SeqNo), LastSeqNo);
+        F(_, _, _) ->
+            ok
+    end,
+    %% Treat messages as expired: one placeholder event per seqno in [first, last).
+    GenEvents(?QOS_1, SRS0#srs.first_seqno_qos1, SRS0#srs.last_seqno_qos1),
+    GenEvents(?QOS_2, SRS0#srs.first_seqno_qos2, SRS0#srs.last_seqno_qos2),
+    SRS = SRS0#srs{it_end = end_of_stream, batch_size = 0},
+    %% That's it for the iterator. Mark the SRS as having reached
+    %% `end_of_stream', and let the stream scheduler do the rest:
+    S = emqx_persistent_session_ds_state:put_stream(StreamKey, SRS, S0),
+    Session#{s := S}.
+
 %%--------------------------------------------------------------------
 
 -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
@@ -923,15 +958,16 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
             ),
             S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
             Session#{s => S};
-        {error, Class, Reason} ->
-            %% TODO: Handle unrecoverable error.
+        {error, recoverable, Reason} ->
             ?SLOG(debug, #{
                 msg => "failed_to_fetch_batch",
                 stream => StreamKey,
                 reason => Reason,
-                class => Class
+                class => recoverable
             }),
-            Session0
+            Session0;
+        {error, unrecoverable, Reason} ->
+            skip_batch(StreamKey, Srs1, Session0, ClientInfo, Reason)
     end.
 
 enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->