From d88deb9ceb5805da3895f1547c3c5da65815ee6a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Nov 2023 11:56:35 -0300 Subject: [PATCH] feat(ds): add session timer to bump last alive at timestamp --- apps/emqx/src/emqx_persistent_session_ds.erl | 22 ++++++++++++++++---- apps/emqx/src/emqx_schema.erl | 8 +++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 1429d6e97..97825e728 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -117,6 +117,7 @@ -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). +-type timer() :: pull | get_streams | bump_last_alive_at. -define(STATS_KEYS, [ subscriptions_cnt, @@ -396,6 +397,11 @@ handle_timeout( handle_timeout(_ClientInfo, get_streams, Session) -> renew_streams(Session), ensure_timer(get_streams), + {ok, [], Session}; +handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> + NowMS = now_ms(), + Session = session_set_last_alive_at_trans(Session0, NowMS), + ensure_timer(bump_last_alive_at), {ok, [], Session}. -spec replay(clientinfo(), [], session()) -> @@ -553,7 +559,7 @@ session_open(SessionId, NewConnInfo) -> false -> %% new connection being established Record1 = Record0#session{conninfo = NewConnInfo}, - Record = session_set_last_alive_at(Record1, never), + Record = session_set_last_alive_at(Record1, NowMS), Session = export_session(Record), DSSubs = session_read_subscriptions(SessionId), Subscriptions = export_subscriptions(DSSubs), @@ -592,6 +598,10 @@ session_create(SessionId, ConnInfo, Props) -> ok = mnesia:write(?SESSION_TAB, Session, write), Session. +session_set_last_alive_at_trans(Session, LastAliveAt) -> + #{conninfo := ConnInfo} = Session, + session_set_last_alive_at_trans(Session, ConnInfo, LastAliveAt). + session_set_last_alive_at_trans(Session, NewConnInfo, LastAliveAt) -> #{id := SessionId} = Session, transaction(fun() -> @@ -863,13 +873,17 @@ export_record(_, _, [], Acc) -> %% effects. Add `CBM:init' callback to the session behavior? ensure_timers() -> ensure_timer(pull), - ensure_timer(get_streams). + ensure_timer(get_streams), + ensure_timer(bump_last_alive_at). --spec ensure_timer(pull | get_streams) -> ok. +-spec ensure_timer(timer()) -> ok. +ensure_timer(bump_last_alive_at = Type) -> + BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), + ensure_timer(Type, BumpInterval); ensure_timer(Type) -> ensure_timer(Type, 100). --spec ensure_timer(pull | get_streams, non_neg_integer()) -> ok. +-spec ensure_timer(timer(), non_neg_integer()) -> ok. ensure_timer(Type, Timeout) -> _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), ok. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 5c3f2e72f..e10160e4c 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1781,6 +1781,14 @@ fields("session_persistence") -> desc => ?DESC(session_ds_idle_poll_interval) } )}, + {"last_alive_update_interval", + sc( + timeout_duration(), + #{ + default => <<"5000ms">>, + desc => ?DESC(session_ds_last_alive_update_interval) + } + )}, {"force_persistence", sc( boolean(),