From d9e7544070eb9f57cf948d73a997e2693dce574f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 30 Nov 2023 23:23:55 +0100 Subject: [PATCH] refactor(sessds): Introduce macros for the timers --- apps/emqx/src/emqx_persistent_session_ds.erl | 24 ++++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 9844e6d48..fa94e656e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -120,7 +120,11 @@ -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). --type timer() :: pull | get_streams | bump_last_alive_at. + +-define(TIMER_PULL, timer_pull). +-define(TIMER_GET_STREAMS, timer_get_streams). +-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). +-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. -define(STATS_KEYS, [ subscriptions_cnt, @@ -399,7 +403,7 @@ deliver(_ClientInfo, _Delivers, Session) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout( _ClientInfo, - pull, + ?TIMER_PULL, Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( @@ -422,13 +426,13 @@ handle_timeout( [_ | _] -> 0 end, - ensure_timer(pull, Timeout), + ensure_timer(?TIMER_PULL, Timeout), {ok, Publishes, Session#{inflight := Inflight}}; -handle_timeout(_ClientInfo, get_streams, Session) -> +handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> renew_streams(Session), - ensure_timer(get_streams), + ensure_timer(?TIMER_GET_STREAMS), {ok, [], Session}; -handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> +handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive %% until the next bump timeout. With this, we avoid garbage collecting this session %% too early in case the session/connection/node crashes earlier without having time @@ -436,7 +440,7 @@ handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), EstimatedLastAliveAt = now_ms() + BumpInterval, Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), - ensure_timer(bump_last_alive_at), + ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT), {ok, [], Session}. -spec replay(clientinfo(), [], session()) -> @@ -958,9 +962,9 @@ export_record(_, _, [], Acc) -> %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? ensure_timers() -> - ensure_timer(pull), - ensure_timer(get_streams), - ensure_timer(bump_last_alive_at). + ensure_timer(?TIMER_PULL), + ensure_timer(?TIMER_GET_STREAMS), + ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT). -spec ensure_timer(timer()) -> ok. ensure_timer(bump_last_alive_at = Type) ->