refactor(exhook): move all manager code into mngr module

This commit is contained in:
JianBo He 2021-08-11 17:31:21 +08:00
parent 22f7b0b8e5
commit b3db4d0f7c
5 changed files with 230 additions and 141 deletions

View File

@ -21,10 +21,8 @@
-logger_header("[ExHook]").
%% Mgmt APIs
-export([ enable/2
-export([ enable/1
, disable/1
, disable_all/0
, list/0
]).
@ -36,64 +34,54 @@
%% Mgmt APIs
%%--------------------------------------------------------------------
%% XXX: Only return the running servers
-spec list() -> [emqx_exhook_server:server()].
list() ->
[server(Name) || Name <- running()].
-spec enable(atom()|string(), list()) -> ok | {error, term()}.
enable(Name, Opts) ->
case lists:member(Name, running()) of
true ->
{error, already_started};
_ ->
case emqx_exhook_server:load(Name, Opts) of
{ok, ServiceState} ->
save(Name, ServiceState);
{error, Reason} ->
?LOG(error, "Load server ~p failed: ~p", [Name, Reason]),
{error, Reason}
end
end.
-spec enable(atom()|string()) -> ok | {error, term()}.
enable(Name) ->
with_mngr(fun(Pid) -> emqx_exhook_mngr:enable(Pid, Name) end).
-spec disable(atom()|string()) -> ok | {error, term()}.
disable(Name) ->
case server(Name) of
undefined -> {error, not_running};
Service ->
ok = emqx_exhook_server:unload(Service),
unsave(Name)
with_mngr(fun(Pid) -> emqx_exhook_mngr:disable(Pid, Name) end).
-spec list() -> [atom() | string()].
list() ->
with_mngr(fun(Pid) -> emqx_exhook_mngr:list(Pid) end).
with_mngr(Fun) ->
case lists:keyfind(emqx_exhook_mngr, 1,
supervisor:which_children(emqx_exhook_sup)) of
{_, Pid, _, _} ->
Fun(Pid);
_ ->
{error, no_manager_svr}
end.
-spec disable_all() -> ok.
disable_all() ->
lists:foreach(fun disable/1, running()).
%%----------------------------------------------------------
%%--------------------------------------------------------------------
%% Dispatch APIs
%%----------------------------------------------------------
%%--------------------------------------------------------------------
-spec cast(atom(), map()) -> ok.
cast(Hookpoint, Req) ->
cast(Hookpoint, Req, running()).
cast(Hookpoint, Req, emqx_exhook_mngr:running()).
cast(_, _, []) ->
ok;
cast(Hookpoint, Req, [ServiceName|More]) ->
%% XXX: Need a real asynchronous running
_ = emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)),
_ = emqx_exhook_server:call(Hookpoint, Req,
emqx_exhook_mngr:server(ServiceName)),
cast(Hookpoint, Req, More).
-spec call_fold(atom(), term(), function())
-> {ok, term()}
| {stop, term()}.
call_fold(Hookpoint, Req, AccFun) ->
call_fold(Hookpoint, Req, AccFun, running()).
call_fold(Hookpoint, Req, AccFun, emqx_exhook_mngr:running()).
call_fold(_, Req, _, []) ->
{ok, Req};
call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
case emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)) of
case emqx_exhook_server:call(Hookpoint, Req,
emqx_exhook_mngr:server(ServiceName)) of
{ok, Resp} ->
case AccFun(Req, Resp) of
{stop, NReq} -> {stop, NReq};
@ -103,34 +91,3 @@ call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
_ ->
call_fold(Hookpoint, Req, AccFun, More)
end.
%%----------------------------------------------------------
%% Storage
-compile({inline, [save/2]}).
save(Name, ServiceState) ->
Saved = persistent_term:get(?APP, []),
persistent_term:put(?APP, lists:reverse([Name | Saved])),
persistent_term:put({?APP, Name}, ServiceState).
-compile({inline, [unsave/1]}).
unsave(Name) ->
case persistent_term:get(?APP, []) of
[] ->
persistent_term:erase(?APP);
Saved ->
persistent_term:put(?APP, lists:delete(Name, Saved))
end,
persistent_term:erase({?APP, Name}),
ok.
-compile({inline, [running/0]}).
running() ->
persistent_term:get(?APP, []).
-compile({inline, [server/1]}).
server(Name) ->
case catch persistent_term:get({?APP, Name}) of
{'EXIT', {badarg,_}} -> undefined;
Service -> Service
end.

View File

