From 3f3e33b2cb36763100b5ca4a8fa0ef3a9a705f6d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 5 Mar 2024 20:16:52 +0100 Subject: [PATCH] fix(sessds): untangle pull and replay retry timers And restore the convention that timer handler always manages only its own timers. --- apps/emqx/src/emqx_persistent_session_ds.erl | 31 +++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 790e2d477..4c42b2415 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -140,11 +140,14 @@ %% Buffer: inflight := emqx_persistent_session_ds_inflight:t(), %% In-progress replay: + %% List of stream replay states to be added to the inflight buffer. replay => [{_StreamKey, stream_state()}, ...], %% Timers: timer() => reference() }. +-define(IS_REPLAY_ONGOING(SESS), is_map_key(replay, SESS)). + -record(req_sync, { from :: pid(), ref :: reference() @@ -457,12 +460,14 @@ deliver(ClientInfo, Delivers, Session0) -> -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. -handle_timeout( - ClientInfo, - ?TIMER_PULL, - Session0 -) when not is_map_key(replay, Session0) -> - {Publishes, Session1} = drain_buffer(fetch_new_messages(Session0, ClientInfo)), +handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> + {Publishes, Session1} = + case ?IS_REPLAY_ONGOING(Session0) of + false -> + drain_buffer(fetch_new_messages(Session0, ClientInfo)); + true -> + {[], Session0} + end, Timeout = case Publishes of [] -> @@ -472,11 +477,15 @@ handle_timeout( end, Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; -handle_timeout(ClientInfo, ?TIMER_PULL, Session0 = #{replay := [_ | _]}) -> - Session = replay_streams(Session0, ClientInfo), - {ok, [], Session}; handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> - Session = replay_streams(Session0, ClientInfo), + Session1 = replay_streams(Session0, ClientInfo), + Session = + case ?IS_REPLAY_ONGOING(Session1) of + true -> + emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session1); + false -> + Session1 + end, {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), @@ -524,7 +533,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); {error, _, _} -> - emqx_session:ensure_timer(?TIMER_RETRY_REPLAY, ?TIMEOUT_RETRY_REPLAY, Session0) + Session0 end; replay_streams(Session0 = #{replay := []}, _ClientInfo) -> Session = maps:remove(replay, Session0),