From 9e2987034bc6aca851eff5474668c2667a786ca5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Aug 2022 10:36:43 +0800 Subject: [PATCH] chore(exproto): start idle timer for udp clients --- .../emqx_exproto/src/emqx_exproto_channel.erl | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 80e5f4045..765dcf53e 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -76,7 +76,8 @@ -define(TIMER_TABLE, #{ alive_timer => keepalive, - force_timer => force_close + force_timer => force_close, + idle_timer => force_close_idle }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). @@ -94,6 +95,8 @@ awaiting_rel_max ]). +-define(DEFAULT_IDLE_TIMEOUT, 30000). + %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -149,8 +152,12 @@ init(ConnInfo = #{socktype := Socktype, GRpcChann = proplists:get_value(handler, Options), NConnInfo = default_conninfo(ConnInfo), ClientInfo = default_clientinfo(ConnInfo), + + IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT), + + NConnInfo1 = NConnInfo#{idle_timeout => IdleTimeout}, Channel = #channel{gcli = #{channel => GRpcChann}, - conninfo = NConnInfo, + conninfo = NConnInfo1, clientinfo = ClientInfo, conn_state = accepted, timers = #{} @@ -165,7 +172,8 @@ init(ConnInfo = #{socktype := Socktype, #{socktype => socktype(Socktype), peername => address(Peername), sockname => address(Sockname)})}, - try_dispatch(on_socket_created, wrap(Req), Channel) + start_idle_checking_timer( + try_dispatch(on_socket_created, wrap(Req), Channel)) end. register_the_anonymous_client(ClientInfo, ConnInfo) -> @@ -184,6 +192,12 @@ register_the_anonymous_client(ClientInfo, ConnInfo) -> unregister_the_anonymous_client(ClientId) -> emqx_cm:unregister_channel(ClientId). +start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) -> + ensure_timer(idle_timer, Channel); + +start_idle_checking_timer(Channel) -> + Channel. + %% @private peercert(NoSsl, ConnInfo) when NoSsl == nossl; NoSsl == undefined -> @@ -267,6 +281,9 @@ handle_timeout(_TRef, {keepalive, StatVal}, handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> {shutdown, Reason, Channel}; +handle_timeout(_TRef, force_close_idle, Channel) -> + {shutdown, idle_timeout, Channel}; + handle_timeout(_TRef, Msg, Channel) -> ?WARN("Unexpected timeout: ~p", [Msg]), {ok, Channel}. @@ -328,7 +345,8 @@ handle_call({start_timer, keepalive, Interval}, NConnInfo = ConnInfo#{keepalive => Interval}, NClientInfo = ClientInfo#{keepalive => Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, - {reply, ok, [{event, updated}], ensure_keepalive(NChannel)}; + {reply, ok, [{event, updated}], + ensure_keepalive(cancel_timer(idle_timer, NChannel))}; handle_call({subscribe, TopicFilter, Qos}, Channel = #channel{ @@ -561,6 +579,12 @@ reset_timer(Name, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. +cancel_timer(Name, Channel = #channel{timers = Timers}) -> + emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), + clean_timer(Name, Channel). + +interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> + IdleTimeout; interval(force_timer, _) -> 15000; interval(alive_timer, #channel{keepalive = Keepalive}) ->