fix(sessds): untangle pull and replay retry timers

And restore the convention that timer handler always manages only
its own timers.
This commit is contained in:
Andrew Mayorov 2024-03-05 20:16:52 +01:00
parent b39c710ec2
commit 3f3e33b2cb
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 20 additions and 11 deletions

View File

@ -140,11 +140,14 @@
%% Buffer: %% Buffer:
inflight := emqx_persistent_session_ds_inflight:t(), inflight := emqx_persistent_session_ds_inflight:t(),
%% In-progress replay: %% In-progress replay:
%% List of stream replay states to be added to the inflight buffer.
replay => [{_StreamKey, stream_state()}, ...], replay => [{_StreamKey, stream_state()}, ...],
%% Timers: %% Timers:
timer() => reference() timer() => reference()
}. }.
-define(IS_REPLAY_ONGOING(SESS), is_map_key(replay, SESS)).
-record(req_sync, { -record(req_sync, {
from :: pid(), from :: pid(),
ref :: reference() ref :: reference()
@ -457,12 +460,14 @@ deliver(ClientInfo, Delivers, Session0) ->
-spec handle_timeout(clientinfo(), _Timeout, session()) -> -spec handle_timeout(clientinfo(), _Timeout, session()) ->
{ok, replies(), session()} | {ok, replies(), timeout(), session()}. {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
handle_timeout( handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
ClientInfo, {Publishes, Session1} =
?TIMER_PULL, case ?IS_REPLAY_ONGOING(Session0) of
Session0 false ->
) when not is_map_key(replay, Session0) -> drain_buffer(fetch_new_messages(Session0, ClientInfo));
{Publishes, Session1} = drain_buffer(fetch_new_messages(Session0, ClientInfo)), true ->
{[], Session0}
end,
Timeout = Timeout =
case Publishes of case Publishes of
[] -> [] ->
@ -472,11 +477,15 @@ handle_timeout(
end, end,
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
{ok, Publishes, Session}; {ok, Publishes, Session};
handle_timeout(ClientInfo, ?TIMER_PULL, Session0 = #{replay := [_ | _]}) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo), Session1 = replay_streams(Session0, ClientInfo),
Session =
case ?IS_REPLAY_ONGOING(Session1) of
true ->
emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1);
false ->
Session1
end,
{ok, [], Session}; {ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0), S1 = emqx_persistent_session_ds_subs:gc(S0),
@ -524,7 +533,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo)
Session = #{} -> Session = #{} ->
replay_streams(Session#{replay := Rest}, ClientInfo); replay_streams(Session#{replay := Rest}, ClientInfo);
{error, _, _} -> {error, _, _} ->
emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session0) Session0
end; end;
replay_streams(Session0 = #{replay := []}, _ClientInfo) -> replay_streams(Session0 = #{replay := []}, _ClientInfo) ->
Session = maps:remove(replay, Session0), Session = maps:remove(replay, Session0),