Merge pull request #8628 from HJianBo/start-idle-checking-timer

Fix(exproto): start idle timer to avoid client leaking
This commit is contained in:
JianBo He 2022-08-10 17:03:18 +08:00 committed by GitHub
commit 3e0f9b3881
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 7 deletions

View File

@ -21,6 +21,12 @@ File format:
- Improve error message for LwM2M plugin when object ID is not valid [#8654](https://github.com/emqx/emqx/pull/8654).
## v4.3.19
### Bug fixes
- Add a idle timer for ExProto UDP client to avoid client leaking [#8628]
## v4.3.18
### Enhancements

View File

@ -1,6 +1,6 @@
{application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"},
{vsn, "4.3.9"}, %% 4.3.3 is used by ee
{vsn, "4.3.10"}, %% 4.3.3 is used by ee
{modules, []},
{registered, []},
{mod, {emqx_exproto_app, []}},

View File

@ -1,7 +1,9 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{<<"4\\.3\\.[2-8]">>,
[{"4.3.9",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-8]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,
@ -10,7 +12,9 @@
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[2-8]">>,
[{"4.3.9",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[2-8]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,

View File

@ -76,7 +76,8 @@
-define(TIMER_TABLE, #{
alive_timer => keepalive,
force_timer => force_close
force_timer => force_close,
idle_timer => force_close_idle
}).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@ -94,6 +95,8 @@
awaiting_rel_max
]).
-define(DEFAULT_IDLE_TIMEOUT, 30000).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
@ -149,8 +152,12 @@ init(ConnInfo = #{socktype := Socktype,
GRpcChann = proplists:get_value(handler, Options),
NConnInfo = default_conninfo(ConnInfo),
ClientInfo = default_clientinfo(ConnInfo),
IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
NConnInfo1 = NConnInfo#{idle_timeout => IdleTimeout},
Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo,
conninfo = NConnInfo1,
clientinfo = ClientInfo,
conn_state = accepted,
timers = #{}
@ -165,7 +172,8 @@ init(ConnInfo = #{socktype := Socktype,
#{socktype => socktype(Socktype),
peername => address(Peername),
sockname => address(Sockname)})},
try_dispatch(on_socket_created, wrap(Req), Channel)
start_idle_checking_timer(
try_dispatch(on_socket_created, wrap(Req), Channel))
end.
register_the_anonymous_client(ClientInfo, ConnInfo) ->
@ -184,6 +192,12 @@ register_the_anonymous_client(ClientInfo, ConnInfo) ->
unregister_the_anonymous_client(ClientId) ->
emqx_cm:unregister_channel(ClientId).
start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
ensure_timer(idle_timer, Channel);
start_idle_checking_timer(Channel) ->
Channel.
%% @private
peercert(NoSsl, ConnInfo) when NoSsl == nossl;
NoSsl == undefined ->
@ -267,6 +281,9 @@ handle_timeout(_TRef, {keepalive, StatVal},
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, Reason, Channel};
handle_timeout(_TRef, force_close_idle, Channel) ->
{shutdown, idle_timeout, Channel};
handle_timeout(_TRef, Msg, Channel) ->
?WARN("Unexpected timeout: ~p", [Msg]),
{ok, Channel}.
@ -328,7 +345,8 @@ handle_call({start_timer, keepalive, Interval},
NConnInfo = ConnInfo#{keepalive => Interval},
NClientInfo = ClientInfo#{keepalive => Interval},
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
{reply, ok, [{event, updated}], ensure_keepalive(NChannel)};
{reply, ok, [{event, updated}],
ensure_keepalive(cancel_timer(idle_timer, NChannel))};
handle_call({subscribe, TopicFilter, Qos},
Channel = #channel{
@ -561,6 +579,12 @@ reset_timer(Name, Channel) ->
clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}.
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
clean_timer(Name, Channel).
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout;
interval(force_timer, _) ->
15000;
interval(alive_timer, #channel{keepalive = Keepalive}) ->