diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl
index bd8b88e0a..f4296a676 100644
--- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl
+++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl
@@ -192,6 +192,17 @@ handle_info({timeout, _Ref, {reload, Name}}, State) ->
             {noreply, NState};
         {error, not_found} ->
             {noreply, NState};
+        {error, {already_started, Pid}} ->
+            ?LOG(warning, "Server ~s already started on ~p, try to restart it", [Name, Pid]),
+            case server(Name) of
+                undefined ->
+                    %% force close grpc client pool
+                    grpc_client_sup:stop_channel_pool(Name);
+                ServerState ->
+                    emqx_exhook_server:unload(ServerState)
+            end,
+            %% try again immediately
+            handle_info({timeout, _Ref, {reload, Name}}, State);
         {error, Reason} ->
             ?LOG(warning, "Failed to reload exhook callback server \"~s\", "
                           "Reason: ~0p", [Name, Reason]),
@@ -202,11 +213,11 @@ handle_info(_Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, State = #state{running = Running}) ->
+    _ = unload_exhooks(),
     _ = maps:fold(fun(Name, _, AccIn) ->
         {ok, NAccIn} = do_unload_server(Name, AccIn),
         NAccIn
     end, State, Running),
-    _ = unload_exhooks(),
     ok.
 
 %% in the emqx_exhook:v4.3.5, we have added one new field in the state last:
@@ -318,7 +329,8 @@ get_request_failed_action() ->
 
 save(Name, ServerState) ->
     Saved = persistent_term:get(?APP, []),
-    persistent_term:put(?APP, lists:reverse([Name | Saved])),
+    %% usort already sorts and de-duplicates, so reversing first is redundant
+    persistent_term:put(?APP, lists:usort([Name | Saved])),
     persistent_term:put({?APP, Name}, ServerState).
 
 unsave(Name) ->
diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl
index 93568783a..5ce602535 100644
--- a/apps/emqx_exhook/src/emqx_exhook_server.erl
+++ b/apps/emqx_exhook/src/emqx_exhook_server.erl
@@ -146,13 +146,15 @@ format_http_uri(Scheme, Host0, Port) ->
 
 -spec unload(server()) -> ok.
 unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) ->
-    _ = do_deinit(Name, ReqOpts),
     _ = may_unload_hooks(HookSpecs),
+    _ = do_deinit(Name, ReqOpts),
     _ = emqx_exhook_sup:stop_grpc_client_channel(Name),
     ok.
 
 do_deinit(Name, ReqOpts) ->
-    _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts),
+    %% Using shorter timeout to deinit grpc server to avoid emqx_exhook_mngr
+    %% force killed by upper supervisor
+    _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts#{timeout => 3000}),
     ok.
 
 do_init(ChannName, ReqOpts) ->
diff --git a/apps/emqx_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_exproto/src/emqx_exproto_gcli.erl
index b7ef20c13..f7fa6ba23 100644
--- a/apps/emqx_exproto/src/emqx_exproto_gcli.erl
+++ b/apps/emqx_exproto/src/emqx_exproto_gcli.erl
@@ -54,7 +54,13 @@ start_link(Pool, Id) ->
                           ?MODULE, [Pool, Id], []).
 
 async_call(FunName, Req = #{conn := Conn}, Options) ->
-    cast(pick(Conn), {rpc, FunName, Req, Options, self()}).
+    case pick(Conn) of
+        false ->
+            ?LOG(error, "No available grpc client for ~s: ~p",
+                 [FunName, Req]);
+        Pid when is_pid(Pid) ->
+            cast(Pid, {rpc, FunName, Req, Options, self()})
+    end.
 
 %%--------------------------------------------------------------------
 %% cast, pick
@@ -65,6 +71,7 @@ async_call(FunName, Req = #{conn := Conn}, Options) ->
 cast(Deliver, Msg) ->
     gen_server:cast(Deliver, Msg).
 
+-spec pick(term()) -> pid() | false.
 pick(Conn) ->
     gproc_pool:pick_worker(exproto_gcli_pool, Conn).
 