From 9e2987034bc6aca851eff5474668c2667a786ca5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Aug 2022 10:36:43 +0800 Subject: [PATCH 1/2] chore(exproto): start idle timer for udp clients --- .../emqx_exproto/src/emqx_exproto_channel.erl | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 80e5f4045..765dcf53e 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -76,7 +76,8 @@ -define(TIMER_TABLE, #{ alive_timer => keepalive, - force_timer => force_close + force_timer => force_close, + idle_timer => force_close_idle }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). @@ -94,6 +95,8 @@ awaiting_rel_max ]). +-define(DEFAULT_IDLE_TIMEOUT, 30000). + %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -149,8 +152,12 @@ init(ConnInfo = #{socktype := Socktype, GRpcChann = proplists:get_value(handler, Options), NConnInfo = default_conninfo(ConnInfo), ClientInfo = default_clientinfo(ConnInfo), + + IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT), + + NConnInfo1 = NConnInfo#{idle_timeout => IdleTimeout}, Channel = #channel{gcli = #{channel => GRpcChann}, - conninfo = NConnInfo, + conninfo = NConnInfo1, clientinfo = ClientInfo, conn_state = accepted, timers = #{} @@ -165,7 +172,8 @@ init(ConnInfo = #{socktype := Socktype, #{socktype => socktype(Socktype), peername => address(Peername), sockname => address(Sockname)})}, - try_dispatch(on_socket_created, wrap(Req), Channel) + start_idle_checking_timer( + try_dispatch(on_socket_created, wrap(Req), Channel)) end. register_the_anonymous_client(ClientInfo, ConnInfo) -> @@ -184,6 +192,12 @@ register_the_anonymous_client(ClientInfo, ConnInfo) -> unregister_the_anonymous_client(ClientId) -> emqx_cm:unregister_channel(ClientId). +start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) -> + ensure_timer(idle_timer, Channel); + +start_idle_checking_timer(Channel) -> + Channel. + %% @private peercert(NoSsl, ConnInfo) when NoSsl == nossl; NoSsl == undefined -> @@ -267,6 +281,9 @@ handle_timeout(_TRef, {keepalive, StatVal}, handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> {shutdown, Reason, Channel}; +handle_timeout(_TRef, force_close_idle, Channel) -> + {shutdown, idle_timeout, Channel}; + handle_timeout(_TRef, Msg, Channel) -> ?WARN("Unexpected timeout: ~p", [Msg]), {ok, Channel}. @@ -328,7 +345,8 @@ handle_call({start_timer, keepalive, Interval}, NConnInfo = ConnInfo#{keepalive => Interval}, NClientInfo = ClientInfo#{keepalive => Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, - {reply, ok, [{event, updated}], ensure_keepalive(NChannel)}; + {reply, ok, [{event, updated}], + ensure_keepalive(cancel_timer(idle_timer, NChannel))}; handle_call({subscribe, TopicFilter, Qos}, Channel = #channel{ @@ -561,6 +579,12 @@ reset_timer(Name, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. +cancel_timer(Name, Channel = #channel{timers = Timers}) -> + emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), + clean_timer(Name, Channel). + +interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> + IdleTimeout; interval(force_timer, _) -> 15000; interval(alive_timer, #channel{keepalive = Keepalive}) -> From ec5c0816f70258822b0d4e3517e3e9e5138ab168 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 3 Aug 2022 13:59:49 +0800 Subject: [PATCH 2/2] chore: update app.src & appup.src --- CHANGES-4.3.md | 6 ++++++ apps/emqx_exproto/src/emqx_exproto.app.src | 2 +- apps/emqx_exproto/src/emqx_exproto.appup.src | 8 ++++++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index db82c8eb1..d67f8e838 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -10,6 +10,12 @@ File format: - One list item per change topic Change log ends with a list of github PRs +## v4.3.19 + +### Bug fixes + +- Add a idle timer for ExProto UDP client to avoid client leaking [#8628] + ## v4.3.18 ### Enhancements diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src index 76bc9fa85..a267d1daf 100644 --- a/apps/emqx_exproto/src/emqx_exproto.app.src +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_exproto, [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.3.9"}, %% 4.3.3 is used by ee + {vsn, "4.3.10"}, %% 4.3.3 is used by ee {modules, []}, {registered, []}, {mod, {emqx_exproto_app, []}}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index d6406b239..8e0380089 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,7 +1,9 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{<<"4\\.3\\.[2-8]">>, + [{"4.3.9", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-8]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>, @@ -10,7 +12,9 @@ {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[2-8]">>, + [{"4.3.9", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {<<"4\\.3\\.[2-8]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>,