@ -22,25 +22,18 @@
cli(["server", "list"]) ->
if_enabled(fun() ->
Services = emqx_exhook:list(),
[emqx_ctl:print("HookServer(~s)~n",
[emqx_exhook_server:format(Service)]) || Service <- Services]
ServerNames = emqx_exhook:list(),
[emqx_ctl:print("Server(~s)~n", [format(Name)]) || Name <- ServerNames]
end);
cli(["server", "enable", Name0]) ->
cli(["server", "enable", Name]) ->
if_enabled(fun() ->
Name = list_to_atom(Name0),
case proplists:get_value(Name, application:get_env(?APP, servers, [])) of
undefined ->
emqx_ctl:print("not_found~n");
Opts ->
print(emqx_exhook:enable(Name, Opts))
end
print(emqx_exhook:enable(list_to_existing_atom(Name)))
end);
cli(["server", "disable", Name]) ->
if_enabled(fun() ->
print(emqx_exhook:disable(list_to_atom(Name)))
print(emqx_exhook:disable(list_to_existing_atom(Name)))
end);
cli(["server", "stats"]) ->
@ -65,7 +58,8 @@ print({error, Reason}) ->
if_enabled(Fun) ->
case lists:keymember(?APP, 1, application:which_applications()) of
true -> Fun();
true ->
Fun();
_ -> hint()
end.
@ -79,3 +73,11 @@ stats() ->
_ -> Acc
end
end, [], emqx_metrics:all())).
format(Name) ->
case emqx_exhook_mngr:server(Name) of
undefined ->
io_lib:format("name=~s, hooks=#{}, active=false", [Name]);
Server ->
emqx_exhook_server:format(Server)
end.

View File

