From dfc6e346801fea1023bb5adf0f2132b49c74d839 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 1 Sep 2022 18:03:05 +0800 Subject: [PATCH 01/10] fix(exproto): avoid udp client process leaking porting from v4.x: - https://github.com/emqx/emqx/pull/8575 - https://github.com/emqx/emqx/pull/8628 - https://github.com/emqx/emqx/pull/8725 --- apps/emqx_exhook/src/emqx_exhook_mgr.erl | 2 +- apps/emqx_exhook/src/emqx_exhook_server.erl | 6 +- .../src/bhvrs/emqx_gateway_conn.erl | 11 ++- .../src/exproto/emqx_exproto_channel.erl | 68 +++++++++++++++---- .../src/exproto/emqx_exproto_gcli.erl | 15 +++- apps/emqx_gateway/test/emqx_exproto_SUITE.erl | 6 +- rebar.config | 2 +- 7 files changed, 85 insertions(+), 25 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index cf83e8eb9..9b062e914 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -298,6 +298,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State = #{servers := Servers}) -> + _ = unload_exhooks(), _ = maps:fold( fun(Name, _, AccIn) -> do_unload_server(Name, AccIn) @@ -305,7 +306,6 @@ terminate(_Reason, State = #{servers := Servers}) -> State, Servers ), - _ = unload_exhooks(), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index b15724ff2..e5075bce4 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -179,13 +179,15 @@ filter(Ls) -> -spec unload(server()) -> ok. unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) -> - _ = do_deinit(Name, ReqOpts), _ = may_unload_hooks(HookSpecs), + _ = do_deinit(Name, ReqOpts), _ = emqx_exhook_sup:stop_grpc_client_channel(Name), ok. do_deinit(Name, ReqOpts) -> - _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts), + %% Using shorter timeout to deinit grpc server to avoid emqx_exhook_mgr + %% force killed by upper supervisor + _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 3000}), ok. do_init(ChannName, ReqOpts) -> diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 9e65c7eea..02d6090b4 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -240,6 +240,11 @@ esockd_send(Data, #state{ esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> esockd_transport:async_send(Sock, Data). +keepalive_stats(recv_oct) -> + emqx_pd:get_counter(incoming_bytes); +keepalive_stats(send_oct) -> + emqx_pd:get_counter(outgoing_bytes). + is_datadram_socket({esockd_transport, _}) -> false; is_datadram_socket({udp, _, _}) -> true. @@ -651,9 +656,9 @@ handle_timeout( disconnected -> {ok, State}; _ -> - case esockd_getstat(Socket, [Stat]) of - {ok, [{Stat, RecvOct}]} -> - handle_timeout(TRef, {Keepalive, RecvOct}, State); + case keepalive_stats(Stat) of + {ok, Oct} -> + handle_timeout(TRef, {Keepalive, Oct}, State); {error, Reason} -> handle_info({sock_error, Reason}, State) end diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 861ae3189..3380f35be 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -78,11 +78,14 @@ -define(TIMER_TABLE, #{ alive_timer => keepalive, - force_timer => force_close + force_timer => force_close, + idle_timer => force_close_idle }). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). +-define(DEFAULT_IDLE_TIMEOUT, 30000). + %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -151,14 +154,17 @@ init( Ctx = maps:get(ctx, Options), GRpcChann = maps:get(handler, Options), PoolName = maps:get(pool_name, Options), - NConnInfo = default_conninfo(ConnInfo), + IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT), + + NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}), ListenerId = case maps:get(listener, Options, undefined) of undefined -> undefined; {GwName, Type, LisName} -> emqx_gateway_utils:listener_id(GwName, Type, LisName) end, + EnableAuthn = maps:get(enable_authn, Options, true), - DefaultClientInfo = default_clientinfo(ConnInfo), + DefaultClientInfo = default_clientinfo(NConnInfo), ClientInfo = DefaultClientInfo#{ listener => ListenerId, enable_authn => EnableAuthn @@ -183,7 +189,9 @@ init( } ) }, - try_dispatch(on_socket_created, wrap(Req), Channel). + start_idle_checking_timer( + try_dispatch(on_socket_created, wrap(Req), Channel) + ). %% @private peercert(NoSsl, ConnInfo) when @@ -217,6 +225,12 @@ socktype(dtls) -> 'DTLS'. address({Host, Port}) -> #{host => inet:ntoa(Host), port => Port}. +%% avoid udp connection process leak +start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) -> + ensure_timer(idle_timer, Channel); +start_idle_checking_timer(Channel) -> + Channel. + %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- @@ -285,10 +299,15 @@ handle_timeout( {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> Req = #{type => 'KEEPALIVE'}, - {ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)} + NChannel = clean_timer(alive_timer, Channel), + %% close connection if keepalive timeout + Replies = [{event, disconnected}, {close, normal}], + {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> {shutdown, {error, {force_close, Reason}}, Channel}; +handle_timeout(_TRef, force_close_idle, Channel) -> + {shutdown, idle_timeout, Channel}; handle_timeout(_TRef, Msg, Channel) -> ?SLOG(warning, #{ msg => "unexpected_timeout_signal", @@ -390,7 +409,7 @@ handle_call( NConnInfo = ConnInfo#{keepalive => Interval}, NClientInfo = ClientInfo#{keepalive => Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, - {reply, ok, ensure_keepalive(NChannel)}; + {reply, ok, [{event, updated}], ensure_keepalive(cancel_timer(idle_timer, NChannel))}; handle_call( {subscribe_from_client, TopicFilter, Qos}, _From, @@ -405,21 +424,21 @@ handle_call( {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel}; _ -> {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), - {reply, ok, NChannel} + {reply, ok, [{event, updated}], NChannel} end; handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> {ok, [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel), - {reply, {ok, {NTopicFilter, NSubOpts}}, NChannel}; + {reply, {ok, {NTopicFilter, NSubOpts}}, [{event, updated}], NChannel}; handle_call( {unsubscribe_from_client, TopicFilter}, _From, Channel = #channel{conn_state = connected} ) -> {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel), - {reply, ok, NChannel}; + {reply, ok, [{event, updated}], NChannel}; handle_call({unsubscribe, Topic}, _From, Channel) -> {ok, NChannel} = do_unsubscribe([Topic], Channel), - {reply, ok, NChannel}; + {reply, ok, [{event, update}], NChannel}; handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> {reply, {ok, maps:to_list(Subs)}, Channel}; handle_call( @@ -446,7 +465,7 @@ handle_call( {reply, ok, Channel} end; handle_call(kick, _From, Channel) -> - {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)}; + {reply, ok, [{event, disconnected}, {close, kicked}], Channel}; handle_call(discard, _From, Channel) -> {shutdown, discarded, ok, Channel}; handle_call(Req, _From, Channel) -> @@ -671,6 +690,12 @@ reset_timer(Name, Channel) -> clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. +cancel_timer(Name, Channel = #channel{timers = Timers}) -> + emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), + clean_timer(Name, Channel). + +interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> + IdleTimeout; interval(force_timer, _) -> 15000; interval(alive_timer, #channel{keepalive = Keepalive}) -> @@ -722,10 +747,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)), NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}. -default_conninfo(ConnInfo) -> +default_conninfo(ConnInfo = #{peername := {PeerHost, PeerPort}}) -> ConnInfo#{ clean_start => true, - clientid => undefined, + clientid => anonymous_clientid(PeerHost, PeerPort), username => undefined, conn_props => #{}, connected => true, @@ -739,14 +764,15 @@ default_conninfo(ConnInfo) -> default_clientinfo(#{ peername := {PeerHost, _}, - sockname := {_, SockPort} + sockname := {_, SockPort}, + clientid := ClientId }) -> #{ zone => default, protocol => exproto, peerhost => PeerHost, sockport => SockPort, - clientid => undefined, + clientid => ClientId, username => undefined, is_bridge => false, is_superuser => false, @@ -764,3 +790,15 @@ proto_name_to_protocol(<<>>) -> exproto; proto_name_to_protocol(ProtoName) when is_binary(ProtoName) -> binary_to_atom(ProtoName). + +anonymous_clientid(PeerHost, PeerPort) -> + iolist_to_binary( + [ + "exproto-anonymous-", + inet:ntoa(PeerHost), + "-", + integer_to_list(PeerPort), + "-", + emqx_misc:gen_id() + ] + ). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl index d1bf4ba94..45b502798 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl @@ -61,7 +61,19 @@ async_call( Req = #{conn := Conn}, Options = #{pool_name := PoolName} ) -> - cast(pick(PoolName, Conn), {rpc, FunName, Req, Options, self()}). + case pick(PoolName, Conn) of + false -> + ?SLOG( + error, + #{ + msg => "no_available_grpc_client", + function => FunName, + request => Req + } + ); + Pid when is_pid(Pid) -> + cast(Pid, {rpc, FunName, Req, Options, self()}) + end. %%-------------------------------------------------------------------- %% cast, pick @@ -72,6 +84,7 @@ async_call( cast(Deliver, Msg) -> gen_server:cast(Deliver, Msg). +-spec pick(term(), term()) -> pid() | false. pick(PoolName, Conn) -> gproc_pool:pick_worker(PoolName, Conn). diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 66f780d3f..65725fd19 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -240,8 +240,10 @@ t_keepalive_timeout(Cfg) -> send(Sock, ConnBin), {ok, ConnAckBin} = recv(Sock, 5000), - DisconnectBin = frame_disconnect(), - {ok, DisconnectBin} = recv(Sock, 10000), + %% Timed out connections are closed immediately, + %% so there may not be a disconnect message here + %%DisconnectBin = frame_disconnect(), + %%{ok, DisconnectBin} = recv(Sock, 10000), SockType =/= udp andalso begin diff --git a/rebar.config b/rebar.config index 271e274da..e9a8cc425 100644 --- a/rebar.config +++ b/rebar.config @@ -56,7 +56,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.4"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} - , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} + , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.4"} From ebb2824e1516fa6aa33645c4de92981d10b2b620 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 2 Sep 2022 10:26:14 +0800 Subject: [PATCH 02/10] test: ensure hooks has unloaded if grpc is blocked --- apps/emqx_exhook/src/emqx_exhook_mgr.erl | 4 ++- apps/emqx_exhook/src/emqx_exhook_server.erl | 6 ++-- apps/emqx_exhook/src/emqx_exhook_sup.erl | 1 + apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 35 +++++++++++++++++++ .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 5 ++- .../src/bhvrs/emqx_gateway_conn.erl | 1 - 6 files changed, 46 insertions(+), 6 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 9b062e914..e58555ca1 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -21,6 +21,7 @@ -include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% APIs -export([start_link/0]). @@ -297,7 +298,7 @@ handle_info(refresh_tick, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State = #{servers := Servers}) -> +terminate(Reason, State = #{servers := Servers}) -> _ = unload_exhooks(), _ = maps:fold( fun(Name, _, AccIn) -> @@ -306,6 +307,7 @@ terminate(_Reason, State = #{servers := Servers}) -> State, Servers ), + ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index e5075bce4..311cff316 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -185,9 +185,9 @@ unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) -> ok. do_deinit(Name, ReqOpts) -> - %% Using shorter timeout to deinit grpc server to avoid emqx_exhook_mgr - %% force killed by upper supervisor - _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 3000}), + %% Override the request timeout to deinit grpc server to + %% avoid emqx_exhook_mgr force killed by upper supervisor + _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 5000}), ok. do_init(ChannName, ReqOpts) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index fb424ddff..3423eaf59 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -32,6 +32,7 @@ id => Mod, start => {Mod, start_link, Args}, type => Type, + %% long timeout for emqx_exhook_mgr shutdown => 15000 }). diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 62606cf18..a0472b2c3 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -24,6 +24,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl). @@ -313,6 +314,40 @@ t_cluster_name(_) -> ), emqx_exhook_mgr:disable(<<"default">>). +t_stop_timeout(_) -> + snabbkaffe:start_trace(), + meck:new(emqx_exhook_demo_svr, [passthrough, no_history]), + meck:expect( + emqx_exhook_demo_svr, + on_provider_unloaded, + fun(Req, Md) -> + %% ensure sleep time greater than emqx_exhook_mgr shutdown timeout + timer:sleep(20000), + meck:passthrough([Req, Md]) + end + ), + + %% stop application + application:stop(emqx_exhook), + ?block_until(#{?snk_kind := exhook_mgr_terminated}, 20000), + + %% all exhook hooked point should be unloaded + Mods = lists:flatten( + lists:map( + fun({hook, _, Cbs}) -> + lists:map(fun({callback, {M, _, _}, _, _}) -> M end, Cbs) + end, + ets:tab2list(emqx_hooks) + ) + ), + ?assertEqual(false, lists:any(fun(M) -> M == emqx_exhook_handler end, Mods)), + + %% ensure started for other tests + emqx_common_test_helpers:start_apps([emqx_exhook]), + + snabbkaffe:stop(), + meck:unload(emqx_exhook_demo_svr). + %%-------------------------------------------------------------------- %% Cases Helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index ea8398eeb..bf1a42c9a 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -80,7 +80,10 @@ stop() -> stop(Name) -> grpc:stop_server(Name), - to_atom_name(Name) ! stop. + case whereis(to_atom_name(Name)) of + undefined -> ok; + Pid -> Pid ! stop + end. take() -> to_atom_name(?NAME) ! {take, self()}, diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 02d6090b4..786142cdd 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -640,7 +640,6 @@ handle_timeout( Keepalive, State = #state{ chann_mod = ChannMod, - socket = Socket, channel = Channel } ) when From f8614196ac1e69ba0e2ca3eba571eb47829ecddb Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 2 Sep 2022 14:50:17 +0800 Subject: [PATCH 03/10] test: ensure udp client keepalive value getting right value --- .../src/bhvrs/emqx_gateway_conn.erl | 19 ++++--- .../src/exproto/emqx_exproto_channel.erl | 9 ++- apps/emqx_gateway/test/emqx_exproto_SUITE.erl | 56 ++++++++++++++----- 3 files changed, 59 insertions(+), 25 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 786142cdd..24d63f02c 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API -export([ @@ -51,6 +52,9 @@ %% Internal callback -export([wakeup_from_hib/2, recvloop/2]). +%% for channel module +-export([keepalive_stats/1]). + -record(state, { %% TCP/SSL/UDP/DTLS Wrapped Socket socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, @@ -573,9 +577,15 @@ terminate( channel = Channel } ) -> - ?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}), _ = ChannMod:terminate(Reason, Channel), _ = close_socket(State), + ClientId = + try ChannMod:info(clientid, Channel) of + Id -> Id + catch + _:_ -> undefined + end, + ?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}), exit(Reason). %%-------------------------------------------------------------------- @@ -655,12 +665,7 @@ handle_timeout( disconnected -> {ok, State}; _ -> - case keepalive_stats(Stat) of - {ok, Oct} -> - handle_timeout(TRef, {Keepalive, Oct}, State); - {error, Reason} -> - handle_info({sock_error, Reason}, State) - end + handle_timeout(TRef, {Keepalive, keepalive_stats(Stat)}, State) end; handle_timeout( _TRef, diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 3380f35be..94ca031aa 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -84,8 +84,6 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). --define(DEFAULT_IDLE_TIMEOUT, 30000). - %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -154,7 +152,7 @@ init( Ctx = maps:get(ctx, Options), GRpcChann = maps:get(handler, Options), PoolName = maps:get(pool_name, Options), - IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT), + IdleTimeout = emqx_gateway_utils:idle_timeout(Options), NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}), ListenerId = @@ -301,7 +299,7 @@ handle_timeout( Req = #{type => 'KEEPALIVE'}, NChannel = clean_timer(alive_timer, Channel), %% close connection if keepalive timeout - Replies = [{event, disconnected}, {close, normal}], + Replies = [{event, disconnected}, {close, keepalive_timeout}], {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> @@ -667,7 +665,8 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) -> ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> Channel; ensure_keepalive_timer(Interval, Channel) -> - Keepalive = emqx_keepalive:init(timer:seconds(Interval)), + StatVal = emqx_gateway_conn:keepalive_stats(recv_oct), + Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(Name, Channel = #channel{timers = Timers}) -> diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 65725fd19..73dc2ad45 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("eunit/include/eunit.hrl"). -import( emqx_exproto_echo_svr, @@ -38,6 +39,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(TCPOPTS, [binary, {active, false}]). -define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). @@ -223,14 +225,16 @@ t_acl_deny(Cfg) -> close(Sock). t_keepalive_timeout(Cfg) -> + ok = snabbkaffe:start_trace(), SockType = proplists:get_value(listener_type, Cfg), Sock = open(SockType), + ClientId1 = <<"keepalive_test_client1">>, Client = #{ proto_name => <<"demo">>, proto_ver => <<"v0.1">>, - clientid => <<"test_client_1">>, - keepalive => 2 + clientid => ClientId1, + keepalive => 5 }, Password = <<"123456">>, @@ -238,18 +242,41 @@ t_keepalive_timeout(Cfg) -> ConnAckBin = frame_connack(0), send(Sock, ConnBin), - {ok, ConnAckBin} = recv(Sock, 5000), + {ok, ConnAckBin} = recv(Sock), - %% Timed out connections are closed immediately, - %% so there may not be a disconnect message here - %%DisconnectBin = frame_disconnect(), - %%{ok, DisconnectBin} = recv(Sock, 10000), - - SockType =/= udp andalso - begin - {error, closed} = recv(Sock, 5000) - end, - ok. + case SockType of + udp -> + %% another udp client should not affect the first + %% udp client keepalive check + timer:sleep(4000), + Sock2 = open(SockType), + ConnBin2 = frame_connect( + Client#{clientid => <<"keepalive_test_client2">>}, + Password + ), + send(Sock2, ConnBin2), + %% first client will be keepalive timeouted in 6s + ?assertMatch( + {ok, #{ + clientid := ClientId1, + reason := {shutdown, {sock_closed, keepalive_timeout}} + }}, + ?block_until(#{?snk_kind := conn_process_terminated}, 8000) + ); + _ -> + ?assertMatch( + {ok, #{ + clientid := ClientId1, + reason := {shutdown, {sock_closed, keepalive_timeout}} + }}, + ?block_until(#{?snk_kind := conn_process_terminated}, 12000) + ), + Trace = snabbkaffe:collect_trace(), + %% conn process should be terminated + ?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))), + %% socket port should be closed + ?assertEqual({error, closed}, recv(Sock, 5000)) + end. t_hook_connected_disconnected(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), @@ -424,6 +451,9 @@ send({ssl, Sock}, Bin) -> send({dtls, Sock}, Bin) -> ssl:send(Sock, Bin). +recv(Sock) -> + recv(Sock, infinity). + recv({tcp, Sock}, Ts) -> gen_tcp:recv(Sock, 0, Ts); recv({udp, Sock}, Ts) -> From fbc213086566f921609d68178f19ae577f0b4aab Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 2 Sep 2022 15:19:08 +0800 Subject: [PATCH 04/10] test: add test for idle_timeout parameter --- apps/emqx_gateway/test/emqx_exproto_SUITE.erl | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 73dc2ad45..8e9783003 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -89,6 +89,7 @@ set_special_cfg(emqx_gateway) -> [gateway, exproto], #{ server => #{bind => 9100}, + idle_timeout => 5000, handler => #{address => "http://127.0.0.1:9001"}, listeners => listener_confs(LisType) } @@ -402,6 +403,44 @@ t_hook_message_delivered(Cfg) -> close(Sock), emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}). +t_idle_timeout(Cfg) -> + ok = snabbkaffe:start_trace(), + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + %% need to create udp client by sending something + case SockType of + udp -> + %% nothing to do + meck:new(emqx_exproto_echo_svr, [passthrough, no_history]), + meck:expect( + emqx_exproto_echo_svr, + on_received_bytes, + fun(Stream, _Md) -> {ok, Stream} end + ), + %% send request, but nobody can respond to it + ClientId = <<"idle_test_client1">>, + Client = #{ + proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => ClientId, + keepalive => 5 + }, + Password = <<"123456">>, + ConnBin = frame_connect(Client, Password), + send(Sock, ConnBin), + ?assertMatch( + {ok, #{reason := {shutdown, idle_timeout}}}, + ?block_until(#{?snk_kind := conn_process_terminated}, 10000) + ), + meck:unload(emqx_exproto_echo_svr); + _ -> + ?assertMatch( + {ok, #{reason := {shutdown, idle_timeout}}}, + ?block_until(#{?snk_kind := conn_process_terminated}, 10000) + ) + end. + %%-------------------------------------------------------------------- %% Utils From 880371934671bb491bf78e6e7fba605e6e9d38a8 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 2 Sep 2022 15:24:10 +0800 Subject: [PATCH 05/10] chore: update changes --- CHANGES-5.0.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES-5.0.md b/CHANGES-5.0.md index 8f85047e2..db1d09758 100644 --- a/CHANGES-5.0.md +++ b/CHANGES-5.0.md @@ -10,6 +10,8 @@ * Speed up updating the configuration, When some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857) * Fix that EMQX can't start when the retainer is disabled [#8911](https://github.com/emqx/emqx/pull/8911) * Fix that redis authn will deny the unknown users [#8934](https://github.com/emqx/emqx/pull/8934) +* Fix ExProto UDP client keepalive checking error. + This causes the clients to not expire as long as a new UDP packet arrives [#8866](https://github.com/emqx/emqx/pull/8866) ## Enhancements @@ -18,6 +20,8 @@ * Remove `node.etc_dir` from emqx.conf, because it is never used. Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892) * Added a new API `POST /listeners` for creating listener. [#8876](https://github.com/emqx/emqx/pull/8876) +* Close ExProto client process immediately if it's keepalive timeouted. [#8866](https://github.com/emqx/emqx/pull/8866) +* Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8866](https://github.com/emqx/emqx/pull/8866) # 5.0.7 From 533569ad2a1dcd6b6f53af2455df03cea8518cd9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 2 Sep 2022 15:44:20 +0800 Subject: [PATCH 06/10] chore: fix app vsn check --- apps/emqx_exhook/src/emqx_exhook.app.src | 2 +- apps/emqx_gateway/src/emqx_gateway.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index c4a43d846..f10155f0e 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [ {description, "EMQX Extension for Hook"}, - {vsn, "5.0.4"}, + {vsn, "5.0.3"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 47245c0a2..9fb78c825 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.5"}, + {vsn, "0.1.4"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [kernel, stdlib, grpc, emqx, emqx_authn]}, From aed2df58114bca90d4c79bca3dd22590daf57f57 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 2 Sep 2022 16:09:15 +0800 Subject: [PATCH 07/10] chore: update grpc vsn in mix.exs --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 3f55de64b..c4c827f92 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true}, {:ekka, github: "emqx/ekka", tag: "0.13.4", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, - {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true}, + {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.2"}, {:replayq, "0.3.4", override: true}, From 4384cae29e7123469d33fb110ceaee0f12aa770f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 8 Sep 2022 10:58:20 +0800 Subject: [PATCH 08/10] test: fix failed tests --- apps/emqx_gateway/test/emqx_exproto_SUITE.erl | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 8e9783003..0e863a14c 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -64,6 +64,9 @@ all() -> [{group, Name} || Name <- metrics()]. +suite() -> + [{timetrap, {seconds, 30}}]. + groups() -> Cases = emqx_common_test_helpers:all(?MODULE), [{Name, Cases} || Name <- metrics()]. @@ -277,7 +280,8 @@ t_keepalive_timeout(Cfg) -> ?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))), %% socket port should be closed ?assertEqual({error, closed}, recv(Sock, 5000)) - end. + end, + snabbkaffe:stop(). t_hook_connected_disconnected(Cfg) -> SockType = proplists:get_value(listener_type, Cfg), @@ -367,6 +371,8 @@ t_hook_session_subscribed_unsubscribed(Cfg) -> error(hook_is_not_running) end, + send(Sock, frame_disconnect()), + close(Sock), emqx_hooks:del('session.subscribed', {?MODULE, hook_fun3}), emqx_hooks:del('session.unsubscribed', {?MODULE, hook_fun4}). @@ -412,11 +418,14 @@ t_idle_timeout(Cfg) -> case SockType of udp -> %% nothing to do - meck:new(emqx_exproto_echo_svr, [passthrough, no_history]), - meck:expect( - emqx_exproto_echo_svr, - on_received_bytes, - fun(Stream, _Md) -> {ok, Stream} end + ok = meck:new(emqx_exproto_gcli, [passthrough, no_history]), + ok = meck:expect( + emqx_exproto_gcli, + async_call, + fun(FunName, _Req, _GClient) -> + self() ! {hreply, FunName, ok}, + ok + end ), %% send request, but nobody can respond to it ClientId = <<"idle_test_client1">>, @@ -433,13 +442,14 @@ t_idle_timeout(Cfg) -> {ok, #{reason := {shutdown, idle_timeout}}}, ?block_until(#{?snk_kind := conn_process_terminated}, 10000) ), - meck:unload(emqx_exproto_echo_svr); + ok = meck:unload(emqx_exproto_gcli); _ -> ?assertMatch( {ok, #{reason := {shutdown, idle_timeout}}}, ?block_until(#{?snk_kind := conn_process_terminated}, 10000) ) - end. + end, + snabbkaffe:stop(). %%-------------------------------------------------------------------- %% Utils From 44f8108228cae55c3d099391134e66c4f1aa45a3 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 8 Sep 2022 11:03:17 +0800 Subject: [PATCH 09/10] chore: update app vsn --- apps/emqx_exhook/src/emqx_exhook.app.src | 2 +- apps/emqx_gateway/src/emqx_gateway.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index f10155f0e..c4a43d846 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [ {description, "EMQX Extension for Hook"}, - {vsn, "5.0.3"}, + {vsn, "5.0.4"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 9fb78c825..47245c0a2 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [kernel, stdlib, grpc, emqx, emqx_authn]}, From 522f650096eb1da200b0afa6704309f7884a1227 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 13 Sep 2022 10:28:53 +0800 Subject: [PATCH 10/10] chore: apply review suggestions --- apps/emqx_exhook/include/emqx_exhook.hrl | 2 ++ apps/emqx_exhook/src/emqx_exhook_server.erl | 3 +- apps/emqx_exhook/src/emqx_exhook_sup.erl | 17 +++++++--- .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 10 ++++-- .../src/bhvrs/emqx_gateway_conn.erl | 16 +++++----- .../src/exproto/emqx_exproto_channel.erl | 31 +++++++------------ .../src/exproto/emqx_exproto_gcli.erl | 13 +++----- 7 files changed, 48 insertions(+), 44 deletions(-) diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl index 6c386e688..7436b2a1c 100644 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -43,6 +43,8 @@ {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} ]). +-define(SERVER_FORCE_SHUTDOWN_TIMEOUT, 5000). + -endif. -define(CMD_MOVE_FRONT, front). diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 311cff316..9c89915aa 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -187,7 +187,8 @@ unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) -> do_deinit(Name, ReqOpts) -> %% Override the request timeout to deinit grpc server to %% avoid emqx_exhook_mgr force killed by upper supervisor - _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 5000}), + NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT}, + _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts), ok. do_init(ChannName, ReqOpts) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index 3423eaf59..cd49d89bb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -16,6 +16,8 @@ -module(emqx_exhook_sup). +-include("emqx_exhook.hrl"). + -behaviour(supervisor). -export([ @@ -28,12 +30,13 @@ stop_grpc_client_channel/1 ]). --define(CHILD(Mod, Type, Args), #{ +-define(DEFAULT_TIMEOUT, 5000). + +-define(CHILD(Mod, Type, Args, Timeout), #{ id => Mod, start => {Mod, start_link, Args}, type => Type, - %% long timeout for emqx_exhook_mgr - shutdown => 15000 + shutdown => Timeout }). %%-------------------------------------------------------------------- @@ -46,7 +49,7 @@ start_link() -> init([]) -> _ = emqx_exhook_metrics:init(), _ = emqx_exhook_mgr:init_ref_counter_table(), - Mngr = ?CHILD(emqx_exhook_mgr, worker, []), + Mngr = ?CHILD(emqx_exhook_mgr, worker, [], force_shutdown_timeout()), {ok, {{one_for_one, 10, 100}, [Mngr]}}. %%-------------------------------------------------------------------- @@ -71,3 +74,9 @@ stop_grpc_client_channel(Name) -> _:_:_ -> ok end. + +%% Calculate the maximum timeout, which will help to shutdown the +%% emqx_exhook_mgr process correctly. +force_shutdown_timeout() -> + Factor = max(3, length(emqx:get_config([exhook, servers])) + 1), + Factor * ?SERVER_FORCE_SHUTDOWN_TIMEOUT. diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index bf1a42c9a..b566b7ab2 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -81,8 +81,14 @@ stop() -> stop(Name) -> grpc:stop_server(Name), case whereis(to_atom_name(Name)) of - undefined -> ok; - Pid -> Pid ! stop + undefined -> + ok; + Pid -> + Ref = erlang:monitor(process, Pid), + Pid ! stop, + receive + {'DOWN', Ref, process, Pid, _Reason} -> ok + end end. take() -> diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 24d63f02c..99ac3a38f 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -244,10 +244,10 @@ esockd_send(Data, #state{ esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> esockd_transport:async_send(Sock, Data). -keepalive_stats(recv_oct) -> - emqx_pd:get_counter(incoming_bytes); -keepalive_stats(send_oct) -> - emqx_pd:get_counter(outgoing_bytes). +keepalive_stats(recv) -> + emqx_pd:get_counter(recv_pkt); +keepalive_stats(send) -> + emqx_pd:get_counter(send_pkt). is_datadram_socket({esockd_transport, _}) -> false; is_datadram_socket({udp, _, _}) -> true. @@ -656,16 +656,16 @@ handle_timeout( Keepalive == keepalive; Keepalive == keepalive_send -> - Stat = + StatVal = case Keepalive of - keepalive -> recv_oct; - keepalive_send -> send_oct + keepalive -> keepalive_stats(recv); + keepalive_send -> keepalive_stats(send) end, case ChannMod:info(conn_state, Channel) of disconnected -> {ok, State}; _ -> - handle_timeout(TRef, {Keepalive, keepalive_stats(Stat)}, State) + handle_timeout(TRef, {Keepalive, StatVal}, State) end; handle_timeout( _TRef, diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 94ca031aa..be4cddcaa 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -297,7 +297,7 @@ handle_timeout( {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> Req = #{type => 'KEEPALIVE'}, - NChannel = clean_timer(alive_timer, Channel), + NChannel = remove_timer_ref(alive_timer, Channel), %% close connection if keepalive timeout Replies = [{event, disconnected}, {close, keepalive_timeout}], {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} @@ -665,7 +665,7 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) -> ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> Channel; ensure_keepalive_timer(Interval, Channel) -> - StatVal = emqx_gateway_conn:keepalive_stats(recv_oct), + StatVal = emqx_gateway_conn:keepalive_stats(recv), Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). @@ -684,14 +684,14 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> - ensure_timer(Name, clean_timer(Name, Channel)). - -clean_timer(Name, Channel = #channel{timers = Timers}) -> - Channel#channel{timers = maps:remove(Name, Timers)}. + ensure_timer(Name, remove_timer_ref(Name, Channel)). cancel_timer(Name, Channel = #channel{timers = Timers}) -> emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), - clean_timer(Name, Channel). + remove_timer_ref(Name, Channel). + +remove_timer_ref(Name, Channel = #channel{timers = Timers}) -> + Channel#channel{timers = maps:remove(Name, Timers)}. interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> IdleTimeout; @@ -746,10 +746,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)), NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}. -default_conninfo(ConnInfo = #{peername := {PeerHost, PeerPort}}) -> +default_conninfo(ConnInfo) -> ConnInfo#{ clean_start => true, - clientid => anonymous_clientid(PeerHost, PeerPort), + clientid => anonymous_clientid(), username => undefined, conn_props => #{}, connected => true, @@ -790,14 +790,5 @@ proto_name_to_protocol(<<>>) -> proto_name_to_protocol(ProtoName) when is_binary(ProtoName) -> binary_to_atom(ProtoName). -anonymous_clientid(PeerHost, PeerPort) -> - iolist_to_binary( - [ - "exproto-anonymous-", - inet:ntoa(PeerHost), - "-", - integer_to_list(PeerPort), - "-", - emqx_misc:gen_id() - ] - ). +anonymous_clientid() -> + iolist_to_binary(["exproto-", emqx_misc:gen_id()]). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl index 45b502798..cf8ed76a7 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl @@ -56,6 +56,7 @@ start_link(Pool, Id) -> [] ). +-spec async_call(atom(), map(), map()) -> ok. async_call( FunName, Req = #{conn := Conn}, @@ -63,17 +64,11 @@ async_call( ) -> case pick(PoolName, Conn) of false -> - ?SLOG( - error, - #{ - msg => "no_available_grpc_client", - function => FunName, - request => Req - } - ); + reply(self(), FunName, {error, no_available_grpc_client}); Pid when is_pid(Pid) -> cast(Pid, {rpc, FunName, Req, Options, self()}) - end. + end, + ok. %%-------------------------------------------------------------------- %% cast, pick