feat(exhook): support customize hook_priority
This commit is contained in:
parent
38c515908f
commit
b7f10b67a8
|
@ -24,6 +24,17 @@
|
||||||
## Value: false | Duration
|
## Value: false | Duration
|
||||||
#exhook.auto_reconnect = 60s
|
#exhook.auto_reconnect = 60s
|
||||||
|
|
||||||
|
## The exhook execution priority on the Chain of the emqx hooks.
|
||||||
|
##
|
||||||
|
## Modify the field to fix the exhook execute order before/after other plugins/modules.
|
||||||
|
## By default, most hooks registered by plugins or modules have a priority of 0.
|
||||||
|
##
|
||||||
|
## With the same priority of 0, the execute order depends on hookpoints mount order.
|
||||||
|
## Scilicet is the loaded order of plugins/ modules.
|
||||||
|
##
|
||||||
|
## Default: 0
|
||||||
|
## Value: Integer
|
||||||
|
#exhook.hook_priority = 0
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## The Hook callback servers
|
## The Hook callback servers
|
||||||
|
|
|
@ -41,4 +41,6 @@
|
||||||
, {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
|
, {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(DEFAULT_HOOK_PRIORITY, 0).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -15,6 +15,11 @@
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
{mapping, "exhook.hook_priority", "emqx_exhook.hook_priority", [
|
||||||
|
{default, 0},
|
||||||
|
{datatype, integer}
|
||||||
|
]}.
|
||||||
|
|
||||||
{translation, "emqx_exhook.auto_reconnect", fun(Conf) ->
|
{translation, "emqx_exhook.auto_reconnect", fun(Conf) ->
|
||||||
case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of
|
case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of
|
||||||
"false" -> false;
|
"false" -> false;
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([start_link/3]).
|
-export([start_link/4]).
|
||||||
|
|
||||||
%% Mgmt API
|
%% Mgmt API
|
||||||
-export([ enable/2
|
-export([ enable/2
|
||||||
|
@ -59,9 +59,14 @@
|
||||||
%% Request options
|
%% Request options
|
||||||
request_options :: grpc_client:options(),
|
request_options :: grpc_client:options(),
|
||||||
%% Timer references
|
%% Timer references
|
||||||
trefs :: map()
|
trefs :: map(),
|
||||||
|
%% Hooks execute options
|
||||||
|
hooks_options :: hooks_options()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-export_type([ server_options/0
|
||||||
|
, hooks_options/0]).
|
||||||
|
|
||||||
-type servers() :: [{Name :: atom(), server_options()}].
|
-type servers() :: [{Name :: atom(), server_options()}].
|
||||||
|
|
||||||
-type server_options() :: [ {scheme, http | https}
|
-type server_options() :: [ {scheme, http | https}
|
||||||
|
@ -69,6 +74,8 @@
|
||||||
| {port, inet:port_number()}
|
| {port, inet:port_number()}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
-type hooks_options() :: #{hook_priority => integer()}.
|
||||||
|
|
||||||
-define(DEFAULT_TIMEOUT, 60000).
|
-define(DEFAULT_TIMEOUT, 60000).
|
||||||
|
|
||||||
-define(CNTER, emqx_exhook_counter).
|
-define(CNTER, emqx_exhook_counter).
|
||||||
|
@ -77,12 +84,12 @@
|
||||||
%% APIs
|
%% APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec start_link(servers(), false | non_neg_integer(), grpc_client:options())
|
-spec start_link(servers(), false | non_neg_integer(), grpc_client:options(), hooks_options())
|
||||||
->ignore
|
->ignore
|
||||||
| {ok, pid()}
|
| {ok, pid()}
|
||||||
| {error, any()}.
|
| {error, any()}.
|
||||||
start_link(Servers, AutoReconnect, ReqOpts) ->
|
start_link(Servers, AutoReconnect, ReqOpts, HooksOpts) ->
|
||||||
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []).
|
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts, HooksOpts], []).
|
||||||
|
|
||||||
-spec enable(pid(), atom()|string()) -> ok | {error, term()}.
|
-spec enable(pid(), atom()|string()) -> ok | {error, term()}.
|
||||||
enable(Pid, Name) ->
|
enable(Pid, Name) ->
|
||||||
|
@ -102,7 +109,7 @@ call(Pid, Req) ->
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
init([Servers, AutoReconnect, ReqOpts0]) ->
|
init([Servers, AutoReconnect, ReqOpts0, HooksOpts]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
%% XXX: Due to the ExHook Module in the enterprise,
|
%% XXX: Due to the ExHook Module in the enterprise,
|
||||||
%% this process may start multiple times and they will share this table
|
%% this process may start multiple times and they will share this table
|
||||||
|
@ -120,32 +127,33 @@ init([Servers, AutoReconnect, ReqOpts0]) ->
|
||||||
|
|
||||||
%% Load the hook servers
|
%% Load the hook servers
|
||||||
ReqOpts = maps:without([request_failed_action], ReqOpts0),
|
ReqOpts = maps:without([request_failed_action], ReqOpts0),
|
||||||
{Waiting, Running} = load_all_servers(Servers, ReqOpts),
|
{Waiting, Running} = load_all_servers(Servers, ReqOpts, HooksOpts),
|
||||||
{ok, ensure_reload_timer(
|
{ok, ensure_reload_timer(
|
||||||
#state{waiting = Waiting,
|
#state{waiting = Waiting,
|
||||||
running = Running,
|
running = Running,
|
||||||
stopped = #{},
|
stopped = #{},
|
||||||
request_options = ReqOpts,
|
request_options = ReqOpts,
|
||||||
auto_reconnect = AutoReconnect,
|
auto_reconnect = AutoReconnect,
|
||||||
trefs = #{}
|
trefs = #{},
|
||||||
|
hooks_options = HooksOpts
|
||||||
}
|
}
|
||||||
)}.
|
)}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
load_all_servers(Servers, ReqOpts) ->
|
load_all_servers(Servers, ReqOpts, HooksOpts) ->
|
||||||
load_all_servers(Servers, ReqOpts, #{}, #{}).
|
load_all_servers(Servers, ReqOpts, HooksOpts, #{}, #{}).
|
||||||
load_all_servers([], _Request, Waiting, Running) ->
|
load_all_servers([], _Request, _HooksOpts, Waiting, Running) ->
|
||||||
{Waiting, Running};
|
{Waiting, Running};
|
||||||
load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) ->
|
load_all_servers([{Name, Options} | More], ReqOpts, HooksOpts, Waiting, Running) ->
|
||||||
{NWaiting, NRunning} =
|
{NWaiting, NRunning} =
|
||||||
case emqx_exhook_server:load(Name, Options, ReqOpts) of
|
case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of
|
||||||
{ok, ServerState} ->
|
{ok, ServerState} ->
|
||||||
save(Name, ServerState),
|
save(Name, ServerState),
|
||||||
{Waiting, Running#{Name => Options}};
|
{Waiting, Running#{Name => Options}};
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
{Waiting#{Name => Options}, Running}
|
{Waiting#{Name => Options}, Running}
|
||||||
end,
|
end,
|
||||||
load_all_servers(More, ReqOpts, NWaiting, NRunning).
|
load_all_servers(More, ReqOpts, HooksOpts, NWaiting, NRunning).
|
||||||
|
|
||||||
handle_call({load, Name}, _From, State) ->
|
handle_call({load, Name}, _From, State) ->
|
||||||
{Result, NState} = do_load_server(Name, State),
|
{Result, NState} = do_load_server(Name, State),
|
||||||
|
@ -214,7 +222,8 @@ do_load_server(Name, State0 = #state{
|
||||||
waiting = Waiting,
|
waiting = Waiting,
|
||||||
running = Running,
|
running = Running,
|
||||||
stopped = Stopped,
|
stopped = Stopped,
|
||||||
request_options = ReqOpts}) ->
|
request_options = ReqOpts,
|
||||||
|
hooks_options = HooksOpts}) ->
|
||||||
State = clean_reload_timer(Name, State0),
|
State = clean_reload_timer(Name, State0),
|
||||||
case maps:get(Name, Running, undefined) of
|
case maps:get(Name, Running, undefined) of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -223,7 +232,7 @@ do_load_server(Name, State0 = #state{
|
||||||
undefined ->
|
undefined ->
|
||||||
{{error, not_found}, State};
|
{{error, not_found}, State};
|
||||||
Options ->
|
Options ->
|
||||||
case emqx_exhook_server:load(Name, Options, ReqOpts) of
|
case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of
|
||||||
{ok, ServerState} ->
|
{ok, ServerState} ->
|
||||||
save(Name, ServerState),
|
save(Name, ServerState),
|
||||||
?LOG(info, "Load exhook callback server "
|
?LOG(info, "Load exhook callback server "
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
|
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
|
||||||
|
|
||||||
%% Load/Unload
|
%% Load/Unload
|
||||||
-export([ load/3
|
-export([ load/4
|
||||||
, unload/1
|
, unload/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -81,8 +81,9 @@
|
||||||
%% Load/Unload APIs
|
%% Load/Unload APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec load(atom(), list(), map()) -> {ok, server()} | {error, term()} .
|
-spec load(atom(), emqx_exhook_mngr:server_options(), grpc_client:options(), emqx_exhook_mngr:hooks_options())
|
||||||
load(Name0, Opts0, ReqOpts) ->
|
-> {ok, server()} | {error, term()} .
|
||||||
|
load(Name0, Opts0, ReqOpts, HooksOpts) ->
|
||||||
Name = to_list(Name0),
|
Name = to_list(Name0),
|
||||||
{SvrAddr, ClientOpts} = channel_opts(Opts0),
|
{SvrAddr, ClientOpts} = channel_opts(Opts0),
|
||||||
case emqx_exhook_sup:start_grpc_client_channel(
|
case emqx_exhook_sup:start_grpc_client_channel(
|
||||||
|
@ -97,7 +98,7 @@ load(Name0, Opts0, ReqOpts) ->
|
||||||
io_lib:format("exhook.~s.", [Name])),
|
io_lib:format("exhook.~s.", [Name])),
|
||||||
ensure_metrics(Prefix, HookSpecs),
|
ensure_metrics(Prefix, HookSpecs),
|
||||||
%% Ensure hooks
|
%% Ensure hooks
|
||||||
ensure_hooks(HookSpecs),
|
ensure_hooks(HookSpecs, maps:get(hook_priority, HooksOpts, ?DEFAULT_HOOK_PRIORITY)),
|
||||||
{ok, #server{name = Name,
|
{ok, #server{name = Name,
|
||||||
options = ReqOpts,
|
options = ReqOpts,
|
||||||
channel = _ChannPoolPid,
|
channel = _ChannPoolPid,
|
||||||
|
@ -193,13 +194,13 @@ ensure_metrics(Prefix, HookSpecs) ->
|
||||||
|| Hookpoint <- maps:keys(HookSpecs)],
|
|| Hookpoint <- maps:keys(HookSpecs)],
|
||||||
lists:foreach(fun emqx_metrics:ensure/1, Keys).
|
lists:foreach(fun emqx_metrics:ensure/1, Keys).
|
||||||
|
|
||||||
ensure_hooks(HookSpecs) ->
|
ensure_hooks(HookSpecs, Priority) ->
|
||||||
lists:foreach(fun(Hookpoint) ->
|
lists:foreach(fun(Hookpoint) ->
|
||||||
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
|
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
|
||||||
false ->
|
false ->
|
||||||
?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]);
|
?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]);
|
||||||
{Hookpoint, {M, F, A}} ->
|
{Hookpoint, {M, F, A}} ->
|
||||||
emqx_hooks:put(Hookpoint, {M, F, A}),
|
emqx_hooks:put(Hookpoint, {M, F, A}, Priority),
|
||||||
ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
|
ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
|
||||||
end
|
end
|
||||||
end, maps:keys(HookSpecs)).
|
end, maps:keys(HookSpecs)).
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
-behaviour(supervisor).
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
-include("emqx_exhook.hrl").
|
||||||
|
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
, init/1
|
, init/1
|
||||||
]).
|
]).
|
||||||
|
@ -43,7 +45,7 @@ start_link() ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
Mngr = ?CHILD(emqx_exhook_mngr, worker,
|
Mngr = ?CHILD(emqx_exhook_mngr, worker,
|
||||||
[servers(), auto_reconnect(), request_options()]),
|
[servers(), auto_reconnect(), request_options(), hooks_options()]),
|
||||||
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
|
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
|
||||||
|
|
||||||
servers() ->
|
servers() ->
|
||||||
|
@ -57,6 +59,10 @@ request_options() ->
|
||||||
request_failed_action => env(request_failed_action, deny)
|
request_failed_action => env(request_failed_action, deny)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
hooks_options() ->
|
||||||
|
#{hook_priority => env(hook_priority, ?DEFAULT_HOOK_PRIORITY)
|
||||||
|
}.
|
||||||
|
|
||||||
env(Key, Def) ->
|
env(Key, Def) ->
|
||||||
application:get_env(emqx_exhook, Key, Def).
|
application:get_env(emqx_exhook, Key, Def).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue