From 38800c0260004f5fc600f507e80a23072459bde8 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 1 Dec 2023 02:20:12 +0100 Subject: [PATCH] refactor(sessds): Store timers in the session --- apps/emqx/src/emqx_persistent_session_ds.erl | 53 ++++++++------------ apps/emqx/src/emqx_session.erl | 23 ++++----- 2 files changed, 31 insertions(+), 45 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index fa94e656e..6b94f3d74 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -96,6 +96,12 @@ props := map(), extra := map() }. + +-define(TIMER_PULL, timer_pull). +-define(TIMER_GET_STREAMS, timer_get_streams). +-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). +-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. + -type session() :: #{ %% Client ID id := id(), @@ -111,6 +117,8 @@ receive_maximum := pos_integer(), %% Connection Info conninfo := emqx_types:conninfo(), + %% Timers + timer() => reference(), %% props := map() }. @@ -121,11 +129,6 @@ -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). --define(TIMER_PULL, timer_pull). --define(TIMER_GET_STREAMS, timer_get_streams). --define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). --type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. - -define(STATS_KEYS, [ subscriptions_cnt, subscriptions_max, @@ -148,8 +151,7 @@ session(). create(#{clientid := ClientID}, ConnInfo, Conf) -> % TODO: expiration - ensure_timers(), - ensure_session(ClientID, ConnInfo, Conf). + ensure_timers(ensure_session(ClientID, ConnInfo, Conf)). -spec open(clientinfo(), conninfo()) -> {_IsPresent :: true, session(), []} | false. @@ -163,10 +165,9 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> ok = emqx_cm:discard_session(ClientID), case session_open(ClientID, ConnInfo) of Session0 = #{} -> - ensure_timers(), ReceiveMaximum = receive_maximum(ConnInfo), Session = Session0#{receive_maximum => ReceiveMaximum}, - {true, Session, []}; + {true, ensure_timers(Session), []}; false -> false end. @@ -404,7 +405,7 @@ deliver(_ClientInfo, _Delivers, Session) -> handle_timeout( _ClientInfo, ?TIMER_PULL, - Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} + Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( fun @@ -426,12 +427,11 @@ handle_timeout( [_ | _] -> 0 end, - ensure_timer(?TIMER_PULL, Timeout), - {ok, Publishes, Session#{inflight := Inflight}}; + Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session0#{inflight := Inflight}), + {ok, Publishes, Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> renew_streams(Session), - ensure_timer(?TIMER_GET_STREAMS), - {ok, [], Session}; + {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)}; handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive %% until the next bump timeout. With this, we avoid garbage collecting this session @@ -440,8 +440,8 @@ handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), EstimatedLastAliveAt = now_ms() + BumpInterval, Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), - ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT), - {ok, [], Session}. + BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), + {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}. -spec replay(clientinfo(), [], session()) -> {ok, replies(), session()}. @@ -961,22 +961,11 @@ export_record(_, _, [], Acc) -> %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? -ensure_timers() -> - ensure_timer(?TIMER_PULL), - ensure_timer(?TIMER_GET_STREAMS), - ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT). - --spec ensure_timer(timer()) -> ok. -ensure_timer(bump_last_alive_at = Type) -> - BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), - ensure_timer(Type, BumpInterval); -ensure_timer(Type) -> - ensure_timer(Type, 100). - --spec ensure_timer(timer(), non_neg_integer()) -> ok. -ensure_timer(Type, Timeout) -> - _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), - ok. +-spec ensure_timers(session()) -> session(). +ensure_timers(Session0) -> + Session1 = emqx_session:ensure_timer(?TIMER_PULL, 100, Session0), + Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1), + emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2). -spec receive_maximum(conninfo()) -> pos_integer(). receive_maximum(ConnInfo) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 239919179..bf12c933f 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -111,8 +111,7 @@ reply/0, replies/0, common_timer_name/0, - custom_timer_name/0, - timerset/0 + custom_timer_name/0 ]). -type session_id() :: _TODO. @@ -154,8 +153,6 @@ emqx_session_mem:session() | emqx_persistent_session_ds:session(). --type timerset() :: #{custom_timer_name() => _TimerRef :: reference()}. - -define(INFO_KEYS, [ id, created_at, @@ -477,19 +474,19 @@ handle_timeout(ClientInfo, Timer, Session) -> %%-------------------------------------------------------------------- --spec ensure_timer(custom_timer_name(), timeout(), timerset()) -> - timerset(). -ensure_timer(Name, Time, Timers = #{}) when Time > 0 -> +-spec ensure_timer(custom_timer_name(), timeout(), map()) -> + map(). +ensure_timer(Name, Time, Timers = #{}) when Time >= 0 -> TRef = emqx_utils:start_timer(Time, {?MODULE, Name}), Timers#{Name => TRef}. --spec reset_timer(custom_timer_name(), timeout(), timerset()) -> - timerset(). -reset_timer(Name, Time, Channel) -> - ensure_timer(Name, Time, cancel_timer(Name, Channel)). +-spec reset_timer(custom_timer_name(), timeout(), map()) -> + map(). +reset_timer(Name, Time, Timers) -> + ensure_timer(Name, Time, cancel_timer(Name, Timers)). --spec cancel_timer(custom_timer_name(), timerset()) -> - timerset(). +-spec cancel_timer(custom_timer_name(), map()) -> + map(). cancel_timer(Name, Timers0) -> case maps:take(Name, Timers0) of {TRef, Timers} ->