diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 0d83c60a6..328b345e7 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -130,15 +130,6 @@ -define(IS_MQTT_V5, #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V5}}). --define(TIMER_TABLE, #{ - alive_timer => keepalive, - retry_timer => retry_delivery, - await_timer => expire_awaiting_rel, - expire_timer => expire_session, - will_timer => will_message, - quota_timer => expire_quota_limit -}). - -define(LIMITER_ROUTING, message_routing). -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). @@ -734,7 +725,7 @@ do_publish( {ok, PubRes, NSession} -> RC = pubrec_reason_code(PubRes), NChannel0 = Channel#channel{session = NSession}, - NChannel1 = ensure_timer(await_timer, NChannel0), + NChannel1 = ensure_timer(expire_awaiting_rel, NChannel0), NChannel2 = ensure_quota(PubRes, NChannel1), handle_out(pubrec, {PacketId, RC}, NChannel2); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -765,7 +756,7 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) -> {ok, NLimiter} -> Channel#channel{quota = NLimiter}; {_, Intv, NLimiter} -> - ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter}) + ensure_timer(expire_quota_limit, Intv, Channel#channel{quota = NLimiter}) end. -compile({inline, [pubrec_reason_code/1]}). @@ -961,7 +952,7 @@ handle_deliver( case emqx_session:deliver(ClientInfo, Delivers, Session) of {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession}, - handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); + handle_out(publish, Publishes, ensure_timer(retry_delivery, NChannel)); {ok, NSession} -> {ok, Channel#channel{session = NSession}} end. @@ -1199,7 +1190,7 @@ handle_call( SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}), ChanInfo1 = info(NChannel), emqx_cm:set_chan_info(ClientId, ChanInfo1#{sockinfo => SockInfo}), - reply(ok, reset_timer(alive_timer, NChannel)); + reply(ok, reset_timer(keepalive, NChannel)); handle_call(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), reply(ignored, Channel). @@ -1305,66 +1296,68 @@ handle_timeout( case emqx_keepalive:check(StatVal, Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, - {ok, reset_timer(alive_timer, NChannel)}; + {ok, reset_timer(keepalive, NChannel)}; {error, timeout} -> handle_out(disconnect, ?RC_KEEP_ALIVE_TIMEOUT, Channel) end; handle_timeout( _TRef, - retry_delivery, + _Name = retry_delivery, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - retry_delivery, + Name = retry_delivery, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> case emqx_session:retry(ClientInfo, Session) of {ok, NSession} -> NChannel = Channel#channel{session = NSession}, - {ok, clean_timer(retry_timer, NChannel)}; + {ok, clean_timer(Name, NChannel)}; {ok, Publishes, Timeout, NSession} -> NChannel = Channel#channel{session = NSession}, - handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) + handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel)) end; handle_timeout( _TRef, - expire_awaiting_rel, + _Name = expire_awaiting_rel, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - expire_awaiting_rel, + Name = expire_awaiting_rel, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> case emqx_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> NChannel = Channel#channel{session = NSession}, - {ok, clean_timer(await_timer, NChannel)}; + {ok, clean_timer(Name, NChannel)}; {ok, Timeout, NSession} -> NChannel = Channel#channel{session = NSession}, - {ok, reset_timer(await_timer, Timeout, NChannel)} + {ok, reset_timer(Name, Timeout, NChannel)} end; handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); handle_timeout( - _TRef, will_message, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg} + _TRef, + Name = will_message, + Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg} ) -> (WillMsg =/= undefined) andalso publish_will_msg(ClientInfo, WillMsg), - {ok, clean_timer(will_timer, Channel#channel{will_msg = undefined})}; + {ok, clean_timer(Name, Channel#channel{will_msg = undefined})}; handle_timeout( _TRef, - expire_quota_limit, + expire_quota_limit = Name, #channel{quota = Quota} = Channel ) -> case emqx_limiter_container:retry(?LIMITER_ROUTING, Quota) of {_, Intv, Quota2} -> - Channel2 = ensure_timer(quota_timer, Intv, Channel#channel{quota = Quota2}), + Channel2 = ensure_timer(Name, Intv, Channel#channel{quota = Quota2}), {ok, Channel2}; {_, Quota2} -> - {ok, clean_timer(quota_timer, Channel#channel{quota = Quota2})} + {ok, clean_timer(Name, Channel#channel{quota = Quota2})} end; handle_timeout(TRef, Msg, Channel) -> case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of @@ -1392,8 +1385,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) -> end. ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> - Msg = maps:get(Name, ?TIMER_TABLE), - TRef = emqx_utils:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Name), Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> @@ -1405,15 +1397,15 @@ reset_timer(Name, Time, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. -interval(alive_timer, #channel{keepalive = KeepAlive}) -> +interval(keepalive, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); -interval(retry_timer, #channel{session = Session}) -> +interval(retry_delivery, #channel{session = Session}) -> emqx_session:info(retry_interval, Session); -interval(await_timer, #channel{session = Session}) -> +interval(expire_awaiting_rel, #channel{session = Session}) -> emqx_session:info(await_rel_timeout, Session); -interval(expire_timer, #channel{conninfo = ConnInfo}) -> +interval(expire_session, #channel{conninfo = ConnInfo}) -> maps:get(expiry_interval, ConnInfo); -interval(will_timer, #channel{will_msg = WillMsg}) -> +interval(will_message, #channel{will_msg = WillMsg}) -> timer:seconds(will_delay_interval(WillMsg)). %%-------------------------------------------------------------------- @@ -1783,7 +1775,7 @@ packing_alias(Packet, Channel) -> %% Check quota state check_quota_exceeded(_, #channel{timers = Timers}) -> - case maps:get(quota_timer, Timers, undefined) of + case maps:get(expire_quota_limit, Timers, undefined) of undefined -> ok; _ -> {error, ?RC_QUOTA_EXCEEDED} end. @@ -2044,15 +2036,15 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone} Multiplier = get_mqtt_conf(Zone, keepalive_multiplier), RecvCnt = emqx_pd:get_counter(recv_pkt), Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)), - ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). + ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). clear_keepalive(Channel = #channel{timers = Timers}) -> - case maps:get(alive_timer, Timers, undefined) of + case maps:get(keepalive, Timers, undefined) of undefined -> Channel; TRef -> emqx_utils:cancel_timer(TRef), - Channel#channel{timers = maps:without([alive_timer], Timers)} + Channel#channel{timers = maps:without([keepalive], Timers)} end. %%-------------------------------------------------------------------- %% Maybe Resume Session @@ -2081,7 +2073,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> ?EXPIRE_INTERVAL_INFINITE -> {ok, Channel}; I when I > 0 -> - {ok, ensure_timer(expire_timer, I, Channel)}; + {ok, ensure_timer(expire_session, I, Channel)}; _ -> shutdown(Reason, Channel) end. @@ -2120,7 +2112,7 @@ maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = Wi ok = publish_will_msg(ClientInfo, WillMsg), Channel#channel{will_msg = undefined}; I -> - ensure_timer(will_timer, timer:seconds(I), Channel) + ensure_timer(will_message, timer:seconds(I), Channel) end. will_delay_interval(WillMsg) -> diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 6520d820a..0bb7fad18 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -768,32 +768,26 @@ t_handle_info_sock_closed(_) -> %% Test cases for handle_timeout %%-------------------------------------------------------------------- -t_handle_timeout_emit_stats(_) -> - TRef = make_ref(), - ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end), - Channel = emqx_channel:set_field(timers, #{stats_timer => TRef}, channel()), - {ok, _Chan} = emqx_channel:handle_timeout(TRef, {emit_stats, []}, Channel). - t_handle_timeout_keepalive(_) -> TRef = make_ref(), - Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()), + Channel = emqx_channel:set_field(timers, #{keepalive => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, Channel). t_handle_timeout_retry_delivery(_) -> TRef = make_ref(), ok = meck:expect(emqx_session, retry, fun(_, Session) -> {ok, Session} end), - Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()), + Channel = emqx_channel:set_field(timers, #{retry_delivery => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel). t_handle_timeout_expire_awaiting_rel(_) -> TRef = make_ref(), ok = meck:expect(emqx_session, expire, fun(_, _, Session) -> {ok, Session} end), - Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()), + Channel = emqx_channel:set_field(timers, #{expire_awaiting_rel => TRef}, channel()), {ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel). t_handle_timeout_expire_session(_) -> TRef = make_ref(), - Channel = emqx_channel:set_field(timers, #{expire_timer => TRef}, channel()), + Channel = emqx_channel:set_field(timers, #{expire_session => TRef}, channel()), {shutdown, expired, _Chan} = emqx_channel:handle_timeout(TRef, expire_session, Channel). t_handle_timeout_will_message(_) ->