refactor(chan): make timer names equal to messages they send

Because keeping timer names different from the messages they send
complicates understanding of the control flow, and spends few
reductions per timer operation unnecessarily.
This commit is contained in:
Andrew Mayorov 2023-09-08 12:43:05 +04:00
parent cfb1bf1fa4
commit e4866adc2f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 36 additions and 50 deletions

View File

@ -130,15 +130,6 @@
-define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}).
-define(TIMER_TABLE, #{
alive_timer => keepalive,
retry_timer => retry_delivery,
await_timer => expire_awaiting_rel,
expire_timer => expire_session,
will_timer => will_message,
quota_timer => expire_quota_limit
}).
-define(LIMITER_ROUTING, message_routing).
-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
@ -734,7 +725,7 @@ do_publish(
{ok, PubRes, NSession} ->
RC = pubrec_reason_code(PubRes),
NChannel0 = Channel#channel{session = NSession},
NChannel1 = ensure_timer(await_timer, NChannel0),
NChannel1 = ensure_timer(expire_awaiting_rel, NChannel0),
NChannel2 = ensure_quota(PubRes, NChannel1),
handle_out(pubrec, {PacketId, RC}, NChannel2);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
@ -765,7 +756,7 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
{ok, NLimiter} ->
Channel#channel{quota = NLimiter};
{_, Intv, NLimiter} ->
ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter})
ensure_timer(expire_quota_limit, Intv, Channel#channel{quota = NLimiter})
end.
-compile({inline, [pubrec_reason_code/1]}).
@ -961,7 +952,7 @@ handle_deliver(
case emqx_session:deliver(ClientInfo, Delivers, Session) of
{ok, Publishes, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
handle_out(publish, Publishes, ensure_timer(retry_delivery, NChannel));
{ok, NSession} ->
{ok, Channel#channel{session = NSession}}
end.
@ -1199,7 +1190,7 @@ handle_call(
SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
ChanInfo1 = info(NChannel),
emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}),
reply(ok, reset_timer(alive_timer, NChannel));
reply(ok, reset_timer(keepalive, NChannel));
handle_call(Req, Channel) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
reply(ignored, Channel).
@ -1305,66 +1296,68 @@ handle_timeout(
case emqx_keepalive:check(StatVal, Keepalive) of
{ok, NKeepalive} ->
NChannel = Channel#channel{keepalive = NKeepalive},
{ok, reset_timer(alive_timer, NChannel)};
{ok, reset_timer(keepalive, NChannel)};
{error, timeout} ->
handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel)
end;
handle_timeout(
_TRef,
retry_delivery,
_Name = retry_delivery,
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
retry_delivery,
Name = retry_delivery,
Channel = #channel{session = Session, clientinfo = ClientInfo}
) ->
case emqx_session:retry(ClientInfo, Session) of
{ok, NSession} ->
NChannel = Channel#channel{session = NSession},
{ok, clean_timer(retry_timer, NChannel)};
{ok, clean_timer(Name, NChannel)};
{ok, Publishes, Timeout, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel))
end;
handle_timeout(
_TRef,
expire_awaiting_rel,
_Name = expire_awaiting_rel,
Channel = #channel{conn_state = disconnected}
) ->
{ok, Channel};
handle_timeout(
_TRef,
expire_awaiting_rel,
Name = expire_awaiting_rel,
Channel = #channel{session = Session, clientinfo = ClientInfo}
) ->
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
{ok, NSession} ->
NChannel = Channel#channel{session = NSession},
{ok, clean_timer(await_timer, NChannel)};
{ok, clean_timer(Name, NChannel)};
{ok, Timeout, NSession} ->
NChannel = Channel#channel{session = NSession},
{ok, reset_timer(await_timer, Timeout, NChannel)}
{ok, reset_timer(Name, Timeout, NChannel)}
end;
handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel);
handle_timeout(
_TRef, will_message, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}
_TRef,
Name = will_message,
Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}
) ->
(WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg),
{ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})};
{ok, clean_timer(Name, Channel#channel{will_msg = undefined})};
handle_timeout(
_TRef,
expire_quota_limit,
expire_quota_limit = Name,
#channel{quota = Quota} = Channel
) ->
case emqx_limiter_container:retry(?LIMITER_ROUTING, Quota) of
{_, Intv, Quota2} ->
Channel2 = ensure_timer(quota_timer, Intv, Channel#channel{quota = Quota2}),
Channel2 = ensure_timer(Name, Intv, Channel#channel{quota = Quota2}),
{ok, Channel2};
{_, Quota2} ->
{ok, clean_timer(quota_timer, Channel#channel{quota = Quota2})}
{ok, clean_timer(Name, Channel#channel{quota = Quota2})}
end;
handle_timeout(TRef, Msg, Channel) ->
case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of
@ -1392,8 +1385,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
end.
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_utils:start_timer(Time, Msg),
TRef = emqx_utils:start_timer(Time, Name),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
@ -1405,15 +1397,15 @@ reset_timer(Name, Time, Channel) ->
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
interval(keepalive, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) ->
interval(retry_delivery, #channel{session = Session}) ->
emqx_session:info(retry_interval, Session);
interval(await_timer, #channel{session = Session}) ->
interval(expire_awaiting_rel, #channel{session = Session}) ->
emqx_session:info(await_rel_timeout, Session);
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
interval(expire_session, #channel{conninfo = ConnInfo}) ->
maps:get(expiry_interval, ConnInfo);
interval(will_timer, #channel{will_msg = WillMsg}) ->
interval(will_message, #channel{will_msg = WillMsg}) ->
timer:seconds(will_delay_interval(WillMsg)).
%%--------------------------------------------------------------------
@ -1783,7 +1775,7 @@ packing_alias(Packet, Channel) ->
%% Check quota state
check_quota_exceeded(_, #channel{timers = Timers}) ->
case maps:get(quota_timer, Timers, undefined) of
case maps:get(expire_quota_limit, Timers, undefined) of
undefined -> ok;
_ -> {error, ?RC_QUOTA_EXCEEDED}
end.
@ -2044,15 +2036,15 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}
Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
RecvCnt = emqx_pd:get_counter(recv_pkt),
Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) ->
case maps:get(alive_timer, Timers, undefined) of
case maps:get(keepalive, Timers, undefined) of
undefined ->
Channel;
TRef ->
emqx_utils:cancel_timer(TRef),
Channel#channel{timers = maps:without([alive_timer], Timers)}
Channel#channel{timers = maps:without([keepalive], Timers)}
end.
%%--------------------------------------------------------------------
%% Maybe Resume Session
@ -2081,7 +2073,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
?EXPIRE_INTERVAL_INFINITE ->
{ok, Channel};
I when I > 0 ->
{ok, ensure_timer(expire_timer, I, Channel)};
{ok, ensure_timer(expire_session, I, Channel)};
_ ->
shutdown(Reason, Channel)
end.
@ -2120,7 +2112,7 @@ maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = Wi
ok = publish_will_msg(ClientInfo, WillMsg),
Channel#channel{will_msg = undefined};
I ->
ensure_timer(will_timer, timer:seconds(I), Channel)
ensure_timer(will_message, timer:seconds(I), Channel)
end.
will_delay_interval(WillMsg) ->

View File

@ -768,32 +768,26 @@ t_handle_info_sock_closed(_) ->
%% Test cases for handle_timeout
%%--------------------------------------------------------------------
t_handle_timeout_emit_stats(_) ->
TRef = make_ref(),
ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
Channel = emqx_channel:set_field(timers, #{stats_timer => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, {emit_stats, []}, Channel).
t_handle_timeout_keepalive(_) ->
TRef = make_ref(),
Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()),
Channel = emqx_channel:set_field(timers, #{keepalive => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, Channel).
t_handle_timeout_retry_delivery(_) ->
TRef = make_ref(),
ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end),
Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
Channel = emqx_channel:set_field(timers, #{retry_delivery => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel).
t_handle_timeout_expire_awaiting_rel(_) ->
TRef = make_ref(),
ok = meck:expect(emqx_session, expire, fun(_, _, Session) -> {ok, Session} end),
Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()),
Channel = emqx_channel:set_field(timers, #{expire_awaiting_rel => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel).
t_handle_timeout_expire_session(_) ->
TRef = make_ref(),
Channel = emqx_channel:set_field(timers, #{expire_timer => TRef}, channel()),
Channel = emqx_channel:set_field(timers, #{expire_session => TRef}, channel()),
{shutdown, expired, _Chan} = emqx_channel:handle_timeout(TRef, expire_session, Channel).
t_handle_timeout_will_message(_) ->