@ -23,7 +23,18 @@
-include_lib("emqx/include/logger.hrl").
%% APIs
-export([start_link/2]).
-export([start_link/3]).
%% Mgmt API
-export([ enable/2
, disable/2
, list/1
]).
%% Helper funcs
-export([ running/0
, server/1
]).
%% gen_server callbacks
-export([ init/1
@ -36,13 +47,15 @@
-record(state, {
%% Running servers
running :: map(),
running :: map(), %% XXX: server order?
%% Wait to reload servers
waiting :: map(),
%% Marked stopped servers
stopped :: map(),
%% Auto reconnect timer interval
auto_reconnect :: false | non_neg_integer(),
%% Request options
request_options :: grpc_client:options(),
%% Timer references
trefs :: map()
}).
@ -54,24 +67,40 @@
| {port, inet:port_number()}
].
-define(DEFAULT_TIMEOUT, 60000).
-define(CNTER, emqx_exhook_counter).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec start_link(servers(), false | non_neg_integer())
-spec start_link(servers(), false | non_neg_integer(), grpc_client:options())
->ignore
| {ok, pid()}
| {error, any()}.
start_link(Servers, AutoReconnect) ->
gen_server:start_link(?MODULE, [Servers, AutoReconnect], []).
start_link(Servers, AutoReconnect, ReqOpts) ->
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []).
-spec enable(pid(), atom()|string()) -> ok | {error, term()}.
enable(Pid, Name) ->
call(Pid, {load, Name}).
-spec disable(pid(), atom()|string()) -> ok | {error, term()}.
disable(Pid, Name) ->
call(Pid, {unload, Name}).
list(Pid) ->
call(Pid, list).
call(Pid, Req) ->
gen_server:call(Pid, Req, ?DEFAULT_TIMEOUT).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Servers, AutoReconnect]) ->
init([Servers, AutoReconnect, ReqOpts]) ->
%% XXX: Due to the ExHook Module in the enterprise,
%% this process may start multiple times and they will share this table
try
@ -82,29 +111,53 @@ init([Servers, AutoReconnect]) ->
end,
%% Load the hook servers
{Waiting, Running} = load_all_servers(Servers),
{Waiting, Running} = load_all_servers(Servers, ReqOpts),
{ok, ensure_reload_timer(
#state{waiting = Waiting,
running = Running,
stopped = #{},
request_options = ReqOpts,
auto_reconnect = AutoReconnect,
trefs = #{}
}
)}.
%% @private
load_all_servers(Servers) ->
load_all_servers(Servers, #{}, #{}).
load_all_servers([], Waiting, Running) ->
load_all_servers(Servers, ReqOpts) ->
load_all_servers(Servers, ReqOpts, #{}, #{}).
load_all_servers([], _Request, Waiting, Running) ->
{Waiting, Running};
load_all_servers([{Name, Options}|More], Waiting, Running) ->
{NWaiting, NRunning} = case emqx_exhook:enable(Name, Options) of
ok ->
{Waiting, Running#{Name => Options}};
{error, _} ->
{Waiting#{Name => Options}, Running}
end,
load_all_servers(More, NWaiting, NRunning).
load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) ->
{NWaiting, NRunning} =
case emqx_exhook_server:load(Name, Options, ReqOpts) of
{ok, ServerState} ->
save(Name, ServerState),
{Waiting, Running#{Name => Options}};
{error, _} ->
{Waiting#{Name => Options}, Running}
end,
load_all_servers(More, ReqOpts, NWaiting, NRunning).
handle_call({load, Name}, _From, State) ->
{Result, NState} = do_load_server(Name, State),
{reply, Result, NState};
handle_call({unload, Name}, _From, State) ->
case do_unload_server(Name, State) of
{error, Reason} ->
{reply, {error, Reason}, State};
{ok, NState} ->
{reply, ok, NState}
end;
handle_call(list, _From, State = #state{
running = Running,
waiting = Waiting,
stopped = Stopped}) ->
ServerNames = maps:keys(Running)
++ maps:keys(Waiting)
++ maps:keys(Stopped),
{reply, ServerNames, State};
handle_call(_Request, _From, State) ->
Reply = ok,
@ -113,33 +166,27 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, _Ref, {reload, Name}},
State0 = #state{waiting = Waiting,
running = Running,
trefs = TRefs}) ->
State = State0#state{trefs = maps:remove(Name, TRefs)},
case maps:get(Name, Waiting, undefined) of
undefined ->
{noreply, State};
Options ->
case emqx_exhook:enable(Name, Options) of
ok ->
?LOG(warning, "Reconnect to exhook callback server "
"\"~s\" successfully!", [Name]),
{noreply, State#state{
running = maps:put(Name, Options, Running),
waiting = maps:remove(Name, Waiting)}
};
{error, _} ->
{noreply, ensure_reload_timer(State)}
end
handle_info({timeout, _Ref, {reload, Name}}, State) ->
{Result, NState} = do_load_server(Name, State),
case Result of
ok ->
{noreply, NState};
{error, not_found} ->
{noreply, NState};
{error, Reason} ->
?LOG(warning, "Failed to reload exhook callback server \"~s\", "
"Reason: ~0p", [Name, Reason]),
{noreply, ensure_reload_timer(NState)}
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
_ = emqx_exhook:disable_all(),
terminate(_Reason, State = #state{stopped = Stopped}) ->
_ = maps:fold(fun(Name, _, AccIn) ->
{ok, NAccIn} = do_unload_server(Name, AccIn),
NAccIn
end, State, Stopped),
_ = unload_exhooks(),
ok.
@ -154,6 +201,49 @@ unload_exhooks() ->
[emqx:unhook(Name, {M, F}) ||
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
do_load_server(Name, State0 = #state{
waiting = Waiting,
running = Running,
stopped = Stopped,
request_options = ReqOpts}) ->
State = clean_reload_timer(Name, State0),
case maps:get(Name, Running, undefined) of
undefined ->
case maps:get(Name, Stopped,
maps:get(Name, Waiting, undefined)) of
undefined ->
{{error, not_found}, State};
Options ->
case emqx_exhook_server:load(Name, Options, ReqOpts) of
{ok, ServerState} ->
save(Name, ServerState),
?LOG(info, "Load exhook callback server "
"\"~s\" successfully!", [Name]),
{ok, State#state{
running = maps:put(Name, Options, Running),
waiting = maps:remove(Name, Waiting),
stopped = maps:remove(Name, Stopped)
}
};
{error, Reason} ->
{{error, Reason}, State}
end
end;
_ ->
{{error, already_started}, State}
end.
do_unload_server(Name, State = #state{running = Running, stopped = Stopped}) ->
case maps:take(Name, Running) of
error -> {error, not_running};
{Options, NRunning} ->
ok = emqx_exhook_server:unload(server(Name)),
ok = unsave(Name),
{ok, State#state{running = NRunning,
stopped = maps:put(Name, Options, Stopped)
}}
end.
ensure_reload_timer(State = #state{auto_reconnect = false}) ->
State;
ensure_reload_timer(State = #state{waiting = Waiting,
@ -169,3 +259,38 @@ ensure_reload_timer(State = #state{waiting = Waiting,
end
end, TRefs, Waiting),
State#state{trefs = NRefs}.
clean_reload_timer(Name, State = #state{trefs = TRefs}) ->
case maps:take(Name, TRefs) of
error -> State;
{TRef, NTRefs} ->
_ = erlang:cancel_timer(TRef),
State#state{trefs = NTRefs}
end.
%%--------------------------------------------------------------------
%% Server state persistent
save(Name, ServerState) ->
Saved = persistent_term:get(?APP, []),
persistent_term:put(?APP, lists:reverse([Name | Saved])),
persistent_term:put({?APP, Name}, ServerState).
unsave(Name) ->
case persistent_term:get(?APP, []) of
[] ->
persistent_term:erase(?APP);
Saved ->
persistent_term:put(?APP, lists:delete(Name, Saved))
end,
persistent_term:erase({?APP, Name}),
ok.
running() ->
persistent_term:get(?APP, []).
server(Name) ->
case catch persistent_term:get({?APP, Name}) of
{'EXIT', {badarg,_}} -> undefined;
Service -> Service
end.

View File

@ -25,7 +25,7 @@
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
%% Load/Unload
-export([ load/2
-export([ load/3
, unload/1
]).
@ -40,8 +40,8 @@
-record(server, {
%% Server name (equal to grpc client channel name)
name :: server_name(),
%% The server started options
options :: list(),
%% The function options
options :: map(),
%% gRPC channel pid
channel :: pid(),
%% Registered hook names and options
@ -81,8 +81,8 @@
%% Load/Unload APIs
%%--------------------------------------------------------------------
-spec load(atom(), list()) -> {ok, server()} | {error, term()} .
load(Name0, Opts0) ->
-spec load(atom(), list(), map()) -> {ok, server()} | {error, term()} .
load(Name0, Opts0, ReqOpts) ->
Name = to_list(Name0),
{SvrAddr, ClientOpts} = channel_opts(Opts0),
case emqx_exhook_sup:start_grpc_client_channel(
@ -90,7 +90,7 @@ load(Name0, Opts0) ->
SvrAddr,
ClientOpts) of
{ok, _ChannPoolPid} ->
case do_init(Name) of
case do_init(Name, ReqOpts) of
{ok, HookSpecs} ->
%% Reigster metrics
Prefix = lists:flatten(
@ -99,7 +99,7 @@ load(Name0, Opts0) ->
%% Ensure hooks
ensure_hooks(HookSpecs),
{ok, #server{name = Name,
options = Opts0,
options = ReqOpts,
channel = _ChannPoolPid,
hookspec = HookSpecs,
prefix = Prefix }};
@ -141,19 +141,19 @@ format_http_uri(Scheme, Host0, Port) ->
lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
-spec unload(server()) -> ok.
unload(#server{name = Name, hookspec = HookSpecs}) ->
_ = do_deinit(Name),
unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) ->
_ = do_deinit(Name, ReqOpts),
_ = may_unload_hooks(HookSpecs),
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
ok.
do_deinit(Name) ->
_ = do_call(Name, 'on_provider_unloaded', #{}),
do_deinit(Name, ReqOpts) ->
_ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts),
ok.
do_init(ChannName) ->
do_init(ChannName, ReqOpts) ->
Req = #{broker => maps:from_list(emqx_sys:info())},
case do_call(ChannName, 'on_provider_loaded', Req) of
case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of
{ok, InitialResp} ->
try
{ok, resovle_hookspec(maps:get(hooks, InitialResp, []))}
@ -219,7 +219,7 @@ may_unload_hooks(HookSpecs) ->
end, maps:keys(HookSpecs)).
format(#server{name = Name, hookspec = Hooks}) ->
io_lib:format("name=~p, hooks=~0p", [Name, Hooks]).
io_lib:format("name=~s, hooks=~0p, active=true", [Name, Hooks]).
%%--------------------------------------------------------------------
%% APIs
@ -232,7 +232,8 @@ name(#server{name = Name}) ->
-> ignore
| {ok, Resp :: term()}
| {error, term()}.
call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix}) ->
call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
hookspec = Hooks, prefix = Prefix}) ->
GrpcFunc = hk2func(Hookpoint),
case maps:get(Hookpoint, Hooks, undefined) of
undefined -> ignore;
@ -247,7 +248,7 @@ call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix
false -> ignore;
_ ->
inc_metrics(Prefix, Hookpoint),
do_call(ChannName, GrpcFunc, Req)
do_call(ChannName, GrpcFunc, Req, ReqOpts)
end
end.
@ -265,9 +266,9 @@ match_topic_filter(_, []) ->
match_topic_filter(TopicName, TopicFilter) ->
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
-spec do_call(string(), atom(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req) ->
Options = #{channel => ChannName},
-spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req, ReqOpts) ->
Options = ReqOpts#{channel => ChannName},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
{ok, Resp, _Metadata} ->

View File

@ -41,7 +41,8 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Mngr = ?CHILD(emqx_exhook_mngr, worker, [servers(), auto_reconnect()]),
Mngr = ?CHILD(emqx_exhook_mngr, worker,
[servers(), auto_reconnect(), request_options()]),
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
servers() ->
@ -50,6 +51,9 @@ servers() ->
auto_reconnect() ->
application:get_env(emqx_exhook, auto_reconnect, 60000).
request_options() ->
#{timeout => application:get_env(emqx_exhook, request_timeout, 5000)}.
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------