refactor(sessds): Store timers in the session

This commit is contained in:
ieQu1 2023-12-01 02:20:12 +01:00
parent 69f1ca43c3
commit 38800c0260
2 changed files with 31 additions and 45 deletions

View File

@ -96,6 +96,12 @@
props := map(), props := map(),
extra := map() extra := map()
}. }.
-define(TIMER_PULL, timer_pull).
-define(TIMER_GET_STREAMS, timer_get_streams).
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT.
-type session() :: #{ -type session() :: #{
%% Client ID %% Client ID
id := id(), id := id(),
@ -111,6 +117,8 @@
receive_maximum := pos_integer(), receive_maximum := pos_integer(),
%% Connection Info %% Connection Info
conninfo := emqx_types:conninfo(), conninfo := emqx_types:conninfo(),
%% Timers
timer() => reference(),
%% %%
props := map() props := map()
}. }.
@ -121,11 +129,6 @@
-type conninfo() :: emqx_session:conninfo(). -type conninfo() :: emqx_session:conninfo().
-type replies() :: emqx_session:replies(). -type replies() :: emqx_session:replies().
-define(TIMER_PULL, timer_pull).
-define(TIMER_GET_STREAMS, timer_get_streams).
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT.
-define(STATS_KEYS, [ -define(STATS_KEYS, [
subscriptions_cnt, subscriptions_cnt,
subscriptions_max, subscriptions_max,
@ -148,8 +151,7 @@
session(). session().
create(#{clientid := ClientID}, ConnInfo, Conf) -> create(#{clientid := ClientID}, ConnInfo, Conf) ->
% TODO: expiration % TODO: expiration
ensure_timers(), ensure_timers(ensure_session(ClientID, ConnInfo, Conf)).
ensure_session(ClientID, ConnInfo, Conf).
-spec open(clientinfo(), conninfo()) -> -spec open(clientinfo(), conninfo()) ->
{_IsPresent :: true, session(), []} | false. {_IsPresent :: true, session(), []} | false.
@ -163,10 +165,9 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
ok = emqx_cm:discard_session(ClientID), ok = emqx_cm:discard_session(ClientID),
case session_open(ClientID, ConnInfo) of case session_open(ClientID, ConnInfo) of
Session0 = #{} -> Session0 = #{} ->
ensure_timers(),
ReceiveMaximum = receive_maximum(ConnInfo), ReceiveMaximum = receive_maximum(ConnInfo),
Session = Session0#{receive_maximum => ReceiveMaximum}, Session = Session0#{receive_maximum => ReceiveMaximum},
{true, Session, []}; {true, ensure_timers(Session), []};
false -> false ->
false false
end. end.
@ -404,7 +405,7 @@ deliver(_ClientInfo, _Delivers, Session) ->
handle_timeout( handle_timeout(
_ClientInfo, _ClientInfo,
?TIMER_PULL, ?TIMER_PULL,
Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum}
) -> ) ->
{Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(
fun fun
@ -426,12 +427,11 @@ handle_timeout(
[_ | _] -> [_ | _] ->
0 0
end, end,
ensure_timer(?TIMER_PULL, Timeout), Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session0#{inflight := Inflight}),
{ok, Publishes, Session#{inflight := Inflight}}; {ok, Publishes, Session};
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) ->
renew_streams(Session), renew_streams(Session),
ensure_timer(?TIMER_GET_STREAMS), {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)};
{ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
%% Note: we take a pessimistic approach here and assume that the client will be alive %% Note: we take a pessimistic approach here and assume that the client will be alive
%% until the next bump timeout. With this, we avoid garbage collecting this session %% until the next bump timeout. With this, we avoid garbage collecting this session
@ -440,8 +440,8 @@ handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
EstimatedLastAliveAt = now_ms() + BumpInterval, EstimatedLastAliveAt = now_ms() + BumpInterval,
Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt),
ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT), BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
{ok, [], Session}. {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}.
-spec replay(clientinfo(), [], session()) -> -spec replay(clientinfo(), [], session()) ->
{ok, replies(), session()}. {ok, replies(), session()}.
@ -961,22 +961,11 @@ export_record(_, _, [], Acc) ->
%% TODO: find a more reliable way to perform actions that have side %% TODO: find a more reliable way to perform actions that have side
%% effects. Add `CBM:init' callback to the session behavior? %% effects. Add `CBM:init' callback to the session behavior?
ensure_timers() -> -spec ensure_timers(session()) -> session().
ensure_timer(?TIMER_PULL), ensure_timers(Session0) ->
ensure_timer(?TIMER_GET_STREAMS), Session1 = emqx_session:ensure_timer(?TIMER_PULL, 100, Session0),
ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT). Session2 = emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session1),
emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, 100, Session2).
-spec ensure_timer(timer()) -> ok.
ensure_timer(bump_last_alive_at = Type) ->
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
ensure_timer(Type, BumpInterval);
ensure_timer(Type) ->
ensure_timer(Type, 100).
-spec ensure_timer(timer(), non_neg_integer()) -> ok.
ensure_timer(Type, Timeout) ->
_ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
ok.
-spec receive_maximum(conninfo()) -> pos_integer(). -spec receive_maximum(conninfo()) -> pos_integer().
receive_maximum(ConnInfo) -> receive_maximum(ConnInfo) ->

View File

@ -111,8 +111,7 @@
reply/0, reply/0,
replies/0, replies/0,
common_timer_name/0, common_timer_name/0,
custom_timer_name/0, custom_timer_name/0
timerset/0
]). ]).
-type session_id() :: _TODO. -type session_id() :: _TODO.
@ -154,8 +153,6 @@
emqx_session_mem:session() emqx_session_mem:session()
| emqx_persistent_session_ds:session(). | emqx_persistent_session_ds:session().
-type timerset() :: #{custom_timer_name() => _TimerRef :: reference()}.
-define(INFO_KEYS, [ -define(INFO_KEYS, [
id, id,
created_at, created_at,
@ -477,19 +474,19 @@ handle_timeout(ClientInfo, Timer, Session) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec ensure_timer(custom_timer_name(), timeout(), timerset()) -> -spec ensure_timer(custom_timer_name(), timeout(), map()) ->
timerset(). map().
ensure_timer(Name, Time, Timers = #{}) when Time > 0 -> ensure_timer(Name, Time, Timers = #{}) when Time >= 0 ->
TRef = emqx_utils:start_timer(Time, {?MODULE, Name}), TRef = emqx_utils:start_timer(Time, {?MODULE, Name}),
Timers#{Name => TRef}. Timers#{Name => TRef}.
-spec reset_timer(custom_timer_name(), timeout(), timerset()) -> -spec reset_timer(custom_timer_name(), timeout(), map()) ->
timerset(). map().
reset_timer(Name, Time, Channel) -> reset_timer(Name, Time, Timers) ->
ensure_timer(Name, Time, cancel_timer(Name, Channel)). ensure_timer(Name, Time, cancel_timer(Name, Timers)).
-spec cancel_timer(custom_timer_name(), timerset()) -> -spec cancel_timer(custom_timer_name(), map()) ->
timerset(). map().
cancel_timer(Name, Timers0) -> cancel_timer(Name, Timers0) ->
case maps:take(Name, Timers0) of case maps:take(Name, Timers0) of
{TRef, Timers} -> {TRef, Timers} ->