diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 8a031efa8..2c7fce55f 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -18,6 +18,8 @@ File format: - Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671) - Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597) - Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737) +- Close ExProto client process immediately if it's keepalive timeouted. [#8725](https://github.com/emqx/emqx/pull/8725) +- Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8725](https://github.com/emqx/emqx/pull/8725) - Improved jwt authentication module initialization process.[#8736](https://github.com/emqx/emqx/pull/8736) ### Bug fixes @@ -28,8 +30,10 @@ File format: The `foo` variable is a null value, so `clientid != foo` should be evaluated as true. - Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655) - Add an idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628) +- Fix ExHook can't be un-hooked if the grpc service stop first. [#8725](//github.com/emqx/emqx/pull/8725) - Fix GET `/listeners/` crashes when listener is not ready. [#8752](https://github.com/emqx/emqx/pull/8752) + ## v4.3.18 ### Enhancements diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index cf39d9e42..d0067742a 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.3.6"}, + {vsn, "4.3.7"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index f286255f0..eada3146e 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,37 +1,30 @@ %% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [ - {"4.3.5", [ - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} - ]}, - {"4.3.4", [ - {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_handler, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook, brutal_purge, soft_purge, []}, - {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} - ]}, - {<<"4\\.3\\.[0-3]">>, [ - {restart_application, emqx_exhook} - ]}, - {<<".*">>, []} - ], - [ - {"4.3.5", [ - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} - ]}, - {"4.3.4", [ - {load_module, emqx_exhook_sup, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_handler, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, - {load_module, emqx_exhook, brutal_purge, soft_purge, []}, - {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} - ]}, - {<<"4\\.3\\.[0-3]">>, [ - {restart_application, emqx_exhook} - ]}, - {<<".*">>, []} - ] -}. + [{<<"4\\.3\\.[5-6]">>, + [{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]}, + {"4.3.4", + [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, + {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, + {<<".*">>,[]}], + [{<<"4\\.3\\.[5-6]">>, + [{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]}, + {"4.3.5", + [{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}]}, + {"4.3.4", + [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, + {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, + {<<".*">>,[]}]}. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index bd8b88e0a..f4296a676 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -192,6 +192,17 @@ handle_info({timeout, _Ref, {reload, Name}}, State) -> {noreply, NState}; {error, not_found} -> {noreply, NState}; + {error, {already_started, Pid}} -> + ?LOG(warning, "Server ~s already started on ~p, try to restart it", [Name, Pid]), + case server(Name) of + undefined -> + %% force close grpc client pool + grpc_client_sup:stop_channel_pool(Name); + ServerState -> + emqx_exhook_server:unload(ServerState) + end, + %% try again immediately + handle_info({timeout, _Ref, {reload, Name}}, State); {error, Reason} -> ?LOG(warning, "Failed to reload exhook callback server \"~s\", " "Reason: ~0p", [Name, Reason]), @@ -202,11 +213,11 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State = #state{running = Running}) -> + _ = unload_exhooks(), _ = maps:fold(fun(Name, _, AccIn) -> {ok, NAccIn} = do_unload_server(Name, AccIn), NAccIn end, State, Running), - _ = unload_exhooks(), ok. %% in the emqx_exhook:v4.3.5, we have added one new field in the state last: @@ -318,7 +329,7 @@ get_request_failed_action() -> save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), - persistent_term:put(?APP, lists:reverse([Name | Saved])), + persistent_term:put(?APP, lists:usort(lists:reverse([Name | Saved]))), persistent_term:put({?APP, Name}, ServerState). unsave(Name) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 93568783a..5ce602535 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -146,13 +146,15 @@ format_http_uri(Scheme, Host0, Port) -> -spec unload(server()) -> ok. unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) -> - _ = do_deinit(Name, ReqOpts), _ = may_unload_hooks(HookSpecs), + _ = do_deinit(Name, ReqOpts), _ = emqx_exhook_sup:stop_grpc_client_channel(Name), ok. do_deinit(Name, ReqOpts) -> - _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts), + %% Using shorter timeout to deinit grpc server to avoid emqx_exhook_mngr + %% force killed by upper supervisor + _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 3000}), ok. do_init(ChannName, ReqOpts) -> diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index 8e0380089..20939ae63 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -2,9 +2,11 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.9", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-8]">>, - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, + [{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, @@ -13,9 +15,11 @@ {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.9", - [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[2-8]">>, - [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, + [{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]}, + {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<"4\\.3\\.[0-1]">>, [{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 765dcf53e..cf6229be3 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -151,7 +151,7 @@ init(ConnInfo = #{socktype := Socktype, peercert := Peercert}, Options) -> GRpcChann = proplists:get_value(handler, Options), NConnInfo = default_conninfo(ConnInfo), - ClientInfo = default_clientinfo(ConnInfo), + ClientInfo = default_clientinfo(NConnInfo), IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT), @@ -275,7 +275,9 @@ handle_timeout(_TRef, {keepalive, StatVal}, {error, timeout} -> Req = #{type => 'KEEPALIVE'}, NChannel = clean_timer(alive_timer, Channel), - {ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} + %% close connection if keepalive timeout + Replies = [{event, disconnected}, {close, normal}], + {ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)} end; handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> @@ -381,7 +383,7 @@ handle_call({publish, Topic, Qos, Payload}, end; handle_call(kick, Channel) -> - {shutdown, kicked, ok, Channel}; + {reply, ok, [{event, disconnected}, {close, kicked}], Channel}; handle_call(discard, Channel) -> {shutdown, discarded, ok, Channel}; @@ -633,9 +635,10 @@ enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)), NClientInfo#{protocol => ProtoName}. -default_conninfo(ConnInfo) -> +default_conninfo(ConnInfo = + #{peername := {PeerHost, PeerPort}}) -> ConnInfo#{clean_start => true, - clientid => undefined, + clientid => anonymous_clientid(PeerHost, PeerPort), username => undefined, conn_props => #{}, connected => true, @@ -646,13 +649,15 @@ default_conninfo(ConnInfo) -> receive_maximum => 0, expiry_interval => 0}. -default_clientinfo(#{peername := {PeerHost, PeerPort}, - sockname := {_, SockPort}}) -> +default_clientinfo(#{peername := {PeerHost, _}, + sockname := {_, SockPort}, + clientid := ClientId + }) -> #{zone => external, protocol => exproto, peerhost => PeerHost, sockport => SockPort, - clientid => anonymous_clientid(PeerHost, PeerPort), + clientid => ClientId, username => undefined, is_bridge => false, is_superuser => false, diff --git a/apps/emqx_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_exproto/src/emqx_exproto_gcli.erl index b7ef20c13..f7fa6ba23 100644 --- a/apps/emqx_exproto/src/emqx_exproto_gcli.erl +++ b/apps/emqx_exproto/src/emqx_exproto_gcli.erl @@ -54,7 +54,13 @@ start_link(Pool, Id) -> ?MODULE, [Pool, Id], []). async_call(FunName, Req = #{conn := Conn}, Options) -> - cast(pick(Conn), {rpc, FunName, Req, Options, self()}). + case pick(Conn) of + false -> + ?LOG(error, "No available grpc client for ~s: ~p", + [FunName, Req]); + Pid when is_pid(Pid) -> + cast(Pid, {rpc, FunName, Req, Options, self()}) + end. %%-------------------------------------------------------------------- %% cast, pick @@ -65,6 +71,7 @@ async_call(FunName, Req = #{conn := Conn}, Options) -> cast(Deliver, Msg) -> gen_server:cast(Deliver, Msg). +-spec pick(term()) -> pid() | false. pick(Conn) -> gproc_pool:pick_worker(exproto_gcli_pool, Conn). diff --git a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl index a81cbe253..d4af25d54 100644 --- a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl @@ -213,11 +213,13 @@ t_keepalive_timeout(Cfg) -> send(Sock, ConnBin), {ok, ConnAckBin} = recv(Sock, 5000), - DisconnectBin = frame_disconnect(), - {ok, DisconnectBin} = recv(Sock, 10000), + %% Timed out connections are closed immediately, + %% so there may not be a disconnect message here + %%DisconnectBin = frame_disconnect(), + %%{ok, DisconnectBin} = recv(Sock, 10000), SockType =/= udp andalso begin - {error, closed} = recv(Sock, 5000) + {error, closed} = recv(Sock, 10000) end, ok. t_hook_connected_disconnected(Cfg) -> diff --git a/rebar.config b/rebar.config index 799631dc7..074949bbf 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} - , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} + , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} ]}. {xref_ignores,