refactor(exhook): add mechanism to reload the failure server

This commit is contained in:
JianBo He 2021-08-10 16:37:31 +08:00
parent 60e830fef7
commit 22f7b0b8e5
5 changed files with 236 additions and 52 deletions

View File

@ -2,8 +2,31 @@
## EMQ X Hooks
##====================================================================
## The default value or action will be returned, while the request to
## the gRPC server failed or no available grpc server running.
##
## Default: ignore
## Value: ignore | deny
#exhook.request_failed_action = ignore
## The timeout to request grpc server
##
## Default: 5s
## Value: Duration
#exhook.request_timeout = 5s
## Whether to automatically reconnect (initialize) the gRPC server
##
## When gRPC is not available, exhook tries to request the gRPC service at
## that interval and reinitialize the list of mounted hooks.
##
## Default: false
## Value: false | Duration
#exhook.auto_reconnect = 60s
##--------------------------------------------------------------------
## Server Address
## The Hook callback servers
## The gRPC server url
##

View File

@ -1,5 +1,31 @@
%%-*- mode: erlang -*-
{mapping, "exhook.request_failed_action", "emqx_exhook.request_failed_action", [
{default, "ignore"},
{datatype, {enum, [ignore, deny]}}
]}.
{mapping, "exhook.request_timeout", "emqx_exhook.request_timeout", [
{default, "5s"},
{datatype, {duration, ms}}
]}.
{mapping, "exhook.auto_reconnect", "emqx_exhook.auto_reconnect", [
{default, "60s"},
{datatype, string}
]}.
{translation, "emqx_exhook.auto_reconnect", fun(Conf) ->
case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of
"false" -> false;
Dur ->
case cuttlefish_duration:parse(Dur, ms) of
Ms when is_integer(Ms) -> Ms;
{error, Reason} -> error(Reason)
end
end
end}.
{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [
{datatype, string}
]}.

View File

@ -22,73 +22,23 @@
-emqx_plugin(extension).
-define(CNTER, emqx_exhook_counter).
-export([ start/2
, stop/1
, prep_stop/1
]).
%% Internal export
-export([ load_server/2
, unload_server/1
, unload_exhooks/0
, init_hooks_cnter/0
]).
%%--------------------------------------------------------------------
%% Application callbacks
%%--------------------------------------------------------------------
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_exhook_sup:start_link(),
%% Init counter
init_hooks_cnter(),
%% Load all dirvers
load_all_servers(),
%% Register CLI
emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []),
{ok, Sup}.
prep_stop(State) ->
emqx_ctl:unregister_command(exhook),
_ = unload_exhooks(),
ok = unload_all_servers(),
State.
stop(_State) ->
ok.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
load_all_servers() ->
lists:foreach(fun({Name, Options}) ->
load_server(Name, Options)
end, application:get_env(?APP, servers, [])).
unload_all_servers() ->
emqx_exhook:disable_all().
load_server(Name, Options) ->
emqx_exhook:enable(Name, Options).
unload_server(Name) ->
emqx_exhook:disable(Name).
unload_exhooks() ->
[emqx:unhook(Name, {M, F}) ||
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
init_hooks_cnter() ->
try
_ = ets:new(?CNTER, [named_table, public]), ok
catch
error:badarg:_ ->
ok
end.

View File

@ -0,0 +1,171 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Manage the server status and reload strategy
-module(emqx_exhook_mngr).
-behaviour(gen_server).
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
%% APIs
-export([start_link/2]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {
%% Running servers
running :: map(),
%% Wait to reload servers
waiting :: map(),
%% Marked stopped servers
stopped :: map(),
%% Auto reconnect timer interval
auto_reconnect :: false | non_neg_integer(),
%% Timer references
trefs :: map()
}).
-type servers() :: [{Name :: atom(), server_options()}].
-type server_options() :: [ {scheme, http | https}
| {host, string()}
| {port, inet:port_number()}
].
-define(CNTER, emqx_exhook_counter).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec start_link(servers(), false | non_neg_integer())
->ignore
| {ok, pid()}
| {error, any()}.
start_link(Servers, AutoReconnect) ->
gen_server:start_link(?MODULE, [Servers, AutoReconnect], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Servers, AutoReconnect]) ->
%% XXX: Due to the ExHook Module in the enterprise,
%% this process may start multiple times and they will share this table
try
_ = ets:new(?CNTER, [named_table, public]), ok
catch
error:badarg:_ ->
ok
end,
%% Load the hook servers
{Waiting, Running} = load_all_servers(Servers),
{ok, ensure_reload_timer(
#state{waiting = Waiting,
running = Running,
stopped = #{},
auto_reconnect = AutoReconnect,
trefs = #{}
}
)}.
%% @private
load_all_servers(Servers) ->
load_all_servers(Servers, #{}, #{}).
load_all_servers([], Waiting, Running) ->
{Waiting, Running};
load_all_servers([{Name, Options}|More], Waiting, Running) ->
{NWaiting, NRunning} = case emqx_exhook:enable(Name, Options) of
ok ->
{Waiting, Running#{Name => Options}};
{error, _} ->
{Waiting#{Name => Options}, Running}
end,
load_all_servers(More, NWaiting, NRunning).
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, _Ref, {reload, Name}},
State0 = #state{waiting = Waiting,
running = Running,
trefs = TRefs}) ->
State = State0#state{trefs = maps:remove(Name, TRefs)},
case maps:get(Name, Waiting, undefined) of
undefined ->
{noreply, State};
Options ->
case emqx_exhook:enable(Name, Options) of
ok ->
?LOG(warning, "Reconnect to exhook callback server "
"\"~s\" successfully!", [Name]),
{noreply, State#state{
running = maps:put(Name, Options, Running),
waiting = maps:remove(Name, Waiting)}
};
{error, _} ->
{noreply, ensure_reload_timer(State)}
end
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
_ = emqx_exhook:disable_all(),
_ = unload_exhooks(),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
unload_exhooks() ->
[emqx:unhook(Name, {M, F}) ||
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
ensure_reload_timer(State = #state{auto_reconnect = false}) ->
State;
ensure_reload_timer(State = #state{waiting = Waiting,
trefs = TRefs,
auto_reconnect = Intv}) ->
NRefs = maps:fold(fun(Name, _, AccIn) ->
case maps:get(Name, AccIn, undefined) of
undefined ->
Ref = erlang:start_timer(Intv, self(), {reload, Name}),
AccIn#{Name => Ref};
_HasRef ->
AccIn
end
end, TRefs, Waiting),
State#state{trefs = NRefs}.

View File

@ -26,6 +26,13 @@
, stop_grpc_client_channel/1
]).
-define(CHILD(Mod, Type, Args),
#{ id => Mod
, start => {Mod, start_link, Args}
, type => Type
}
).
%%--------------------------------------------------------------------
%% Supervisor APIs & Callbacks
%%--------------------------------------------------------------------
@ -34,7 +41,14 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, {{one_for_one, 10, 100}, []}}.
Mngr = ?CHILD(emqx_exhook_mngr, worker, [servers(), auto_reconnect()]),
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
servers() ->
application:get_env(emqx_exhook, servers, []).
auto_reconnect() ->
application:get_env(emqx_exhook, auto_reconnect, 60000).
%%--------------------------------------------------------------------
%% APIs