From b1f144ab8b24a6500eacdd4dcfac6515d28ded0f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 25 Sep 2023 18:19:26 +0300 Subject: [PATCH] feat(session): add custom session timers mechanism That are managed exclusively by the session implementation, unlike common session timers that are managed by the channel itself. --- apps/emqx/src/emqx_channel.erl | 11 +++++ apps/emqx/src/emqx_session.erl | 43 ++++++++++++++++++- .../src/emqx_mqttsn_channel.erl | 3 ++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8669aea8e..cecaa780e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1338,6 +1338,17 @@ handle_timeout( NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, reset_timer(TimerName, Timeout, NChannel)) end; +handle_timeout( + _TRef, + {emqx_session, TimerName}, + Channel = #channel{session = Session, clientinfo = ClientInfo} +) -> + case emqx_session:handle_timeout(ClientInfo, TimerName, Session) of + {ok, [], NSession} -> + {ok, Channel#channel{session = NSession}}; + {ok, Replies, NSession} -> + handle_out(publish, Replies, Channel#channel{session = NSession}) + end; handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); handle_timeout( diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 092c4483a..f5bad6e8b 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -88,6 +88,13 @@ terminate/3 ]). +% Timers +-export([ + ensure_timer/3, + reset_timer/3, + cancel_timer/2 +]). + % Foreign session implementations -export([enrich_delivers/3]). @@ -103,7 +110,9 @@ conninfo/0, reply/0, replies/0, - common_timer_name/0 + common_timer_name/0, + custom_timer_name/0, + timerset/0 ]). -type session_id() :: _TODO. @@ -118,6 +127,7 @@ }. -type common_timer_name() :: retry_delivery | expire_awaiting_rel. +-type custom_timer_name() :: atom(). -type message() :: emqx_types:message(). -type publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}. @@ -144,6 +154,8 @@ emqx_session_mem:session() | emqx_persistent_session_ds:session(). +-type timerset() :: #{custom_timer_name() => _TimerRef :: reference()}. + -define(INFO_KEYS, [ id, created_at, @@ -442,14 +454,41 @@ enrich_subopts(_Opt, _V, Msg, _) -> %% Timeouts %%-------------------------------------------------------------------- --spec handle_timeout(clientinfo(), common_timer_name(), t()) -> +-spec handle_timeout(clientinfo(), common_timer_name() | custom_timer_name(), t()) -> {ok, replies(), t()} + %% NOTE: only relevant for `common_timer_name()` | {ok, replies(), timeout(), t()}. handle_timeout(ClientInfo, Timer, Session) -> ?IMPL(Session):handle_timeout(ClientInfo, Timer, Session). %%-------------------------------------------------------------------- +-spec ensure_timer(custom_timer_name(), timeout(), timerset()) -> + timerset(). +ensure_timer(Name, _Time, Timers = #{}) when is_map_key(Name, Timers) -> + Timers; +ensure_timer(Name, Time, Timers = #{}) when Time > 0 -> + TRef = emqx_utils:start_timer(Time, {?MODULE, Name}), + Timers#{Name => TRef}. + +-spec reset_timer(custom_timer_name(), timeout(), timerset()) -> + timerset(). +reset_timer(Name, Time, Channel) -> + ensure_timer(Name, Time, cancel_timer(Name, Channel)). + +-spec cancel_timer(custom_timer_name(), timerset()) -> + timerset(). +cancel_timer(Name, Timers) -> + case maps:take(Name, Timers) of + {TRef, NTimers} -> + ok = emqx_utils:cancel_timer(TRef), + NTimers; + error -> + Timers + end. + +%%-------------------------------------------------------------------- + -spec disconnect(clientinfo(), t()) -> {idle | shutdown, t()}. disconnect(_ClientInfo, Session) -> diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 087187379..e7061e4a5 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -2116,6 +2116,9 @@ handle_timeout(_TRef, expire_session, Channel) -> handle_timeout(_TRef, expire_asleep, Channel) -> shutdown(asleep_timeout, Channel); handle_timeout(_TRef, Msg, Channel) -> + %% NOTE + %% We do not expect `emqx_mqttsn_session` to set up any custom timers (i.e with + %% `emqx_session:ensure_timer/3`), because `emqx_session_mem` doesn't use any. ?SLOG(error, #{ msg => "unexpected_timeout", timeout_msg => Msg