feat(sessds): handle recoverable errors during replay

This commit is contained in:
Andrew Mayorov 2024-03-01 19:11:40 +01:00
parent 2146d9e1fe
commit 1cf672e78d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed file with 77 additions and 31 deletions

View File

@ -123,7 +123,12 @@
%% Names of the timers a session may have armed. Each name is also an
%% optional key of the session() map, holding the timer reference.
-define(TIMER_PULL, timer_pull).
-define(TIMER_GET_STREAMS, timer_get_streams).
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT.
%% Timer armed by replay_streams/2 when a replay batch fails; its timeout
%% handler resumes the replay.
-define(TIMER_RETRY_REPLAY, timer_retry_replay).
-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT | ?TIMER_RETRY_REPLAY.
%% Delay before a failed replay is retried, as passed to
%% emqx_session:ensure_timer/3 (presumably milliseconds -- TODO confirm).
%% TODO: Needs configuration?
-define(TIMEOUT_RETRY_REPLAY, 1000).
-type session() :: #{
%% Client ID
@ -134,6 +139,8 @@
s := emqx_persistent_session_ds_state:t(),
%% Buffer:
inflight := emqx_persistent_session_ds_inflight:t(),
%% In-progress replay: streams that still have to be replayed, next one
%% first. The key is optional: replay/3 sets it and replay_streams/2
%% removes it once the list has been exhausted.
replay => [{_StreamKey, stream_state()}, ...],
%% Timers:
timer() => reference()
}.
@ -454,7 +461,7 @@ handle_timeout(
ClientInfo,
?TIMER_PULL,
Session0
) ->
) when not is_map_key(replay, Session0) ->
{Publishes, Session1} = drain_buffer(fetch_new_messages(Session0, ClientInfo)),
Timeout =
case Publishes of
@ -465,6 +472,12 @@ handle_timeout(
end,
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
{ok, Publishes, Session};
%% Pull timer fired while a replay is still in progress (the `replay` key
%% holds a non-empty list): continue the replay instead of fetching new
%% messages. No publishes are returned from this clause.
handle_timeout(ClientInfo, ?TIMER_PULL, Session0 = #{replay := [_ | _]}) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
%% Retry timer fired: resume the replay from the session's `replay` list.
%% NOTE(review): replay_streams/2 only has clauses for sessions that carry
%% the `replay` key; confirm this timer cannot fire after the replay has
%% already completed, otherwise this call fails with function_clause.
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0),
S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
@ -503,30 +516,44 @@ bump_last_alive(S0) ->
{ok, replies(), session()}.
%% Entry point of the replay: looks up the streams reported by
%% emqx_persistent_session_ds_stream_scheduler:find_replay_streams/1, stores
%% them in the session under the `replay` key and hands the session to
%% replay_streams/2. Only an empty list is accepted as the second argument,
%% and the returned list of replies is always empty.
replay(ClientInfo, [], Session0 = #{s := S0}) ->
Streams = emqx_persistent_session_ds_stream_scheduler:find_replay_streams(S0),
Session = lists:foldl(
fun({_StreamKey, Stream}, SessionAcc) ->
replay_batch(Stream, SessionAcc, ClientInfo)
end,
Session0,
Streams
),
Session = replay_streams(Session0#{replay => Streams}, ClientInfo),
{ok, [], Session}.
%% Works through the streams queued under the session's `replay` key.
%%
%% For the stream at the head of the list a batch is replayed with
%% replay_batch/3:
%%  * on success the stream is dropped from the list and processing
%%    continues with the remaining streams;
%%  * on `{error, _, _}` the session as it was before the failed batch
%%    (failed stream still at the head of the list) is returned with
%%    ?TIMER_RETRY_REPLAY armed, so a retry starts from the same stream.
%%
%% When the list is empty the `replay` key is removed from the session and
%% pull_now/1 is applied to the result.
replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) ->
case replay_batch(Srs0, Session0, ClientInfo) of
Session = #{} ->
replay_streams(Session#{replay := Rest}, ClientInfo);
{error, _, _} ->
emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session0)
end;
replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
Session = maps:remove(replay, Session0),
%% Note: we filled the buffer with the historical messages, and
%% from now on we'll rely on the normal inflight/flow control
%% mechanisms to replay them:
{ok, [], pull_now(Session)}.
pull_now(Session).
%% Re-enqueues one previously recorded batch of the given stream by calling
%% enqueue_batch/5 in replay mode with the batch size stored in the stream
%% state. On success the updated session is returned; a trace point with
%% level `warning` is emitted if the resulting stream state differs from the
%% recorded one. A recoverable error is logged and returned to the caller
%% as-is.
%%
%% NOTE(review): the -spec below still promises a bare session(), while the
%% error branch returns `{error, recoverable, Reason}`; the spec should be
%% widened to cover that result.
%% NOTE(review): only the `recoverable` class is matched here; any other
%% `{error, Class, Reason}` coming from enqueue_batch/5 ends in case_clause.
%% Confirm that crashing on other classes is intended.
-spec replay_batch(stream_state(), session(), clientinfo()) -> session().
replay_batch(Srs0, Session, ClientInfo) ->
replay_batch(Srs0, Session0, ClientInfo) ->
#srs{batch_size = BatchSize} = Srs0,
%% TODO: retry on errors:
{Srs, Inflight} = enqueue_batch(true, BatchSize, Srs0, Session, ClientInfo),
%% Assert:
Srs =:= Srs0 orelse
?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{
expected => Srs0,
got => Srs
}),
Session#{inflight => Inflight}.
case enqueue_batch(true, BatchSize, Srs0, Session0, ClientInfo) of
{ok, Srs, Session} ->
%% Assert:
Srs =:= Srs0 orelse
?tp(warning, emqx_persistent_session_ds_replay_inconsistency, #{
expected => Srs0,
got => Srs
}),
Session;
{error, recoverable, Reason} = Error ->
?SLOG(warning, #{
msg => "failed_to_fetch_replay_batch",
stream => Srs0,
reason => Reason,
class => recoverable
}),
Error
end.
%%--------------------------------------------------------------------
@ -743,7 +770,7 @@ fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo
fetch_new_messages(Streams, Session, ClientInfo)
end.
%% Fetches a new (non-replay) batch for the stream identified by StreamKey.
%%
%% The current "next" sequence numbers for QoS 1 and QoS 2 are read from the
%% session state and recorded in the stream state before enqueue_batch/5 is
%% called. On success the sequence numbers reached by the batch and the
%% updated stream state are written back into `s`. On error the failure is
%% logged at `info` level and the session is returned exactly as it was
%% passed in.
new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) ->
new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
SN1 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S0),
SN2 = emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S0),
Srs1 = Srs0#srs{
@ -753,11 +780,30 @@ new_batch({StreamKey, Srs0}, BatchSize, Session = #{s := S0}, ClientInfo) ->
last_seqno_qos1 = SN1,
last_seqno_qos2 = SN2
},
{Srs, Inflight} = enqueue_batch(false, BatchSize, Srs1, Session, ClientInfo),
S1 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_1), Srs#srs.last_seqno_qos1, S0),
S2 = emqx_persistent_session_ds_state:put_seqno(?next(?QOS_2), Srs#srs.last_seqno_qos2, S1),
S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
Session#{s => S, inflight => Inflight}.
case enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
{ok, Srs, Session} ->
%% Session already carries the updated inflight buffer; only the
%% persistent state `s` still has to be brought up to date.
S1 = emqx_persistent_session_ds_state:put_seqno(
?next(?QOS_1),
Srs#srs.last_seqno_qos1,
S0
),
S2 = emqx_persistent_session_ds_state:put_seqno(
?next(?QOS_2),
Srs#srs.last_seqno_qos2,
S1
),
S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S2),
Session#{s => S};
{error, Class, Reason} ->
%% Both error classes are handled the same way here: log and keep
%% the session untouched.
%% TODO: Handle unrecoverable error.
?SLOG(info, #{
msg => "failed_to_fetch_batch",
stream => Srs1,
reason => Reason,
class => Class
}),
Session0
end.
%% Enqueues a batch of messages of the given stream into the session's
%% inflight buffer (the middle of this function is not shown here).
%%
%% Visible results:
%%  * `{ok, Srs, Session}` -- updated stream state together with the session
%%    whose `inflight` key has been updated;
%%  * `{error, _, _}` -- passed through to the caller unchanged.
enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) ->
#srs{
@ -786,13 +832,13 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli
last_seqno_qos1 = LastSeqnoQos1,
last_seqno_qos2 = LastSeqnoQos2
},
{Srs, Inflight};
{ok, Srs, Session#{inflight := Inflight}};
{ok, end_of_stream} ->
%% No new messages; just update the end iterator:
{Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0}, Inflight0};
{error, _} when not IsReplay ->
?SLOG(info, #{msg => "failed_to_fetch_batch", iterator => ItBegin}),
{Srs0, Inflight0}
Srs = Srs0#srs{it_begin = ItBegin, it_end = end_of_stream, batch_size = 0},
{ok, Srs, Session#{inflight := Inflight0}};
{error, _, _} = Error ->
Error
end.
%% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->