diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 2cbf65b47..3517d6b73 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -123,7 +123,12 @@ -define(TIMER_PULL, timer_pull). -define(TIMER_GET_STREAMS, timer_get_streams). -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). --type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. +-define(TIMER_RETRY_REPLAY, timer_retry_replay). + +-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT | ?TIMER_RETRY_REPLAY. + +%% TODO: Needs configuration? +-define(TIMEOUT_RETRY_REPLAY, 1000). -type session() :: #{ %% Client ID @@ -134,6 +139,8 @@ s := emqx_persistent_session_ds_state:t(), %% Buffer: inflight := emqx_persistent_session_ds_inflight:t(), + %% In-progress replay: + replay => [{_StreamKey, stream_state()}, ...], %% Timers: timer() => reference() }. @@ -454,7 +461,7 @@ handle_timeout( ClientInfo, ?TIMER_PULL, Session0 -) -> +) when not is_map_key(replay, Session0) -> {Publishes, Session1} = drain_buffer(fetch_new_messages(Session0, ClientInfo)), Timeout = case Publishes of @@ -465,6 +472,12 @@ handle_timeout( end, Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; +handle_timeout(ClientInfo, ?TIMER_PULL, Session0 = #{replay := [_ | _]}) -> + Session = replay_streams(Session0, ClientInfo), + {ok, [], Session}; +handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> + Session = replay_streams(Session0, ClientInfo), + {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), @@ -503,30 +516,44 @@ bump_last_alive(S0) -> {ok, replies(), session()}. replay(ClientInfo, [], Session0 = #{s := S0}) -> Streams = emqx_persistent_session_ds_stream_scheduler:find_replay_streams(S0), - Session = lists:foldl( - fun({_StreamKey, Stream}, SessionAcc) -> - replay_batch(Stream, SessionAcc, ClientInfo) - end, - Session0, - Streams - ), + Session = replay_streams(Session0#{replay => Streams}, ClientInfo), + {ok, [], Session}. + +replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) -> + case replay_batch(Srs0, Session0, ClientInfo) of + Session = #{} -> + replay_streams(Session#{replay := Rest}, ClientInfo); + {error, _, _} -> + emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session0) + end; +replay_streams(Session0 = #{replay := []}, _ClientInfo) -> + Session = maps:remove(replay, Session0), %% Note: we filled the buffer with the historical messages, and %% from now on we'll rely on the normal inflight/flow control %% mechanisms to replay them: - {ok, [], pull_now(Session)}. + pull_now(Session). -spec replay_batch(stream_state(), session(), clientinfo()) -> session(). -replay_batch(Srs0, Session, ClientInfo) -> +replay_batch(Srs0, Session0, ClientInfo) -> #srs{batch_size = BatchSize} = Srs0, - %% TODO: retry on errors: - {Srs, Inflight} = enqueue_batch(true, BatchSize, Srs0, Session, ClientInfo), - %% Assert: - Srs =:= Srs0 orelse - ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{ - expected => Srs0, - got => Srs - }), - Session#{inflight => Inflight}. + case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of + {ok, Srs, Session} -> + %% Assert: + Srs =:= Srs0 orelse + ?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{ + expected => Srs0, + got => Srs + }), + Session; + {error, recoverable, Reason} = Error -> + ?SLOG(warning, #{ + msg => "failed_to_fetch_replay_batch", + stream => Srs0, + reason => Reason, + class => recoverable + }), + Error + end. %%-------------------------------------------------------------------- @@ -743,7 +770,7 @@ fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo fetch_new_messages(Streams, Session, ClientInfo) end. -new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> +new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0), SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0), Srs1 = Srs0#srs{ @@ -753,11 +780,30 @@ new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) -> last_seqno_qos1 = SN1, last_seqno_qos2 = SN2 }, - {Srs, Inflight} = enqueue_batch(false, BatchSize, Srs1, Session, ClientInfo), - S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), Srs#srs.last_seqno_qos1, S0), - S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), Srs#srs.last_seqno_qos2, S1), - S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2), - Session#{s => S, inflight => Inflight}. + case enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of + {ok, Srs, Session} -> + S1 = emqx_persistent_session_ds_state:put_seqno( + ?next(?QOS_1), + Srs#srs.last_seqno_qos1, + S0 + ), + S2 = emqx_persistent_session_ds_state:put_seqno( + ?next(?QOS_2), + Srs#srs.last_seqno_qos2, + S1 + ), + S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2), + Session#{s => S}; + {error, Class, Reason} -> + %% TODO: Handle unrecoverable error. + ?SLOG(info, #{ + msg => "failed_to_fetch_batch", + stream => Srs1, + reason => Reason, + class => Class + }), + Session0 + end. enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) -> #srs{ @@ -786,13 +832,13 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli last_seqno_qos1 = LastSeqnoQos1, last_seqno_qos2 = LastSeqnoQos2 }, - {Srs, Inflight}; + {ok, Srs, Session#{inflight := Inflight}}; {ok, end_of_stream} -> %% No new messages; just update the end iterator: - {Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, Inflight0}; - {error, _} when not IsReplay -> - ?SLOG(info, #{msg => "failed_to_fetch_batch", iterator => ItBegin}), - {Srs0, Inflight0} + Srs = Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, + {ok, Srs, Session#{inflight := Inflight0}}; + {error, _, _} = Error -> + Error end. %% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->