diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index f6f5213f7..de4096d45 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -2,8 +2,31 @@ ## EMQ X Hooks ##==================================================================== +## The default value or action will be returned, while the request to +## the gRPC server failed or no available grpc server running. +## +## Default: ignore +## Value: ignore | deny +#exhook.request_failed_action = ignore + +## The timeout to request grpc server +## +## Default: 5s +## Value: Duration +#exhook.request_timeout = 5s + +## Whether to automatically reconnect (initialize) the gRPC server +## +## When gRPC is not available, exhook tries to request the gRPC service at +## that interval and reinitialize the list of mounted hooks. +## +## Default: false +## Value: false | Duration +#exhook.auto_reconnect = 60s + + ##-------------------------------------------------------------------- -## Server Address +## The Hook callback servers ## The gRPC server url ## diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema index e5481a3dd..8f1775767 100644 --- a/apps/emqx_exhook/priv/emqx_exhook.schema +++ b/apps/emqx_exhook/priv/emqx_exhook.schema @@ -1,5 +1,31 @@ %%-*- mode: erlang -*- +{mapping, "exhook.request_failed_action", "emqx_exhook.request_failed_action", [ + {default, "ignore"}, + {datatype, {enum, [ignore, deny]}} +]}. + +{mapping, "exhook.request_timeout", "emqx_exhook.request_timeout", [ + {default, "5s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "exhook.auto_reconnect", "emqx_exhook.auto_reconnect", [ + {default, "60s"}, + {datatype, string} +]}. + +{translation, "emqx_exhook.auto_reconnect", fun(Conf) -> + case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of + "false" -> false; + Dur -> + case cuttlefish_duration:parse(Dur, ms) of + Ms when is_integer(Ms) -> Ms; + {error, Reason} -> error(Reason) + end + end +end}. + {mapping, "exhook.server.$name.url", "emqx_exhook.servers", [ {datatype, string} ]}. diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl index 2988be6d2..c1fcdc6ab 100644 --- a/apps/emqx_exhook/src/emqx_exhook_app.erl +++ b/apps/emqx_exhook/src/emqx_exhook_app.erl @@ -22,73 +22,23 @@ -emqx_plugin(extension). --define(CNTER, emqx_exhook_counter). - -export([ start/2 , stop/1 , prep_stop/1 ]). -%% Internal export --export([ load_server/2 - , unload_server/1 - , unload_exhooks/0 - , init_hooks_cnter/0 - ]). - %%-------------------------------------------------------------------- %% Application callbacks %%-------------------------------------------------------------------- start(_StartType, _StartArgs) -> {ok, Sup} = emqx_exhook_sup:start_link(), - - %% Init counter - init_hooks_cnter(), - - %% Load all dirvers - load_all_servers(), - - %% Register CLI emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []), {ok, Sup}. prep_stop(State) -> emqx_ctl:unregister_command(exhook), - _ = unload_exhooks(), - ok = unload_all_servers(), State. stop(_State) -> ok. - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -load_all_servers() -> - lists:foreach(fun({Name, Options}) -> - load_server(Name, Options) - end, application:get_env(?APP, servers, [])). - -unload_all_servers() -> - emqx_exhook:disable_all(). - -load_server(Name, Options) -> - emqx_exhook:enable(Name, Options). - -unload_server(Name) -> - emqx_exhook:disable(Name). - -unload_exhooks() -> - [emqx:unhook(Name, {M, F}) || - {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. - -init_hooks_cnter() -> - try - _ = ets:new(?CNTER, [named_table, public]), ok - catch - error:badarg:_ -> - ok - end. - diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl new file mode 100644 index 000000000..8e3dfaf82 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -0,0 +1,171 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Manage the server status and reload strategy +-module(emqx_exhook_mngr). + +-behaviour(gen_server). + +-include("emqx_exhook.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% APIs +-export([start_link/2]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-record(state, { + %% Running servers + running :: map(), + %% Wait to reload servers + waiting :: map(), + %% Marked stopped servers + stopped :: map(), + %% Auto reconnect timer interval + auto_reconnect :: false | non_neg_integer(), + %% Timer references + trefs :: map() + }). + +-type servers() :: [{Name :: atom(), server_options()}]. + +-type server_options() :: [ {scheme, http | https} + | {host, string()} + | {port, inet:port_number()} + ]. + +-define(CNTER, emqx_exhook_counter). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-spec start_link(servers(), false | non_neg_integer()) + ->ignore + | {ok, pid()} + | {error, any()}. +start_link(Servers, AutoReconnect) -> + gen_server:start_link(?MODULE, [Servers, AutoReconnect], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Servers, AutoReconnect]) -> + %% XXX: Due to the ExHook Module in the enterprise, + %% this process may start multiple times and they will share this table + try + _ = ets:new(?CNTER, [named_table, public]), ok + catch + error:badarg:_ -> + ok + end, + + %% Load the hook servers + {Waiting, Running} = load_all_servers(Servers), + {ok, ensure_reload_timer( + #state{waiting = Waiting, + running = Running, + stopped = #{}, + auto_reconnect = AutoReconnect, + trefs = #{} + } + )}. + +%% @private +load_all_servers(Servers) -> + load_all_servers(Servers, #{}, #{}). +load_all_servers([], Waiting, Running) -> + {Waiting, Running}; +load_all_servers([{Name, Options}|More], Waiting, Running) -> + {NWaiting, NRunning} = case emqx_exhook:enable(Name, Options) of + ok -> + {Waiting, Running#{Name => Options}}; + {error, _} -> + {Waiting#{Name => Options}, Running} + end, + load_all_servers(More, NWaiting, NRunning). + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({timeout, _Ref, {reload, Name}}, + State0 = #state{waiting = Waiting, + running = Running, + trefs = TRefs}) -> + State = State0#state{trefs = maps:remove(Name, TRefs)}, + case maps:get(Name, Waiting, undefined) of + undefined -> + {noreply, State}; + Options -> + case emqx_exhook:enable(Name, Options) of + ok -> + ?LOG(warning, "Reconnect to exhook callback server " + "\"~s\" successfully!", [Name]), + {noreply, State#state{ + running = maps:put(Name, Options, Running), + waiting = maps:remove(Name, Waiting)} + }; + {error, _} -> + {noreply, ensure_reload_timer(State)} + end + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + _ = emqx_exhook:disable_all(), + _ = unload_exhooks(), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +unload_exhooks() -> + [emqx:unhook(Name, {M, F}) || + {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. + +ensure_reload_timer(State = #state{auto_reconnect = false}) -> + State; +ensure_reload_timer(State = #state{waiting = Waiting, + trefs = TRefs, + auto_reconnect = Intv}) -> + NRefs = maps:fold(fun(Name, _, AccIn) -> + case maps:get(Name, AccIn, undefined) of + undefined -> + Ref = erlang:start_timer(Intv, self(), {reload, Name}), + AccIn#{Name => Ref}; + _HasRef -> + AccIn + end + end, TRefs, Waiting), + State#state{trefs = NRefs}. diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index c3ca811bd..96509181a 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -26,6 +26,13 @@ , stop_grpc_client_channel/1 ]). +-define(CHILD(Mod, Type, Args), + #{ id => Mod + , start => {Mod, start_link, Args} + , type => Type + } + ). + %%-------------------------------------------------------------------- %% Supervisor APIs & Callbacks %%-------------------------------------------------------------------- @@ -34,7 +41,14 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. + Mngr = ?CHILD(emqx_exhook_mngr, worker, [servers(), auto_reconnect()]), + {ok, {{one_for_one, 10, 100}, [Mngr]}}. + +servers() -> + application:get_env(emqx_exhook, servers, []). + +auto_reconnect() -> + application:get_env(emqx_exhook, auto_reconnect, 60000). %%-------------------------------------------------------------------- %% APIs