diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 4d94def06..6a4725e02 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -24,6 +24,17 @@ ## Value: false | Duration #exhook.auto_reconnect = 60s +## The exhook execution priority on the Chain of the emqx hooks. +## +## Modify the field to fix the exhook execute order before/after other plugins/modules. +## By default, most hooks registered by plugins or modules have a priority of 0. +## +## With the same priority of 0, the execute order depends on hookpoints mount order. +## Scilicet is the loaded order of plugins/ modules. +## +## Default: 0 +## Value: Integer +#exhook.hook_priority = 0 ##-------------------------------------------------------------------- ## The Hook callback servers diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl index 7301fdcbb..58deb707a 100644 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -41,4 +41,6 @@ , {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} ]). +-define(DEFAULT_HOOK_PRIORITY, 0). + -endif. diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema index d11001c0d..4f6419c6a 100644 --- a/apps/emqx_exhook/priv/emqx_exhook.schema +++ b/apps/emqx_exhook/priv/emqx_exhook.schema @@ -15,6 +15,11 @@ {datatype, string} ]}. +{mapping, "exhook.hook_priority", "emqx_exhook.hook_priority", [ + {default, 0}, + {datatype, integer} +]}. + {translation, "emqx_exhook.auto_reconnect", fun(Conf) -> case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of "false" -> false; diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index cadd5eb37..3162b8c54 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -23,7 +23,7 @@ -include_lib("emqx/include/logger.hrl"). %% APIs --export([start_link/3]). +-export([start_link/4]). %% Mgmt API -export([ enable/2 @@ -59,9 +59,14 @@ %% Request options request_options :: grpc_client:options(), %% Timer references - trefs :: map() + trefs :: map(), + %% Hooks execute options + hooks_options :: hooks_options() }). +-export_type([ server_options/0 + , hooks_options/0]). + -type servers() :: [{Name :: atom(), server_options()}]. -type server_options() :: [ {scheme, http | https} @@ -69,6 +74,8 @@ | {port, inet:port_number()} ]. +-type hooks_options() :: #{hook_priority => integer()}. + -define(DEFAULT_TIMEOUT, 60000). -define(CNTER, emqx_exhook_counter). @@ -77,12 +84,12 @@ %% APIs %%-------------------------------------------------------------------- --spec start_link(servers(), false | non_neg_integer(), grpc_client:options()) +-spec start_link(servers(), false | non_neg_integer(), grpc_client:options(), hooks_options()) ->ignore | {ok, pid()} | {error, any()}. -start_link(Servers, AutoReconnect, ReqOpts) -> - gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). +start_link(Servers, AutoReconnect, ReqOpts, HooksOpts) -> + gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts, HooksOpts], []). -spec enable(pid(), atom()|string()) -> ok | {error, term()}. enable(Pid, Name) -> @@ -102,7 +109,7 @@ call(Pid, Req) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Servers, AutoReconnect, ReqOpts0]) -> +init([Servers, AutoReconnect, ReqOpts0, HooksOpts]) -> process_flag(trap_exit, true), %% XXX: Due to the ExHook Module in the enterprise, %% this process may start multiple times and they will share this table @@ -120,32 +127,33 @@ init([Servers, AutoReconnect, ReqOpts0]) -> %% Load the hook servers ReqOpts = maps:without([request_failed_action], ReqOpts0), - {Waiting, Running} = load_all_servers(Servers, ReqOpts), + {Waiting, Running} = load_all_servers(Servers, ReqOpts, HooksOpts), {ok, ensure_reload_timer( #state{waiting = Waiting, running = Running, stopped = #{}, request_options = ReqOpts, auto_reconnect = AutoReconnect, - trefs = #{} + trefs = #{}, + hooks_options = HooksOpts } )}. %% @private -load_all_servers(Servers, ReqOpts) -> - load_all_servers(Servers, ReqOpts, #{}, #{}). -load_all_servers([], _Request, Waiting, Running) -> +load_all_servers(Servers, ReqOpts, HooksOpts) -> + load_all_servers(Servers, ReqOpts, HooksOpts, #{}, #{}). +load_all_servers([], _Request, _HooksOpts, Waiting, Running) -> {Waiting, Running}; -load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) -> +load_all_servers([{Name, Options} | More], ReqOpts, HooksOpts, Waiting, Running) -> {NWaiting, NRunning} = - case emqx_exhook_server:load(Name, Options, ReqOpts) of + case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of {ok, ServerState} -> save(Name, ServerState), {Waiting, Running#{Name => Options}}; {error, _} -> {Waiting#{Name => Options}, Running} end, - load_all_servers(More, ReqOpts, NWaiting, NRunning). + load_all_servers(More, ReqOpts, HooksOpts, NWaiting, NRunning). handle_call({load, Name}, _From, State) -> {Result, NState} = do_load_server(Name, State), @@ -214,7 +222,8 @@ do_load_server(Name, State0 = #state{ waiting = Waiting, running = Running, stopped = Stopped, - request_options = ReqOpts}) -> + request_options = ReqOpts, + hooks_options = HooksOpts}) -> State = clean_reload_timer(Name, State0), case maps:get(Name, Running, undefined) of undefined -> @@ -223,7 +232,7 @@ do_load_server(Name, State0 = #state{ undefined -> {{error, not_found}, State}; Options -> - case emqx_exhook_server:load(Name, Options, ReqOpts) of + case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of {ok, ServerState} -> save(Name, ServerState), ?LOG(info, "Load exhook callback server " diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 7df5b643c..c4be91d07 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -25,7 +25,7 @@ -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). %% Load/Unload --export([ load/3 +-export([ load/4 , unload/1 ]). @@ -81,8 +81,9 @@ %% Load/Unload APIs %%-------------------------------------------------------------------- --spec load(atom(), list(), map()) -> {ok, server()} | {error, term()} . -load(Name0, Opts0, ReqOpts) -> +-spec load(atom(), emqx_exhook_mngr:server_options(), grpc_client:options(), emqx_exhook_mngr:hooks_options()) + -> {ok, server()} | {error, term()} . +load(Name0, Opts0, ReqOpts, HooksOpts) -> Name = to_list(Name0), {SvrAddr, ClientOpts} = channel_opts(Opts0), case emqx_exhook_sup:start_grpc_client_channel( @@ -97,7 +98,7 @@ load(Name0, Opts0, ReqOpts) -> io_lib:format("exhook.~s.", [Name])), ensure_metrics(Prefix, HookSpecs), %% Ensure hooks - ensure_hooks(HookSpecs), + ensure_hooks(HookSpecs, maps:get(hook_priority, HooksOpts, ?DEFAULT_HOOK_PRIORITY)), {ok, #server{name = Name, options = ReqOpts, channel = _ChannPoolPid, @@ -193,13 +194,13 @@ ensure_metrics(Prefix, HookSpecs) -> || Hookpoint <- maps:keys(HookSpecs)], lists:foreach(fun emqx_metrics:ensure/1, Keys). -ensure_hooks(HookSpecs) -> +ensure_hooks(HookSpecs, Priority) -> lists:foreach(fun(Hookpoint) -> case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of false -> ?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]); {Hookpoint, {M, F, A}} -> - emqx_hooks:put(Hookpoint, {M, F, A}), + emqx_hooks:put(Hookpoint, {M, F, A}, Priority), ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) end end, maps:keys(HookSpecs)). diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index e9c405de0..1b5b4f7af 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -18,6 +18,8 @@ -behaviour(supervisor). +-include("emqx_exhook.hrl"). + -export([ start_link/0 , init/1 ]). @@ -43,7 +45,7 @@ start_link() -> init([]) -> Mngr = ?CHILD(emqx_exhook_mngr, worker, - [servers(), auto_reconnect(), request_options()]), + [servers(), auto_reconnect(), request_options(), hooks_options()]), {ok, {{one_for_one, 10, 100}, [Mngr]}}. servers() -> @@ -57,6 +59,10 @@ request_options() -> request_failed_action => env(request_failed_action, deny) }. +hooks_options() -> + #{hook_priority => env(hook_priority, ?DEFAULT_HOOK_PRIORITY) + }. + env(Key, Def) -> application:get_env(emqx_exhook, Key, Def).