refactor(sessds): Introduce macros for the timers

This commit is contained in:
ieQu1 2023-11-30 23:23:55 +01:00
parent 0d245acdc1
commit d9e7544070
1 changed files with 14 additions and 10 deletions

View File

@ -120,7 +120,11 @@
-type clientinfo() :: emqx_types:clientinfo(). -type clientinfo() :: emqx_types:clientinfo().
-type conninfo() :: emqx_session:conninfo(). -type conninfo() :: emqx_session:conninfo().
-type replies() :: emqx_session:replies(). -type replies() :: emqx_session:replies().
-type timer() :: pull | get_streams | bump_last_alive_at.
-define(TIMER_PULL, timer_pull).
-define(TIMER_GET_STREAMS, timer_get_streams).
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT.
-define(STATS_KEYS, [ -define(STATS_KEYS, [
subscriptions_cnt, subscriptions_cnt,
@ -399,7 +403,7 @@ deliver(_ClientInfo, _Delivers, Session) ->
{ok, replies(), session()} | {ok, replies(), timeout(), session()}. {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
handle_timeout( handle_timeout(
_ClientInfo, _ClientInfo,
pull, ?TIMER_PULL,
Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} Session = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum}
) -> ) ->
{Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(
@ -422,13 +426,13 @@ handle_timeout(
[_ | _] -> [_ | _] ->
0 0
end, end,
ensure_timer(pull, Timeout), ensure_timer(?TIMER_PULL, Timeout),
{ok, Publishes, Session#{inflight := Inflight}}; {ok, Publishes, Session#{inflight := Inflight}};
handle_timeout(_ClientInfo, get_streams, Session) -> handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) ->
renew_streams(Session), renew_streams(Session),
ensure_timer(get_streams), ensure_timer(?TIMER_GET_STREAMS),
{ok, [], Session}; {ok, [], Session};
handle_timeout(_ClientInfo, bump_last_alive_at, Session0) -> handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
%% Note: we take a pessimistic approach here and assume that the client will be alive %% Note: we take a pessimistic approach here and assume that the client will be alive
%% until the next bump timeout. With this, we avoid garbage collecting this session %% until the next bump timeout. With this, we avoid garbage collecting this session
%% too early in case the session/connection/node crashes earlier without having time %% too early in case the session/connection/node crashes earlier without having time
@ -436,7 +440,7 @@ handle_timeout(_ClientInfo, bump_last_alive_at, Session0) ->
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
EstimatedLastAliveAt = now_ms() + BumpInterval, EstimatedLastAliveAt = now_ms() + BumpInterval,
Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt),
ensure_timer(bump_last_alive_at), ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT),
{ok, [], Session}. {ok, [], Session}.
-spec replay(clientinfo(), [], session()) -> -spec replay(clientinfo(), [], session()) ->
@ -958,9 +962,9 @@ export_record(_, _, [], Acc) ->
%% TODO: find a more reliable way to perform actions that have side %% TODO: find a more reliable way to perform actions that have side
%% effects. Add `CBM:init' callback to the session behavior? %% effects. Add `CBM:init' callback to the session behavior?
ensure_timers() -> ensure_timers() ->
ensure_timer(pull), ensure_timer(?TIMER_PULL),
ensure_timer(get_streams), ensure_timer(?TIMER_GET_STREAMS),
ensure_timer(bump_last_alive_at). ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT).
-spec ensure_timer(timer()) -> ok. -spec ensure_timer(timer()) -> ok.
ensure_timer(bump_last_alive_at = Type) -> ensure_timer(bump_last_alive_at = Type) ->