diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl index 032d7f91a..a724aa9c7 100644 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -21,10 +21,8 @@ -logger_header("[ExHook]"). -%% Mgmt APIs --export([ enable/2 +-export([ enable/1 , disable/1 - , disable_all/0 , list/0 ]). @@ -36,64 +34,54 @@ %% Mgmt APIs %%-------------------------------------------------------------------- -%% XXX: Only return the running servers --spec list() -> [emqx_exhook_server:server()]. -list() -> - [server(Name) || Name <- running()]. - --spec enable(atom()|string(), list()) -> ok | {error, term()}. -enable(Name, Opts) -> - case lists:member(Name, running()) of - true -> - {error, already_started}; - _ -> - case emqx_exhook_server:load(Name, Opts) of - {ok, ServiceState} -> - save(Name, ServiceState); - {error, Reason} -> - ?LOG(error, "Load server ~p failed: ~p", [Name, Reason]), - {error, Reason} - end - end. +-spec enable(atom()|string()) -> ok | {error, term()}. +enable(Name) -> + with_mngr(fun(Pid) -> emqx_exhook_mngr:enable(Pid, Name) end). -spec disable(atom()|string()) -> ok | {error, term()}. disable(Name) -> - case server(Name) of - undefined -> {error, not_running}; - Service -> - ok = emqx_exhook_server:unload(Service), - unsave(Name) + with_mngr(fun(Pid) -> emqx_exhook_mngr:disable(Pid, Name) end). + +-spec list() -> [atom() | string()]. +list() -> + with_mngr(fun(Pid) -> emqx_exhook_mngr:list(Pid) end). + +with_mngr(Fun) -> + case lists:keyfind(emqx_exhook_mngr, 1, + supervisor:which_children(emqx_exhook_sup)) of + {_, Pid, _, _} -> + Fun(Pid); + _ -> + {error, no_manager_svr} end. --spec disable_all() -> ok. -disable_all() -> - lists:foreach(fun disable/1, running()). - -%%---------------------------------------------------------- +%%-------------------------------------------------------------------- %% Dispatch APIs -%%---------------------------------------------------------- +%%-------------------------------------------------------------------- -spec cast(atom(), map()) -> ok. cast(Hookpoint, Req) -> - cast(Hookpoint, Req, running()). + cast(Hookpoint, Req, emqx_exhook_mngr:running()). cast(_, _, []) -> ok; cast(Hookpoint, Req, [ServiceName|More]) -> %% XXX: Need a real asynchronous running - _ = emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)), + _ = emqx_exhook_server:call(Hookpoint, Req, + emqx_exhook_mngr:server(ServiceName)), cast(Hookpoint, Req, More). -spec call_fold(atom(), term(), function()) -> {ok, term()} | {stop, term()}. call_fold(Hookpoint, Req, AccFun) -> - call_fold(Hookpoint, Req, AccFun, running()). + call_fold(Hookpoint, Req, AccFun, emqx_exhook_mngr:running()). call_fold(_, Req, _, []) -> {ok, Req}; call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) -> - case emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)) of + case emqx_exhook_server:call(Hookpoint, Req, + emqx_exhook_mngr:server(ServiceName)) of {ok, Resp} -> case AccFun(Req, Resp) of {stop, NReq} -> {stop, NReq}; @@ -103,34 +91,3 @@ call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) -> _ -> call_fold(Hookpoint, Req, AccFun, More) end. - -%%---------------------------------------------------------- -%% Storage - --compile({inline, [save/2]}). -save(Name, ServiceState) -> - Saved = persistent_term:get(?APP, []), - persistent_term:put(?APP, lists:reverse([Name | Saved])), - persistent_term:put({?APP, Name}, ServiceState). - --compile({inline, [unsave/1]}). -unsave(Name) -> - case persistent_term:get(?APP, []) of - [] -> - persistent_term:erase(?APP); - Saved -> - persistent_term:put(?APP, lists:delete(Name, Saved)) - end, - persistent_term:erase({?APP, Name}), - ok. - --compile({inline, [running/0]}). -running() -> - persistent_term:get(?APP, []). - --compile({inline, [server/1]}). -server(Name) -> - case catch persistent_term:get({?APP, Name}) of - {'EXIT', {badarg,_}} -> undefined; - Service -> Service - end. diff --git a/apps/emqx_exhook/src/emqx_exhook_cli.erl b/apps/emqx_exhook/src/emqx_exhook_cli.erl index a8dc43b16..804d05a2d 100644 --- a/apps/emqx_exhook/src/emqx_exhook_cli.erl +++ b/apps/emqx_exhook/src/emqx_exhook_cli.erl @@ -22,25 +22,18 @@ cli(["server", "list"]) -> if_enabled(fun() -> - Services = emqx_exhook:list(), - [emqx_ctl:print("HookServer(~s)~n", - [emqx_exhook_server:format(Service)]) || Service <- Services] + ServerNames = emqx_exhook:list(), + [emqx_ctl:print("Server(~s)~n", [format(Name)]) || Name <- ServerNames] end); -cli(["server", "enable", Name0]) -> +cli(["server", "enable", Name]) -> if_enabled(fun() -> - Name = list_to_atom(Name0), - case proplists:get_value(Name, application:get_env(?APP, servers, [])) of - undefined -> - emqx_ctl:print("not_found~n"); - Opts -> - print(emqx_exhook:enable(Name, Opts)) - end + print(emqx_exhook:enable(list_to_existing_atom(Name))) end); cli(["server", "disable", Name]) -> if_enabled(fun() -> - print(emqx_exhook:disable(list_to_atom(Name))) + print(emqx_exhook:disable(list_to_existing_atom(Name))) end); cli(["server", "stats"]) -> @@ -65,7 +58,8 @@ print({error, Reason}) -> if_enabled(Fun) -> case lists:keymember(?APP, 1, application:which_applications()) of - true -> Fun(); + true -> + Fun(); _ -> hint() end. @@ -79,3 +73,11 @@ stats() -> _ -> Acc end end, [], emqx_metrics:all())). + +format(Name) -> + case emqx_exhook_mngr:server(Name) of + undefined -> + io_lib:format("name=~s, hooks=#{}, active=false", [Name]); + Server -> + emqx_exhook_server:format(Server) + end. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index 8e3dfaf82..ce375825f 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -23,7 +23,18 @@ -include_lib("emqx/include/logger.hrl"). %% APIs --export([start_link/2]). +-export([start_link/3]). + +%% Mgmt API +-export([ enable/2 + , disable/2 + , list/1 + ]). + +%% Helper funcs +-export([ running/0 + , server/1 + ]). %% gen_server callbacks -export([ init/1 @@ -36,13 +47,15 @@ -record(state, { %% Running servers - running :: map(), + running :: map(), %% XXX: server order? %% Wait to reload servers waiting :: map(), %% Marked stopped servers stopped :: map(), %% Auto reconnect timer interval auto_reconnect :: false | non_neg_integer(), + %% Request options + request_options :: grpc_client:options(), %% Timer references trefs :: map() }). @@ -54,24 +67,40 @@ | {port, inet:port_number()} ]. +-define(DEFAULT_TIMEOUT, 60000). + -define(CNTER, emqx_exhook_counter). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- --spec start_link(servers(), false | non_neg_integer()) +-spec start_link(servers(), false | non_neg_integer(), grpc_client:options()) ->ignore | {ok, pid()} | {error, any()}. -start_link(Servers, AutoReconnect) -> - gen_server:start_link(?MODULE, [Servers, AutoReconnect], []). +start_link(Servers, AutoReconnect, ReqOpts) -> + gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). + +-spec enable(pid(), atom()|string()) -> ok | {error, term()}. +enable(Pid, Name) -> + call(Pid, {load, Name}). + +-spec disable(pid(), atom()|string()) -> ok | {error, term()}. +disable(Pid, Name) -> + call(Pid, {unload, Name}). + +list(Pid) -> + call(Pid, list). + +call(Pid, Req) -> + gen_server:call(Pid, Req, ?DEFAULT_TIMEOUT). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([Servers, AutoReconnect]) -> +init([Servers, AutoReconnect, ReqOpts]) -> %% XXX: Due to the ExHook Module in the enterprise, %% this process may start multiple times and they will share this table try @@ -82,29 +111,53 @@ init([Servers, AutoReconnect]) -> end, %% Load the hook servers - {Waiting, Running} = load_all_servers(Servers), + {Waiting, Running} = load_all_servers(Servers, ReqOpts), {ok, ensure_reload_timer( #state{waiting = Waiting, running = Running, stopped = #{}, + request_options = ReqOpts, auto_reconnect = AutoReconnect, trefs = #{} } )}. %% @private -load_all_servers(Servers) -> - load_all_servers(Servers, #{}, #{}). -load_all_servers([], Waiting, Running) -> +load_all_servers(Servers, ReqOpts) -> + load_all_servers(Servers, ReqOpts, #{}, #{}). +load_all_servers([], _Request, Waiting, Running) -> {Waiting, Running}; -load_all_servers([{Name, Options}|More], Waiting, Running) -> - {NWaiting, NRunning} = case emqx_exhook:enable(Name, Options) of - ok -> - {Waiting, Running#{Name => Options}}; - {error, _} -> - {Waiting#{Name => Options}, Running} - end, - load_all_servers(More, NWaiting, NRunning). +load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) -> + {NWaiting, NRunning} = + case emqx_exhook_server:load(Name, Options, ReqOpts) of + {ok, ServerState} -> + save(Name, ServerState), + {Waiting, Running#{Name => Options}}; + {error, _} -> + {Waiting#{Name => Options}, Running} + end, + load_all_servers(More, ReqOpts, NWaiting, NRunning). + +handle_call({load, Name}, _From, State) -> + {Result, NState} = do_load_server(Name, State), + {reply, Result, NState}; + +handle_call({unload, Name}, _From, State) -> + case do_unload_server(Name, State) of + {error, Reason} -> + {reply, {error, Reason}, State}; + {ok, NState} -> + {reply, ok, NState} + end; + +handle_call(list, _From, State = #state{ + running = Running, + waiting = Waiting, + stopped = Stopped}) -> + ServerNames = maps:keys(Running) + ++ maps:keys(Waiting) + ++ maps:keys(Stopped), + {reply, ServerNames, State}; handle_call(_Request, _From, State) -> Reply = ok, @@ -113,33 +166,27 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({timeout, _Ref, {reload, Name}}, - State0 = #state{waiting = Waiting, - running = Running, - trefs = TRefs}) -> - State = State0#state{trefs = maps:remove(Name, TRefs)}, - case maps:get(Name, Waiting, undefined) of - undefined -> - {noreply, State}; - Options -> - case emqx_exhook:enable(Name, Options) of - ok -> - ?LOG(warning, "Reconnect to exhook callback server " - "\"~s\" successfully!", [Name]), - {noreply, State#state{ - running = maps:put(Name, Options, Running), - waiting = maps:remove(Name, Waiting)} - }; - {error, _} -> - {noreply, ensure_reload_timer(State)} - end +handle_info({timeout, _Ref, {reload, Name}}, State) -> + {Result, NState} = do_load_server(Name, State), + case Result of + ok -> + {noreply, NState}; + {error, not_found} -> + {noreply, NState}; + {error, Reason} -> + ?LOG(warning, "Failed to reload exhook callback server \"~s\", " + "Reason: ~0p", [Name, Reason]), + {noreply, ensure_reload_timer(NState)} end; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - _ = emqx_exhook:disable_all(), +terminate(_Reason, State = #state{stopped = Stopped}) -> + _ = maps:fold(fun(Name, _, AccIn) -> + {ok, NAccIn} = do_unload_server(Name, AccIn), + NAccIn + end, State, Stopped), _ = unload_exhooks(), ok. @@ -154,6 +201,49 @@ unload_exhooks() -> [emqx:unhook(Name, {M, F}) || {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. +do_load_server(Name, State0 = #state{ + waiting = Waiting, + running = Running, + stopped = Stopped, + request_options = ReqOpts}) -> + State = clean_reload_timer(Name, State0), + case maps:get(Name, Running, undefined) of + undefined -> + case maps:get(Name, Stopped, + maps:get(Name, Waiting, undefined)) of + undefined -> + {{error, not_found}, State}; + Options -> + case emqx_exhook_server:load(Name, Options, ReqOpts) of + {ok, ServerState} -> + save(Name, ServerState), + ?LOG(info, "Load exhook callback server " + "\"~s\" successfully!", [Name]), + {ok, State#state{ + running = maps:put(Name, Options, Running), + waiting = maps:remove(Name, Waiting), + stopped = maps:remove(Name, Stopped) + } + }; + {error, Reason} -> + {{error, Reason}, State} + end + end; + _ -> + {{error, already_started}, State} + end. + +do_unload_server(Name, State = #state{running = Running, stopped = Stopped}) -> + case maps:take(Name, Running) of + error -> {error, not_running}; + {Options, NRunning} -> + ok = emqx_exhook_server:unload(server(Name)), + ok = unsave(Name), + {ok, State#state{running = NRunning, + stopped = maps:put(Name, Options, Stopped) + }} + end. + ensure_reload_timer(State = #state{auto_reconnect = false}) -> State; ensure_reload_timer(State = #state{waiting = Waiting, @@ -169,3 +259,38 @@ ensure_reload_timer(State = #state{waiting = Waiting, end end, TRefs, Waiting), State#state{trefs = NRefs}. + +clean_reload_timer(Name, State = #state{trefs = TRefs}) -> + case maps:take(Name, TRefs) of + error -> State; + {TRef, NTRefs} -> + _ = erlang:cancel_timer(TRef), + State#state{trefs = NTRefs} + end. + +%%-------------------------------------------------------------------- +%% Server state persistent + +save(Name, ServerState) -> + Saved = persistent_term:get(?APP, []), + persistent_term:put(?APP, lists:reverse([Name | Saved])), + persistent_term:put({?APP, Name}, ServerState). + +unsave(Name) -> + case persistent_term:get(?APP, []) of + [] -> + persistent_term:erase(?APP); + Saved -> + persistent_term:put(?APP, lists:delete(Name, Saved)) + end, + persistent_term:erase({?APP, Name}), + ok. + +running() -> + persistent_term:get(?APP, []). + +server(Name) -> + case catch persistent_term:get({?APP, Name}) of + {'EXIT', {badarg,_}} -> undefined; + Service -> Service + end. diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index f4965e4ca..7df5b643c 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -25,7 +25,7 @@ -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). %% Load/Unload --export([ load/2 +-export([ load/3 , unload/1 ]). @@ -40,8 +40,8 @@ -record(server, { %% Server name (equal to grpc client channel name) name :: server_name(), - %% The server started options - options :: list(), + %% The function options + options :: map(), %% gRPC channel pid channel :: pid(), %% Registered hook names and options @@ -81,8 +81,8 @@ %% Load/Unload APIs %%-------------------------------------------------------------------- --spec load(atom(), list()) -> {ok, server()} | {error, term()} . -load(Name0, Opts0) -> +-spec load(atom(), list(), map()) -> {ok, server()} | {error, term()} . +load(Name0, Opts0, ReqOpts) -> Name = to_list(Name0), {SvrAddr, ClientOpts} = channel_opts(Opts0), case emqx_exhook_sup:start_grpc_client_channel( @@ -90,7 +90,7 @@ load(Name0, Opts0) -> SvrAddr, ClientOpts) of {ok, _ChannPoolPid} -> - case do_init(Name) of + case do_init(Name, ReqOpts) of {ok, HookSpecs} -> %% Reigster metrics Prefix = lists:flatten( @@ -99,7 +99,7 @@ load(Name0, Opts0) -> %% Ensure hooks ensure_hooks(HookSpecs), {ok, #server{name = Name, - options = Opts0, + options = ReqOpts, channel = _ChannPoolPid, hookspec = HookSpecs, prefix = Prefix }}; @@ -141,19 +141,19 @@ format_http_uri(Scheme, Host0, Port) -> lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])). -spec unload(server()) -> ok. -unload(#server{name = Name, hookspec = HookSpecs}) -> - _ = do_deinit(Name), +unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) -> + _ = do_deinit(Name, ReqOpts), _ = may_unload_hooks(HookSpecs), _ = emqx_exhook_sup:stop_grpc_client_channel(Name), ok. -do_deinit(Name) -> - _ = do_call(Name, 'on_provider_unloaded', #{}), +do_deinit(Name, ReqOpts) -> + _ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts), ok. -do_init(ChannName) -> +do_init(ChannName, ReqOpts) -> Req = #{broker => maps:from_list(emqx_sys:info())}, - case do_call(ChannName, 'on_provider_loaded', Req) of + case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of {ok, InitialResp} -> try {ok, resovle_hookspec(maps:get(hooks, InitialResp, []))} @@ -219,7 +219,7 @@ may_unload_hooks(HookSpecs) -> end, maps:keys(HookSpecs)). format(#server{name = Name, hookspec = Hooks}) -> - io_lib:format("name=~p, hooks=~0p", [Name, Hooks]). + io_lib:format("name=~s, hooks=~0p, active=true", [Name, Hooks]). %%-------------------------------------------------------------------- %% APIs @@ -232,7 +232,8 @@ name(#server{name = Name}) -> -> ignore | {ok, Resp :: term()} | {error, term()}. -call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix}) -> +call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, + hookspec = Hooks, prefix = Prefix}) -> GrpcFunc = hk2func(Hookpoint), case maps:get(Hookpoint, Hooks, undefined) of undefined -> ignore; @@ -247,7 +248,7 @@ call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix false -> ignore; _ -> inc_metrics(Prefix, Hookpoint), - do_call(ChannName, GrpcFunc, Req) + do_call(ChannName, GrpcFunc, Req, ReqOpts) end end. @@ -265,9 +266,9 @@ match_topic_filter(_, []) -> match_topic_filter(TopicName, TopicFilter) -> lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter). --spec do_call(string(), atom(), map()) -> {ok, map()} | {error, term()}. -do_call(ChannName, Fun, Req) -> - Options = #{channel => ChannName}, +-spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}. +do_call(ChannName, Fun, Req, ReqOpts) -> + Options = ReqOpts#{channel => ChannName}, ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of {ok, Resp, _Metadata} -> diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index 96509181a..cb6cea635 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -41,7 +41,8 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Mngr = ?CHILD(emqx_exhook_mngr, worker, [servers(), auto_reconnect()]), + Mngr = ?CHILD(emqx_exhook_mngr, worker, + [servers(), auto_reconnect(), request_options()]), {ok, {{one_for_one, 10, 100}, [Mngr]}}. servers() -> @@ -50,6 +51,9 @@ servers() -> auto_reconnect() -> application:get_env(emqx_exhook, auto_reconnect, 60000). +request_options() -> + #{timeout => application:get_env(emqx_exhook, request_timeout, 5000)}. + %%-------------------------------------------------------------------- %% APIs %%--------------------------------------------------------------------