refactor(mqttsn): make timer names equal to messages they send

Because keeping timer names different from the messages they send
complicates understanding of the control flow, and spends few
reductions per timer operation unnecessarily.
This commit is contained in:
Andrew Mayorov 2023-09-08 12:48:06 +04:00
parent e4866adc2f
commit 57ae5b14f1
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 27 additions and 38 deletions

View File

@ -104,15 +104,6 @@
-type replies() :: reply() | [reply()]. -type replies() :: reply() | [reply()].
-define(TIMER_TABLE, #{
alive_timer => keepalive,
retry_timer => retry_delivery,
await_timer => expire_awaiting_rel,
expire_timer => expire_session,
asleep_timer => expire_asleep,
register_timer => retry_register
}).
-define(DEFAULT_OVERRIDE, #{ -define(DEFAULT_OVERRIDE, #{
clientid => <<"${ConnInfo.clientid}">> clientid => <<"${ConnInfo.clientid}">>
%, username => <<"${ConnInfo.clientid}">> %, username => <<"${ConnInfo.clientid}">>
@ -431,7 +422,7 @@ ensure_keepalive_timer(0, Channel) ->
Channel; Channel;
ensure_keepalive_timer(Interval, Channel) -> ensure_keepalive_timer(Interval, Channel) ->
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))), Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle incoming packet %% Handle incoming packet
@ -669,7 +660,7 @@ handle_in(
topic_name => TopicName topic_name => TopicName
}), }),
NChannel = cancel_timer( NChannel = cancel_timer(
register_timer, retry_register,
Channel#channel{register_inflight = undefined} Channel#channel{register_inflight = undefined}
), ),
send_next_register_or_replay_publish(TopicName, NChannel); send_next_register_or_replay_publish(TopicName, NChannel);
@ -692,7 +683,7 @@ handle_in(
topic_name => TopicName topic_name => TopicName
}), }),
NChannel = cancel_timer( NChannel = cancel_timer(
register_timer, retry_register,
Channel#channel{register_inflight = undefined} Channel#channel{register_inflight = undefined}
), ),
send_next_register_or_replay_publish(TopicName, NChannel) send_next_register_or_replay_publish(TopicName, NChannel)
@ -1165,7 +1156,7 @@ do_publish(
case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of
{ok, _PubRes, NSession} -> {ok, _PubRes, NSession} ->
NChannel1 = ensure_timer( NChannel1 = ensure_timer(
await_timer, expire_awaiting_rel,
Channel#channel{session = NSession} Channel#channel{session = NSession}
), ),
handle_out(pubrec, MsgId, NChannel1); handle_out(pubrec, MsgId, NChannel1);
@ -1447,7 +1438,7 @@ awake(
{ok, More, Session2} -> {ok, More, Session2} ->
{lists:append(Publishes, More), Session2} {lists:append(Publishes, More), Session2}
end, end,
Channel1 = cancel_timer(asleep_timer, Channel), Channel1 = cancel_timer(expire_asleep, Channel),
{Replies0, NChannel0} = outgoing_deliver_and_register( {Replies0, NChannel0} = outgoing_deliver_and_register(
do_deliver( do_deliver(
NPublishes, NPublishes,
@ -1499,7 +1490,7 @@ asleep(Duration, Channel = #channel{conn_state = asleep}) ->
msg => "update_asleep_timer", msg => "update_asleep_timer",
new_duration => Duration new_duration => Duration
}), }),
ensure_asleep_timer(Duration, cancel_timer(asleep_timer, Channel)); ensure_asleep_timer(Duration, cancel_timer(expire_asleep, Channel));
asleep(Duration, Channel = #channel{conn_state = connected}) -> asleep(Duration, Channel = #channel{conn_state = connected}) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "goto_asleep_state", msg => "goto_asleep_state",
@ -1907,7 +1898,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
?UINT_MAX -> ?UINT_MAX ->
{ok, Channel}; {ok, Channel};
I when I > 0 -> I when I > 0 ->
{ok, ensure_timer(expire_timer, I, Channel)}; {ok, ensure_timer(expire_session, I, Channel)};
_ -> _ ->
shutdown(Reason, Channel) shutdown(Reason, Channel)
end. end.
@ -2007,7 +1998,7 @@ handle_deliver(
handle_out( handle_out(
publish, publish,
Publishes, Publishes,
ensure_timer(retry_timer, NChannel) ensure_timer(retry_delivery, NChannel)
); );
{ok, NSession} -> {ok, NSession} ->
{ok, Channel#channel{session = NSession}} {ok, Channel#channel{session = NSession}}
@ -2068,13 +2059,13 @@ handle_timeout(
case emqx_keepalive:check(StatVal, Keepalive) of case emqx_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} -> {ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive}, NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)}; {ok, reset_timer(keepalive, NChannel)};
{error, timeout} -> {error, timeout} ->
handle_out(disconnect, ?SN_RC2_KEEPALIVE_TIMEOUT, Channel) handle_out(disconnect, ?SN_RC2_KEEPALIVE_TIMEOUT, Channel)
end; end;
handle_timeout( handle_timeout(
_TRef, _TRef,
retry_delivery, _Name = retry_delivery,
Channel = #channel{conn_state = disconnected} Channel = #channel{conn_state = disconnected}
) -> ) ->
{ok, Channel}; {ok, Channel};
@ -2083,42 +2074,42 @@ handle_timeout(
retry_delivery, retry_delivery,
Channel = #channel{conn_state = asleep} Channel = #channel{conn_state = asleep}
) -> ) ->
{ok, reset_timer(retry_timer, Channel)}; {ok, reset_timer(retry_delivery, Channel)};
handle_timeout( handle_timeout(
_TRef, _TRef,
retry_delivery, Name = retry_delivery,
Channel = #channel{session = Session, clientinfo = ClientInfo} Channel = #channel{session = Session, clientinfo = ClientInfo}
) -> ) ->
case emqx_mqttsn_session:retry(ClientInfo, Session) of case emqx_mqttsn_session:retry(ClientInfo, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, clean_timer(Name, Channel#channel{session = NSession})};
{ok, Publishes, Timeout, NSession} -> {ok, Publishes, Timeout, NSession} ->
NChannel = Channel#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
%% XXX: These replay messages should awaiting register acked? %% XXX: These replay messages should awaiting register acked?
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel))
end; end;
handle_timeout( handle_timeout(
_TRef, _TRef,
expire_awaiting_rel, _Name = expire_awaiting_rel,
Channel = #channel{conn_state = disconnected} Channel = #channel{conn_state = disconnected}
) -> ) ->
{ok, Channel}; {ok, Channel};
handle_timeout( handle_timeout(
_TRef, _TRef,
expire_awaiting_rel, Name = expire_awaiting_rel,
Channel = #channel{conn_state = asleep} Channel = #channel{conn_state = asleep}
) -> ) ->
{ok, reset_timer(await_timer, Channel)}; {ok, reset_timer(Name, Channel)};
handle_timeout( handle_timeout(
_TRef, _TRef,
expire_awaiting_rel, Name = expire_awaiting_rel,
Channel = #channel{session = Session, clientinfo = ClientInfo} Channel = #channel{session = Session, clientinfo = ClientInfo}
) -> ) ->
case emqx_mqttsn_session:expire(ClientInfo, awaiting_rel, Session) of case emqx_mqttsn_session:expire(ClientInfo, awaiting_rel, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, clean_timer(await_timer, Channel#channel{session = NSession})}; {ok, clean_timer(Name, Channel#channel{session = NSession})};
{ok, Timeout, NSession} -> {ok, Timeout, NSession} ->
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})} {ok, reset_timer(Name, Timeout, Channel#channel{session = NSession})}
end; end;
handle_timeout( handle_timeout(
_TRef, _TRef,
@ -2210,7 +2201,7 @@ ensure_asleep_timer(Channel = #channel{asleep_timer_duration = Duration}) when
ensure_asleep_timer(Durtion, Channel) -> ensure_asleep_timer(Durtion, Channel) ->
ensure_timer( ensure_timer(
asleep_timer, expire_asleep,
timer:seconds(Durtion), timer:seconds(Durtion),
Channel#channel{asleep_timer_duration = Durtion} Channel#channel{asleep_timer_duration = Durtion}
). ).
@ -2219,9 +2210,8 @@ ensure_register_timer(Channel) ->
ensure_register_timer(0, Channel). ensure_register_timer(0, Channel).
ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) -> ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) ->
Msg = maps:get(register_timer, ?TIMER_TABLE), TRef = emqx_utils:start_timer(?REGISTER_TIMEOUT, {retry_register, RetryTimes}),
TRef = emqx_utils:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}), Channel#channel{timers = Timers#{retry_register => TRef}}.
Channel#channel{timers = Timers#{register_timer => TRef}}.
cancel_timer(Name, Channel = #channel{timers = Timers}) -> cancel_timer(Name, Channel = #channel{timers = Timers}) ->
case maps:get(Name, Timers, undefined) of case maps:get(Name, Timers, undefined) of
@ -2242,8 +2232,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
end. end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE), TRef = emqx_utils:start_timer(Time, Name),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}. Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) -> reset_timer(Name, Channel) ->
@ -2255,11 +2244,11 @@ reset_timer(Name, Time, Channel) ->
clean_timer(Name, Channel = #channel{timers = Timers}) -> clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}. Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(keepalive, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive); emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) -> interval(retry_delivery, #channel{session = Session}) ->
emqx_mqttsn_session:info(retry_interval, Session); emqx_mqttsn_session:info(retry_interval, Session);
interval(await_timer, #channel{session = Session}) -> interval(expire_awaiting_rel, #channel{session = Session}) ->
emqx_mqttsn_session:info(await_rel_timeout, Session). emqx_mqttsn_session:info(await_rel_timeout, Session).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------