From 7e5be6ed6c62b8de39518bb910ab0415c371e7d2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 10 Aug 2021 16:37:31 +0800 Subject: [PATCH] refactor(exhook): add mechanism to reload the failure server --- apps/emqx_exhook/etc/emqx_exhook.conf | 16 -- apps/emqx_exhook/src/emqx_exhook_app.erl | 51 ------- apps/emqx_exhook/src/emqx_exhook_mngr.erl | 171 ++++++++++++++++++++++ apps/emqx_exhook/src/emqx_exhook_sup.erl | 16 +- 4 files changed, 186 insertions(+), 68 deletions(-) delete mode 100644 apps/emqx_exhook/etc/emqx_exhook.conf create mode 100644 apps/emqx_exhook/src/emqx_exhook_mngr.erl diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf deleted file mode 100644 index 648eb554f..000000000 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ /dev/null @@ -1,16 +0,0 @@ -##==================================================================== -## EMQ X Hooks -##==================================================================== - -exhook: { - servers: [ - # { name: "default" - # url: "http://127.0.0.1:9000" - # #ssl: { - # # cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" - # # certfile: "{{ platform_etc_dir }}/certs/cert.pem" - # # keyfile: "{{ platform_etc_dir }}/certs/key.pem" - # #} - # } - ] -} diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl index c97b26677..80dc24b70 100644 --- a/apps/emqx_exhook/src/emqx_exhook_app.erl +++ b/apps/emqx_exhook/src/emqx_exhook_app.erl @@ -20,41 +20,22 @@ -include("emqx_exhook.hrl"). --define(CNTER, emqx_exhook_counter). - -export([ start/2 , stop/1 , prep_stop/1 ]). -%% Internal export --export([ load_server/2 - , unload_server/1 - , unload_exhooks/0 - , init_hooks_cnter/0 - ]). - %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- start(_StartType, _StartArgs) -> {ok, Sup} = emqx_exhook_sup:start_link(), - - %% Init counter - init_hooks_cnter(), - - %% Load all dirvers - load_all_servers(), - - %% Register CLI emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []), {ok, Sup}. prep_stop(State) -> emqx_ctl:unregister_command(exhook), - _ = unload_exhooks(), - ok = unload_all_servers(), State. stop(_State) -> @@ -63,35 +44,3 @@ stop(_State) -> %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- - -load_all_servers() -> - try - lists:foreach(fun(#{name := Name} = Options) -> - load_server(Name, maps:remove(name, Options)) - end, emqx_config:get([exhook, servers])) - catch - _Class : _Reason -> - ok - end, ok. - -unload_all_servers() -> - emqx_exhook:disable_all(). - -load_server(Name, Options) -> - emqx_exhook:enable(Name, Options). - -unload_server(Name) -> - emqx_exhook:disable(Name). - -unload_exhooks() -> - [emqx:unhook(Name, {M, F}) || - {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. - -init_hooks_cnter() -> - try - _ = ets:new(?CNTER, [named_table, public]), ok - catch - error:badarg:_ -> - ok - end. - diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl new file mode 100644 index 000000000..8e3dfaf82 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -0,0 +1,171 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Manage the server status and reload strategy +-module(emqx_exhook_mngr). + +-behaviour(gen_server). + +-include("emqx_exhook.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% APIs +-export([start_link/2]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(state, { + %% Running servers + running :: map(), + %% Wait to reload servers + waiting :: map(), + %% Marked stopped servers + stopped :: map(), + %% Auto reconnect timer interval + auto_reconnect :: false | non_neg_integer(), + %% Timer references + trefs :: map() + }). + +-type servers() :: [{Name :: atom(), server_options()}]. + +-type server_options() :: [ {scheme, http | https} + | {host, string()} + | {port, inet:port_number()} + ]. + +-define(CNTER, emqx_exhook_counter). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-spec start_link(servers(), false | non_neg_integer()) + ->ignore + | {ok, pid()} + | {error, any()}. +start_link(Servers, AutoReconnect) -> + gen_server:start_link(?MODULE, [Servers, AutoReconnect], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Servers, AutoReconnect]) -> + %% XXX: Due to the ExHook Module in the enterprise, + %% this process may start multiple times and they will share this table + try + _ = ets:new(?CNTER, [named_table, public]), ok + catch + error:badarg:_ -> + ok + end, + + %% Load the hook servers + {Waiting, Running} = load_all_servers(Servers), + {ok, ensure_reload_timer( + #state{waiting = Waiting, + running = Running, + stopped = #{}, + auto_reconnect = AutoReconnect, + trefs = #{} + } + )}. + +%% @private +load_all_servers(Servers) -> + load_all_servers(Servers, #{}, #{}). +load_all_servers([], Waiting, Running) -> + {Waiting, Running}; +load_all_servers([{Name, Options}|More], Waiting, Running) -> + {NWaiting, NRunning} = case emqx_exhook:enable(Name, Options) of + ok -> + {Waiting, Running#{Name => Options}}; + {error, _} -> + {Waiting#{Name => Options}, Running} + end, + load_all_servers(More, NWaiting, NRunning). + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({timeout, _Ref, {reload, Name}}, + State0 = #state{waiting = Waiting, + running = Running, + trefs = TRefs}) -> + State = State0#state{trefs = maps:remove(Name, TRefs)}, + case maps:get(Name, Waiting, undefined) of + undefined -> + {noreply, State}; + Options -> + case emqx_exhook:enable(Name, Options) of + ok -> + ?LOG(warning, "Reconnect to exhook callback server " + "\"~s\" successfully!", [Name]), + {noreply, State#state{ + running = maps:put(Name, Options, Running), + waiting = maps:remove(Name, Waiting)} + }; + {error, _} -> + {noreply, ensure_reload_timer(State)} + end + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + _ = emqx_exhook:disable_all(), + _ = unload_exhooks(), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +unload_exhooks() -> + [emqx:unhook(Name, {M, F}) || + {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. + +ensure_reload_timer(State = #state{auto_reconnect = false}) -> + State; +ensure_reload_timer(State = #state{waiting = Waiting, + trefs = TRefs, + auto_reconnect = Intv}) -> + NRefs = maps:fold(fun(Name, _, AccIn) -> + case maps:get(Name, AccIn, undefined) of + undefined -> + Ref = erlang:start_timer(Intv, self(), {reload, Name}), + AccIn#{Name => Ref}; + _HasRef -> + AccIn + end + end, TRefs, Waiting), + State#state{trefs = NRefs}. diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index c3ca811bd..96509181a 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -26,6 +26,13 @@ , stop_grpc_client_channel/1 ]). +-define(CHILD(Mod, Type, Args), + #{ id => Mod + , start => {Mod, start_link, Args} + , type => Type + } + ). + %%-------------------------------------------------------------------- %% Supervisor APIs & Callbacks %%-------------------------------------------------------------------- @@ -34,7 +41,14 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. + Mngr = ?CHILD(emqx_exhook_mngr, worker, [servers(), auto_reconnect()]), + {ok, {{one_for_one, 10, 100}, [Mngr]}}. + +servers() -> + application:get_env(emqx_exhook, servers, []). + +auto_reconnect() -> + application:get_env(emqx_exhook, auto_reconnect, 60000). %%-------------------------------------------------------------------- %% APIs