diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl index 6c386e688..7436b2a1c 100644 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -43,6 +43,8 @@ {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} ]). +-define(SERVER_FORCE_SHUTDOWN_TIMEOUT, 5000). + -endif. -define(CMD_MOVE_FRONT, front). diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 311cff316..9c89915aa 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -187,7 +187,8 @@ unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) -> do_deinit(Name, ReqOpts) -> %% Override the request timeout to deinit grpc server to %% avoid emqx_exhook_mgr force killed by upper supervisor - _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 5000}), + NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT}, + _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts), ok. do_init(ChannName, ReqOpts) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index 3423eaf59..cd49d89bb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -16,6 +16,8 @@ -module(emqx_exhook_sup). +-include("emqx_exhook.hrl"). + -behaviour(supervisor). -export([ @@ -28,12 +30,13 @@ stop_grpc_client_channel/1 ]). --define(CHILD(Mod, Type, Args), #{ +-define(DEFAULT_TIMEOUT, 5000). + +-define(CHILD(Mod, Type, Args, Timeout), #{ id => Mod, start => {Mod, start_link, Args}, type => Type, - %% long timeout for emqx_exhook_mgr - shutdown => 15000 + shutdown => Timeout }). %%-------------------------------------------------------------------- @@ -46,7 +49,7 @@ start_link() -> init([]) -> _ = emqx_exhook_metrics:init(), _ = emqx_exhook_mgr:init_ref_counter_table(), - Mngr = ?CHILD(emqx_exhook_mgr, worker, []), + Mngr = ?CHILD(emqx_exhook_mgr, worker, [], force_shutdown_timeout()), {ok, {{one_for_one, 10, 100}, [Mngr]}}. %%-------------------------------------------------------------------- @@ -71,3 +74,9 @@ stop_grpc_client_channel(Name) -> _:_:_ -> ok end. + +%% Calculate the maximum timeout, which will help to shutdown the +%% emqx_exhook_mgr process correctly. +force_shutdown_timeout() -> + Factor = max(3, length(emqx:get_config([exhook, servers])) + 1), + Factor * ?SERVER_FORCE_SHUTDOWN_TIMEOUT. diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index bf1a42c9a..b566b7ab2 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -81,8 +81,14 @@ stop() -> stop(Name) -> grpc:stop_server(Name), case whereis(to_atom_name(Name)) of - undefined -> ok; - Pid -> Pid ! stop + undefined -> + ok; + Pid -> + Ref = erlang:monitor(process, Pid), + Pid ! stop, + receive + {'DOWN', Ref, process, Pid, _Reason} -> ok + end end. take() -> diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 24d63f02c..99ac3a38f 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -244,10 +244,10 @@ esockd_send(Data, #state{ esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> esockd_transport:async_send(Sock, Data). -keepalive_stats(recv_oct) -> - emqx_pd:get_counter(incoming_bytes); -keepalive_stats(send_oct) -> - emqx_pd:get_counter(outgoing_bytes). +keepalive_stats(recv) -> + emqx_pd:get_counter(recv_pkt); +keepalive_stats(send) -> + emqx_pd:get_counter(send_pkt). is_datadram_socket({esockd_transport, _}) -> false; is_datadram_socket({udp, _, _}) -> true. @@ -656,16 +656,16 @@ handle_timeout( Keepalive == keepalive; Keepalive == keepalive_send -> - Stat = + StatVal = case Keepalive of - keepalive -> recv_oct; - keepalive_send -> send_oct + keepalive -> keepalive_stats(recv); + keepalive_send -> keepalive_stats(send) end, case ChannMod:info(conn_state, Channel) of disconnected -> {ok, State}; _ -> - handle_timeout(TRef, {Keepalive, keepalive_stats(Stat)}, State) + handle_timeout(TRef, {Keepalive, StatVal}, State) end; handle_timeout( _TRef, diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 94ca031aa..be4cddcaa 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -297,7 +297,7 @@ handle_timeout( {ok, reset_timer(alive_timer, NChannel)}; {error, timeout} -> Req = #{type => 'KEEPALIVE'}, - NChannel = clean_timer(alive_timer, Channel), + NChannel = remove_timer_ref(alive_timer, Channel), %% close connection if keepalive timeout Replies = [{event, disconnected}, {close, keepalive_timeout}], {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} @@ -665,7 +665,7 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) -> ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> Channel; ensure_keepalive_timer(Interval, Channel) -> - StatVal = emqx_gateway_conn:keepalive_stats(recv_oct), + StatVal = emqx_gateway_conn:keepalive_stats(recv), Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). @@ -684,14 +684,14 @@ ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> - ensure_timer(Name, clean_timer(Name, Channel)). - -clean_timer(Name, Channel = #channel{timers = Timers}) -> - Channel#channel{timers = maps:remove(Name, Timers)}. + ensure_timer(Name, remove_timer_ref(Name, Channel)). cancel_timer(Name, Channel = #channel{timers = Timers}) -> emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), - clean_timer(Name, Channel). + remove_timer_ref(Name, Channel). + +remove_timer_ref(Name, Channel = #channel{timers = Timers}) -> + Channel#channel{timers = maps:remove(Name, Timers)}. interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) -> IdleTimeout; @@ -746,10 +746,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)), NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}. -default_conninfo(ConnInfo = #{peername := {PeerHost, PeerPort}}) -> +default_conninfo(ConnInfo) -> ConnInfo#{ clean_start => true, - clientid => anonymous_clientid(PeerHost, PeerPort), + clientid => anonymous_clientid(), username => undefined, conn_props => #{}, connected => true, @@ -790,14 +790,5 @@ proto_name_to_protocol(<<>>) -> proto_name_to_protocol(ProtoName) when is_binary(ProtoName) -> binary_to_atom(ProtoName). -anonymous_clientid(PeerHost, PeerPort) -> - iolist_to_binary( - [ - "exproto-anonymous-", - inet:ntoa(PeerHost), - "-", - integer_to_list(PeerPort), - "-", - emqx_misc:gen_id() - ] - ). +anonymous_clientid() -> + iolist_to_binary(["exproto-", emqx_misc:gen_id()]). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl index 45b502798..cf8ed76a7 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gcli.erl @@ -56,6 +56,7 @@ start_link(Pool, Id) -> [] ). +-spec async_call(atom(), map(), map()) -> ok. async_call( FunName, Req = #{conn := Conn}, @@ -63,17 +64,11 @@ async_call( ) -> case pick(PoolName, Conn) of false -> - ?SLOG( - error, - #{ - msg => "no_available_grpc_client", - function => FunName, - request => Req - } - ); + reply(self(), FunName, {error, no_available_grpc_client}); Pid when is_pid(Pid) -> cast(Pid, {rpc, FunName, Req, Options, self()}) - end. + end, + ok. %%-------------------------------------------------------------------- %% cast, pick