feat(hooks): add config options to set explicit callback order
This commit is contained in:
parent
d1a5dcd222
commit
52652f6c96
|
@ -2494,6 +2494,52 @@ end}.
|
|||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
|
||||
{mapping, "broker.hook_order.$name.$hp_or_order", "emqx.hook_order", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
%% @doc Set explicit callback order for particular callbacks of particular hook points
|
||||
%%
|
||||
%% For example, to make JWT acl checks always preceed HTTP ACL checks, one may add:
|
||||
%%
|
||||
%% broker.hook_order.acl.hook_point = client.check_acl
|
||||
%% broker.hook_order.acl.1 = emqx_auth_jwt
|
||||
%% broker.hook_order.acl.2 = emqx_auth_http
|
||||
%%
|
||||
%% Callbacks with explicit ordering always have higher priority than the others.
|
||||
{translation, "emqx.hook_order", fun(Conf) ->
|
||||
Conf0 = cuttlefish_variable:filter_by_prefix("broker.hook_order", Conf),
|
||||
HookOrders = lists:foldl(
|
||||
fun({["broker", "hook_order", Name, "hook_point"], HookPoint}, AccHookOrders) ->
|
||||
HookOrder = maps:get(Name, AccHookOrders, #{}),
|
||||
NHookOrder = HookOrder#{hook_point => list_to_atom(HookPoint)},
|
||||
AccHookOrders#{Name => NHookOrder};
|
||||
({["broker", "hook_order", Name, NStr], HookModule}, AccHookOrders) ->
|
||||
case string:to_integer(NStr) of
|
||||
{N, ""} ->
|
||||
HookOrder = maps:get(Name, AccHookOrders, #{}),
|
||||
HookOrderMods = maps:get(mods, HookOrder, []),
|
||||
NHookOrder = HookOrder#{mods => [{list_to_atom(HookModule), N} | HookOrderMods]},
|
||||
AccHookOrders#{Name => NHookOrder};
|
||||
_ ->
|
||||
AccHookOrders
|
||||
end;
|
||||
(_, AccHookOrders) ->
|
||||
AccHookOrders
|
||||
end,
|
||||
#{},
|
||||
Conf0),
|
||||
SimplifiedHookOrders = lists:map(
|
||||
fun({Name, HookOrder}) ->
|
||||
HookPoint = maps:get(hook_point, HookOrder, list_to_atom(Name)),
|
||||
Mods = maps:get(mods, HookOrder, []),
|
||||
{HookPoint, maps:from_list(Mods)}
|
||||
end,
|
||||
maps:to_list(HookOrders)),
|
||||
maps:from_list(SimplifiedHookOrders)
|
||||
end}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% System Monitor
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -2,14 +2,16 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.22",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.21",
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
|
@ -26,7 +28,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.20",
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
|
@ -44,7 +47,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.19",
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
|
@ -63,7 +67,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.18",
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
|
@ -82,7 +87,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.17",
|
||||
[{add_module,emqx_secret},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -104,7 +110,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.16",
|
||||
[{add_module,emqx_secret},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -133,7 +140,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_topic,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.15",
|
||||
[{add_module,emqx_secret},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{add_module,emqx_secret},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -870,14 +878,16 @@
|
|||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.22",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.21",
|
||||
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
|
@ -893,7 +903,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.20",
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -910,7 +921,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.19",
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -928,7 +940,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.18",
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
|
@ -946,7 +959,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.17",
|
||||
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
||||
|
@ -967,7 +981,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_control,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.16",
|
||||
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
||||
|
@ -995,7 +1010,8 @@
|
|||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||
{delete_module,emqx_exclusive_subscription}]},
|
||||
{"4.3.15",
|
||||
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -235,14 +235,15 @@ lookup(HookPoint) ->
|
|||
|
||||
init([]) ->
|
||||
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
|
||||
{ok, #{}}.
|
||||
HookOrders = emqx:get_env(hook_order, #{}),
|
||||
{ok, #{hook_orders => HookOrders}}.
|
||||
|
||||
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
|
||||
Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of
|
||||
true ->
|
||||
{error, already_exists};
|
||||
false ->
|
||||
insert_hook(HookPoint, add_callback(Callback, Callbacks))
|
||||
insert_hook(HookPoint, add_callback(callback_orders(HookPoint), Callback, Callbacks))
|
||||
end,
|
||||
{reply, Reply, State};
|
||||
|
||||
|
@ -280,16 +281,18 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
insert_hook(HookPoint, Callbacks) ->
|
||||
ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok.
|
||||
|
||||
add_callback(C, Callbacks) ->
|
||||
add_callback(C, Callbacks, []).
|
||||
add_callback(Orders, C, Callbacks) ->
|
||||
add_callback(Orders, C, Callbacks, []).
|
||||
|
||||
add_callback(C, [], Acc) ->
|
||||
add_callback(_Orders, C, [], Acc) ->
|
||||
lists:reverse([C|Acc]);
|
||||
add_callback(C1 = #callback{priority = P1}, [C2 = #callback{priority = P2}|More], Acc)
|
||||
when P1 =< P2 ->
|
||||
add_callback(C1, More, [C2|Acc]);
|
||||
add_callback(C1, More, Acc) ->
|
||||
lists:append(lists:reverse(Acc), [C1 | More]).
|
||||
add_callback(Orders, C1, [C2|More], Acc) ->
|
||||
case is_lower_priority(Orders, C1, C2) of
|
||||
true ->
|
||||
add_callback(Orders, C1, More, [C2|Acc]);
|
||||
false ->
|
||||
lists:append(lists:reverse(Acc), [C1, C2 | More])
|
||||
end.
|
||||
|
||||
del_callback(Action, Callbacks) ->
|
||||
del_callback(Action, Callbacks, []).
|
||||
|
@ -304,3 +307,44 @@ del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) ->
|
|||
del_callback(Func, Callbacks, Acc);
|
||||
del_callback(Action, [Callback | Callbacks], Acc) ->
|
||||
del_callback(Action, Callbacks, [Callback | Acc]).
|
||||
|
||||
|
||||
%% does A have lower priority than B?
|
||||
is_lower_priority(Orders,
|
||||
#callback{priority = PrA, action = ActA},
|
||||
#callback{priority = PrB, action = ActB}) ->
|
||||
OrdA = callback_order(Orders, ActA),
|
||||
OrdB = callback_order(Orders, ActB),
|
||||
case {OrdA, OrdB} of
|
||||
%% if both action positions are not specified, use priority
|
||||
{undefined, undefined} ->
|
||||
PrA =< PrB;
|
||||
%% actions with specified positions have higher priority
|
||||
{_OrdA, undefined} ->
|
||||
false;
|
||||
%% actions with specified positions have higher priority
|
||||
{undefined, _OrdB} ->
|
||||
true;
|
||||
%% if both action positions are specified, the last one has the lower priority
|
||||
_ ->
|
||||
OrdA >= OrdB
|
||||
end.
|
||||
|
||||
callback_orders(HookPoint) ->
|
||||
case hook_orders() of
|
||||
#{HookPoint := CallbackOrders} -> CallbackOrders;
|
||||
_ -> #{}
|
||||
end.
|
||||
|
||||
hook_orders() ->
|
||||
case emqx:get_env(hook_order, #{}) of
|
||||
Map when is_map(Map) -> Map;
|
||||
_ -> #{}
|
||||
end.
|
||||
|
||||
callback_order(Orders, {M, _F, _A}) ->
|
||||
case Orders of
|
||||
#{M := N} -> N;
|
||||
_ -> undefined
|
||||
end;
|
||||
callback_order(_Orders, _Action) -> undefined.
|
||||
|
|
|
@ -23,6 +23,12 @@
|
|||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
init_per_testcase(_Test, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_testcase(_Test, _Config) ->
|
||||
_ = clear_orders().
|
||||
|
||||
% t_lookup(_) ->
|
||||
% error('TODO').
|
||||
|
||||
|
@ -107,6 +113,42 @@ t_uncovered_func(_) ->
|
|||
Pid ! test,
|
||||
ok = emqx_hooks:stop().
|
||||
|
||||
t_explicit_order(_) ->
|
||||
{ok, _} = emqx_hooks:start_link(),
|
||||
|
||||
ok = set_orders(hookpoint, [mod_a, mod_b]),
|
||||
|
||||
ok = emqx:hook(hookpoint, {mod_c, cb, []}, 5),
|
||||
ok = emqx:hook(hookpoint, {mod_d, cb, []}, 0),
|
||||
ok = emqx:hook(hookpoint, {mod_b, cb, []}, 0),
|
||||
ok = emqx:hook(hookpoint, {mod_a, cb, []}, -1),
|
||||
ok = emqx:hook(hookpoint, {mod_e, cb, []}, -1),
|
||||
|
||||
?assertEqual(
|
||||
[
|
||||
{mod_a, cb, []},
|
||||
{mod_b, cb, []},
|
||||
{mod_c, cb, []},
|
||||
{mod_d, cb, []},
|
||||
{mod_e, cb, []}
|
||||
],
|
||||
get_hookpoint_callbacks(hookpoint)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
set_orders(HookPoint, Mods) ->
|
||||
Orders = maps:from_list(lists:zip(Mods, lists:seq(0, length(Mods) - 1))),
|
||||
application:set_env(emqx, hook_order, #{HookPoint => Orders}).
|
||||
|
||||
clear_orders() ->
|
||||
application:set_env(emqx, hook_order, #{}).
|
||||
|
||||
get_hookpoint_callbacks(HookPoint) ->
|
||||
[emqx_hooks:callback_action(C) || C <- emqx_hooks:lookup(HookPoint)].
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Hook fun
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -140,4 +182,3 @@ hook_filter2(_, _Acc, _IntArg) -> false.
|
|||
hook_filter2_1(arg, _Acc, init_arg) -> true;
|
||||
hook_filter2_1(arg1, _Acc, init_arg) -> true;
|
||||
hook_filter2_1(_, _Acc, _IntArg) -> false.
|
||||
|
||||
|
|
Loading…
Reference in New Issue