From 58db1eb5a9379dfa5a18f2c29eb4e7f74ca8a337 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 15 Aug 2022 14:41:47 +0800 Subject: [PATCH 1/9] fix(exhook): avoid emqx_exhook_mgnr to force killed due to exceed supervior shutdown timeout --- apps/emqx_exhook/src/emqx_exhook_mngr.erl | 15 +++++++++++++-- apps/emqx_exhook/src/emqx_exhook_server.erl | 6 ++++-- apps/emqx_exproto/src/emqx_exproto_gcli.erl | 9 ++++++++- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index bd8b88e0a..f4296a676 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -192,6 +192,17 @@ handle_info({timeout, _Ref, {reload, Name}}, State) -> {noreply, NState}; {error, not_found} -> {noreply, NState}; + {error, {already_started, Pid}} -> + ?LOG(warning, "Server ~s already started on ~p, try to restart it", [Name, Pid]), + case server(Name) of + undefined -> + %% force close grpc client pool + grpc_client_sup:stop_channel_pool(Name); + ServerState -> + emqx_exhook_server:unload(ServerState) + end, + %% try again immediately + handle_info({timeout, _Ref, {reload, Name}}, State); {error, Reason} -> ?LOG(warning, "Failed to reload exhook callback server \"~s\", " "Reason: ~0p", [Name, Reason]), @@ -202,11 +213,11 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State = #state{running = Running}) -> + _ = unload_exhooks(), _ = maps:fold(fun(Name, _, AccIn) -> {ok, NAccIn} = do_unload_server(Name, AccIn), NAccIn end, State, Running), - _ = unload_exhooks(), ok. %% in the emqx_exhook:v4.3.5, we have added one new field in the state last: @@ -318,7 +329,7 @@ get_request_failed_action() -> save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), - persistent_term:put(?APP, lists:reverse([Name | Saved])), + persistent_term:put(?APP, lists:usort(lists:reverse([Name | Saved]))), persistent_term:put({?APP, Name}, ServerState). unsave(Name) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 93568783a..5ce602535 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -146,13 +146,15 @@ format_http_uri(Scheme, Host0, Port) -> -spec unload(server()) -> ok. unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) -> - _ = do_deinit(Name, ReqOpts), _ = may_unload_hooks(HookSpecs), + _ = do_deinit(Name, ReqOpts), _ = emqx_exhook_sup:stop_grpc_client_channel(Name), ok. do_deinit(Name, ReqOpts) -> - _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts), + %% Using shorter timeout to deinit grpc server to avoid emqx_exhook_mngr + %% force killed by upper supervisor + _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 3000}), ok. do_init(ChannName, ReqOpts) -> diff --git a/apps/emqx_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_exproto/src/emqx_exproto_gcli.erl index b7ef20c13..f7fa6ba23 100644 --- a/apps/emqx_exproto/src/emqx_exproto_gcli.erl +++ b/apps/emqx_exproto/src/emqx_exproto_gcli.erl @@ -54,7 +54,13 @@ start_link(Pool, Id) -> ?MODULE, [Pool, Id], []). async_call(FunName, Req = #{conn := Conn}, Options) -> - cast(pick(Conn), {rpc, FunName, Req, Options, self()}). + case pick(Conn) of + false -> + ?LOG(error, "No available grpc client for ~s: ~p", + [FunName, Req]); + Pid when is_pid(Pid) -> + cast(Pid, {rpc, FunName, Req, Options, self()}) + end. %%-------------------------------------------------------------------- %% cast, pick @@ -65,6 +71,7 @@ async_call(FunName, Req = #{conn := Conn}, Options) -> cast(Deliver, Msg) -> gen_server:cast(Deliver, Msg). +-spec pick(term()) -> pid() | false. pick(Conn) -> gproc_pool:pick_worker(exproto_gcli_pool, Conn). From 515fd014d3b68d82991b89836ac9aca390bf4719 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 15 Aug 2022 13:59:16 +0800 Subject: [PATCH 2/9] fix(exproto): fix undefined clientid in client.connect hook --- apps/emqx_exproto/src/emqx_exproto_channel.erl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 765dcf53e..30f283d9d 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -151,7 +151,7 @@ init(ConnInfo = #{socktype := Socktype, peercert := Peercert}, Options) -> GRpcChann = proplists:get_value(handler, Options), NConnInfo = default_conninfo(ConnInfo), - ClientInfo = default_clientinfo(ConnInfo), + ClientInfo = default_clientinfo(NConnInfo), IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT), @@ -633,9 +633,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)), NClientInfo#{protocol => ProtoName}. -default_conninfo(ConnInfo) -> +default_conninfo(ConnInfo = + #{peername := {PeerHost, PeerPort}}) -> ConnInfo#{clean_start => true, - clientid => undefined, + clientid => anonymous_clientid(PeerHost, PeerPort), username => undefined, conn_props => #{}, connected => true, @@ -646,13 +647,15 @@ default_conninfo(ConnInfo) -> receive_maximum => 0, expiry_interval => 0}. -default_clientinfo(#{peername := {PeerHost, PeerPort}, - sockname := {_, SockPort}}) -> +default_clientinfo(#{peername := {PeerHost, _}, + sockname := {_, SockPort}, + clientid := ClientId + }) -> #{zone => external, protocol => exproto, peerhost => PeerHost, sockport => SockPort, - clientid => anonymous_clientid(PeerHost, PeerPort), + clientid => ClientId, username => undefined, is_bridge => false, is_superuser => false, From 7d3ea85ef3a0762d897080bc1a10def722124057 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 9 Aug 2022 11:18:45 +0800 Subject: [PATCH 3/9] fix(exproto): produce disconnected event if kicked --- apps/emqx_exproto/src/emqx_exproto_channel.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 30f283d9d..50dd203b6 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -381,7 +381,7 @@ handle_call({publish, Topic, Qos, Payload}, end; handle_call(kick, Channel) -> - {shutdown, kicked, ok, Channel}; + {reply, ok, [{event, disconnected}, {close, kicked}], Channel}; handle_call(discard, Channel) -> {shutdown, discarded, ok, Channel}; From 8186e9e47a62d9350d054cd78d46a778c6c46225 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 9 Aug 2022 11:04:51 +0800 Subject: [PATCH 4/9] chore: close keepalive timeout channel --- apps/emqx_exproto/src/emqx_exproto_channel.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 50dd203b6..cf6229be3 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -275,7 +275,9 @@ handle_timeout(_TRef, {keepalive, StatVal}, {error, timeout} -> Req = #{type => 'KEEPALIVE'}, NChannel = clean_timer(alive_timer, Channel), - {ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} + %% close connection if keepalive timeout + Replies = [{event, disconnected}, {close, normal}], + {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> From 5ba048f78729762a0909a33e12ed1917cbe8241e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 15 Aug 2022 15:01:43 +0800 Subject: [PATCH 5/9] chore: upgrade grpc-erl to 0.6.7 --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 799631dc7..074949bbf 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} - , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} + , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} ]}. {xref_ignores, From 5e505fa41c997d0d8ddf07f05218ba8aefbc4d41 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 15 Aug 2022 15:23:47 +0800 Subject: [PATCH 6/9] chore: update appup.src --- apps/emqx_exhook/src/emqx_exhook.app.src | 2 +- apps/emqx_exhook/src/emqx_exhook.appup.src | 11 +++++++++-- apps/emqx_exproto/src/emqx_exproto.appup.src | 6 ++++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index cf39d9e42..d0067742a 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.3.6"}, + {vsn, "4.3.7"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index f286255f0..d5a848fb1 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,8 +1,10 @@ %% -*- mode: erlang -*- {VSN, [ - {"4.3.5", [ - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} + {<<"4\\.3\\.[5-6]">>, [ + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_mngr, brutal_purge, soft_purge, []} + ]}, {"4.3.4", [ {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, @@ -18,6 +20,11 @@ {<<".*">>, []} ], [ + {<<"4\\.3\\.[5-6]">>, [ + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_mngr, brutal_purge, soft_purge, []} + + ]}, {"4.3.5", [ {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} ]}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 8e0380089..d7414e53e 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.9", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-8]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, @@ -13,7 +14,8 @@ {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.9", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-8]">>, [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, From a66dc7c02b98e7fcfba2bffccdc71c08683c287a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 15 Aug 2022 15:14:19 +0800 Subject: [PATCH 7/9] chore: update changes-4.3 --- CHANGES-4.3.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 8a031efa8..2c7fce55f 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -18,6 +18,8 @@ File format: - Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671) - Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597) - Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737) +- Close ExProto client process immediately if it's keepalive timeouted. [#8725](https://github.com/emqx/emqx/pull/8725) +- Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8725](https://github.com/emqx/emqx/pull/8725) - Improved jwt authentication module initialization process.[#8736](https://github.com/emqx/emqx/pull/8736) ### Bug fixes @@ -28,8 +30,10 @@ File format: The `foo` variable is a null value, so `clientid != foo` should be evaluated as true. - Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655) - Add an idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628) +- Fix ExHook can't be un-hooked if the grpc service stop first. [#8725](//github.com/emqx/emqx/pull/8725) - Fix GET `/listeners/` crashes when listener is not ready. [#8752](https://github.com/emqx/emqx/pull/8752) + ## v4.3.18 ### Enhancements From f4ad7acd06d75519db90b59fbf469763fcef0497 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 17 Aug 2022 18:32:44 +0800 Subject: [PATCH 8/9] chore: update appup.src --- apps/emqx_exhook/src/emqx_exhook.appup.src | 70 ++++++++------------ apps/emqx_exproto/src/emqx_exproto.appup.src | 6 +- 2 files changed, 32 insertions(+), 44 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index d5a848fb1..eada3146e 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,44 +1,30 @@ %% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [ - {<<"4\\.3\\.[5-6]">>, [ - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_mngr, brutal_purge, soft_purge, []} - - ]}, - {"4.3.4", [ - {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_handler, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook, brutal_purge, soft_purge, []}, - {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} - ]}, - {<<"4\\.3\\.[0-3]">>, [ - {restart_application, emqx_exhook} - ]}, - {<<".*">>, []} - ], - [ - {<<"4\\.3\\.[5-6]">>, [ - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_mngr, brutal_purge, soft_purge, []} - - ]}, - {"4.3.5", [ - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} - ]}, - {"4.3.4", [ - {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_handler, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook, brutal_purge, soft_purge, []}, - {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} - ]}, - {<<"4\\.3\\.[0-3]">>, [ - {restart_application, emqx_exhook} - ]}, - {<<".*">>, []} - ] -}. + [{<<"4\\.3\\.[5-6]">>, + [{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]}, + {"4.3.4", + [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, + {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, + {<<".*">>,[]}], + [{<<"4\\.3\\.[5-6]">>, + [{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}]}, + {"4.3.4", + [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, + {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, + {<<".*">>,[]}]}. diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index d7414e53e..20939ae63 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -5,7 +5,8 @@ [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-8]">>, - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, + [{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, @@ -17,7 +18,8 @@ [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-8]">>, - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, + [{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, From efdde6e1078dba033e8f28fad981fc9bc6c41376 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 19 Aug 2022 18:13:59 +0800 Subject: [PATCH 9/9] test(exproto): fix timeout cases --- apps/emqx_exproto/test/emqx_exproto_SUITE.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl index a81cbe253..d4af25d54 100644 --- a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl @@ -213,11 +213,13 @@ t_keepalive_timeout(Cfg) -> send(Sock, ConnBin), {ok, ConnAckBin} = recv(Sock, 5000), - DisconnectBin = frame_disconnect(), - {ok, DisconnectBin} = recv(Sock, 10000), + %% Timed out connections are closed immediately, + %% so there may not be a disconnect message here + %%DisconnectBin = frame_disconnect(), + %%{ok, DisconnectBin} = recv(Sock, 10000), SockType =/= udp andalso begin - {error, closed} = recv(Sock, 5000) + {error, closed} = recv(Sock, 10000) end, ok. t_hook_connected_disconnected(Cfg) ->