diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 2443b149a..e054c4548 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -104,15 +104,6 @@ -type replies() :: reply() | [reply()]. --define(TIMER_TABLE, #{ - alive_timer => keepalive, - retry_timer => retry_delivery, - await_timer => expire_awaiting_rel, - expire_timer => expire_session, - asleep_timer => expire_asleep, - register_timer => retry_register -}). - -define(DEFAULT_OVERRIDE, #{ clientid => <<"${ConnInfo.clientid}">> %, username => <<"${ConnInfo.clientid}">> @@ -431,7 +422,7 @@ ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel) -> Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))), - ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). + ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). %%-------------------------------------------------------------------- %% Handle incoming packet @@ -669,7 +660,7 @@ handle_in( topic_name => TopicName }), NChannel = cancel_timer( - register_timer, + retry_register, Channel#channel{register_inflight = undefined} ), send_next_register_or_replay_publish(TopicName, NChannel); @@ -692,7 +683,7 @@ handle_in( topic_name => TopicName }), NChannel = cancel_timer( - register_timer, + retry_register, Channel#channel{register_inflight = undefined} ), send_next_register_or_replay_publish(TopicName, NChannel) @@ -1165,7 +1156,7 @@ do_publish( case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of {ok, _PubRes, NSession} -> NChannel1 = ensure_timer( - await_timer, + expire_awaiting_rel, Channel#channel{session = NSession} ), handle_out(pubrec, MsgId, NChannel1); @@ -1447,7 +1438,7 @@ awake( {ok, More, Session2} -> {lists:append(Publishes, More), Session2} end, - Channel1 = cancel_timer(asleep_timer, Channel), + Channel1 = cancel_timer(expire_asleep, Channel), {Replies0, NChannel0} = outgoing_deliver_and_register( do_deliver( NPublishes, @@ -1499,7 +1490,7 @@ asleep(Duration, Channel = #channel{conn_state = asleep}) -> msg => "update_asleep_timer", new_duration => Duration }), - ensure_asleep_timer(Duration, cancel_timer(asleep_timer, Channel)); + ensure_asleep_timer(Duration, cancel_timer(expire_asleep, Channel)); asleep(Duration, Channel = #channel{conn_state = connected}) -> ?SLOG(info, #{ msg => "goto_asleep_state", @@ -1907,7 +1898,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> ?UINT_MAX -> {ok, Channel}; I when I > 0 -> - {ok, ensure_timer(expire_timer, I, Channel)}; + {ok, ensure_timer(expire_session, I, Channel)}; _ -> shutdown(Reason, Channel) end. @@ -2007,7 +1998,7 @@ handle_deliver( handle_out( publish, Publishes, - ensure_timer(retry_timer, NChannel) + ensure_timer(retry_delivery, NChannel) ); {ok, NSession} -> {ok, Channel#channel{session = NSession}} @@ -2068,13 +2059,13 @@ handle_timeout( case emqx_keepalive:check(StatVal, Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, - {ok, reset_timer(alive_timer, NChannel)}; + {ok, reset_timer(keepalive, NChannel)}; {error, timeout} -> handle_out(disconnect, ?SN_RC2_KEEPALIVE_TIMEOUT, Channel) end; handle_timeout( _TRef, - retry_delivery, + _Name = retry_delivery, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; @@ -2083,42 +2074,42 @@ handle_timeout( retry_delivery, Channel = #channel{conn_state = asleep} ) -> - {ok, reset_timer(retry_timer, Channel)}; + {ok, reset_timer(retry_delivery, Channel)}; handle_timeout( _TRef, - retry_delivery, + Name = retry_delivery, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> case emqx_mqttsn_session:retry(ClientInfo, Session) of {ok, NSession} -> - {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; + {ok, clean_timer(Name, Channel#channel{session = NSession})}; {ok, Publishes, Timeout, NSession} -> NChannel = Channel#channel{session = NSession}, %% XXX: These replay messages should awaiting register acked? - handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) + handle_out(publish, Publishes, reset_timer(Name, Timeout, NChannel)) end; handle_timeout( _TRef, - expire_awaiting_rel, + _Name = expire_awaiting_rel, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - expire_awaiting_rel, + Name = expire_awaiting_rel, Channel = #channel{conn_state = asleep} ) -> - {ok, reset_timer(await_timer, Channel)}; + {ok, reset_timer(Name, Channel)}; handle_timeout( _TRef, - expire_awaiting_rel, + Name = expire_awaiting_rel, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> case emqx_mqttsn_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> - {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; + {ok, clean_timer(Name, Channel#channel{session = NSession})}; {ok, Timeout, NSession} -> - {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})} + {ok, reset_timer(Name, Timeout, Channel#channel{session = NSession})} end; handle_timeout( _TRef, @@ -2210,7 +2201,7 @@ ensure_asleep_timer(Channel = #channel{asleep_timer_duration = Duration}) when ensure_asleep_timer(Durtion, Channel) -> ensure_timer( - asleep_timer, + expire_asleep, timer:seconds(Durtion), Channel#channel{asleep_timer_duration = Durtion} ). @@ -2219,9 +2210,8 @@ ensure_register_timer(Channel) -> ensure_register_timer(0, Channel). ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) -> - Msg = maps:get(register_timer, ?TIMER_TABLE), - TRef = emqx_utils:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}), - Channel#channel{timers = Timers#{register_timer => TRef}}. + TRef = emqx_utils:start_timer(?REGISTER_TIMEOUT, {retry_register, RetryTimes}), + Channel#channel{timers = Timers#{retry_register => TRef}}. cancel_timer(Name, Channel = #channel{timers = Timers}) -> case maps:get(Name, Timers, undefined) of @@ -2242,8 +2232,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) -> end. ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> - Msg = maps:get(Name, ?TIMER_TABLE), - TRef = emqx_utils:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Name), Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> @@ -2255,11 +2244,11 @@ reset_timer(Name, Time, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. -interval(alive_timer, #channel{keepalive = KeepAlive}) -> +interval(keepalive, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); -interval(retry_timer, #channel{session = Session}) -> +interval(retry_delivery, #channel{session = Session}) -> emqx_mqttsn_session:info(retry_interval, Session); -interval(await_timer, #channel{session = Session}) -> +interval(expire_awaiting_rel, #channel{session = Session}) -> emqx_mqttsn_session:info(await_rel_timeout, Session). %%--------------------------------------------------------------------