Merge pull request #8221 from terry-xiaoyu/fix_delayed_module_disbled_after_emqx_stop
feat: fix the hook priorities
This commit is contained in:
commit
3b00b16abe
|
@ -0,0 +1,40 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% Definitions for Hook Priorities
|
||||
|
||||
%% Highest Priority = 1000, don't change this value as the plugins may depend on it.
|
||||
-define(HP_HIGHEST, 1000).
|
||||
|
||||
%% hooks used by the emqx core app
|
||||
-define(HP_PSK, 990).
|
||||
-define(HP_REWRITE, 980).
|
||||
-define(HP_AUTHN, 970).
|
||||
-define(HP_AUTHZ, 960).
|
||||
-define(HP_SYS_MSGS, 950).
|
||||
-define(HP_TOPIC_METRICS, 940).
|
||||
-define(HP_RETAINER, 930).
|
||||
-define(HP_AUTO_SUB, 920).
|
||||
-define(HP_RULE_ENGINE, 900).
|
||||
%% apps that can work with the republish action
|
||||
-define(HP_SLOW_SUB, 880).
|
||||
-define(HP_BRIDGE, 870).
|
||||
-define(HP_DELAY_PUB, 860).
|
||||
%% apps that can stop the hooks chain from continuing
|
||||
-define(HP_EXHOOK, 100).
|
||||
|
||||
%% == Lowest Priority = 0, don't change this value as the plugins may depend on it.
|
||||
-define(HP_LOWEST, 0).
|
|
@ -49,10 +49,6 @@
|
|||
|
||||
%% Hooks API
|
||||
-export([
|
||||
hook/2,
|
||||
hook/3,
|
||||
hook/4,
|
||||
unhook/2,
|
||||
run_hook/2,
|
||||
run_fold_hook/3
|
||||
]).
|
||||
|
@ -160,31 +156,6 @@ subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Hooks API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec hook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok | {error, already_exists}.
|
||||
hook(HookPoint, Action) ->
|
||||
emqx_hooks:add(HookPoint, Action).
|
||||
|
||||
-spec hook(
|
||||
emqx_hooks:hookpoint(),
|
||||
emqx_hooks:action(),
|
||||
emqx_hooks:filter() | integer() | list()
|
||||
) ->
|
||||
ok | {error, already_exists}.
|
||||
hook(HookPoint, Action, Priority) when is_integer(Priority) ->
|
||||
emqx_hooks:add(HookPoint, Action, Priority);
|
||||
hook(HookPoint, Action, {_M, _F, _A} = Filter) ->
|
||||
emqx_hooks:add(HookPoint, Action, Filter).
|
||||
|
||||
-spec hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter(), integer()) ->
|
||||
ok | {error, already_exists}.
|
||||
hook(HookPoint, Action, Filter, Priority) ->
|
||||
emqx_hooks:add(HookPoint, Action, Filter, Priority).
|
||||
|
||||
-spec unhook(emqx_hooks:hookpoint(), emqx_hooks:action() | {module(), atom()}) -> ok.
|
||||
unhook(HookPoint, Action) ->
|
||||
emqx_hooks:del(HookPoint, Action).
|
||||
|
||||
-spec run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop.
|
||||
run_hook(HookPoint, Args) ->
|
||||
emqx_hooks:run(HookPoint, Args).
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("emqx_authentication.hrl").
|
||||
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
|
||||
-define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
|
||||
|
@ -698,7 +698,7 @@ maybe_hook(#{hooked := false} = State) ->
|
|||
)
|
||||
of
|
||||
true ->
|
||||
_ = emqx:hook('client.authenticate', {?MODULE, authenticate, []}),
|
||||
ok = emqx_hooks:put('client.authenticate', {?MODULE, authenticate, []}, ?HP_AUTHN),
|
||||
State#{hooked => true};
|
||||
false ->
|
||||
State
|
||||
|
@ -717,7 +717,7 @@ maybe_unhook(#{hooked := true} = State) ->
|
|||
)
|
||||
of
|
||||
true ->
|
||||
_ = emqx:unhook('client.authenticate', {?MODULE, authenticate, []}),
|
||||
ok = emqx_hooks:del('client.authenticate', {?MODULE, authenticate, []}),
|
||||
State#{hooked => false};
|
||||
false ->
|
||||
State
|
||||
|
|
|
@ -29,10 +29,8 @@
|
|||
|
||||
%% Hooks API
|
||||
-export([
|
||||
add/2,
|
||||
add/3,
|
||||
add/4,
|
||||
put/2,
|
||||
put/3,
|
||||
put/4,
|
||||
del/2,
|
||||
|
@ -121,44 +119,34 @@ callback_priority(#callback{priority = P}) -> P.
|
|||
%% Hooks API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Register a callback
|
||||
-spec add(hookpoint(), action() | callback()) -> ok_or_error(already_exists).
|
||||
add(HookPoint, Callback) when is_record(Callback, callback) ->
|
||||
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
|
||||
add(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
|
||||
add(HookPoint, #callback{action = Action, priority = 0}).
|
||||
|
||||
-spec add(hookpoint(), action(), filter() | integer() | list()) ->
|
||||
%% @doc `add/3,4` add a new hook, returns 'already_exists' if the hook exists.
|
||||
-spec add(hookpoint(), action(), integer()) ->
|
||||
ok_or_error(already_exists).
|
||||
add(HookPoint, Action, {_M, _F, _A} = Filter) ->
|
||||
add(HookPoint, #callback{action = Action, filter = Filter, priority = 0});
|
||||
add(HookPoint, Action, Priority) when is_integer(Priority) ->
|
||||
add(HookPoint, #callback{action = Action, priority = Priority}).
|
||||
do_add(HookPoint, #callback{action = Action, priority = Priority}).
|
||||
|
||||
-spec add(hookpoint(), action(), filter(), integer()) ->
|
||||
-spec add(hookpoint(), action(), integer(), filter()) ->
|
||||
ok_or_error(already_exists).
|
||||
add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
|
||||
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
|
||||
add(HookPoint, Action, Priority, Filter) when is_integer(Priority) ->
|
||||
do_add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
|
||||
|
||||
%% @doc Like add/2, it register a callback, discard 'already_exists' error.
|
||||
-spec put(hookpoint(), action() | callback()) -> ok.
|
||||
put(HookPoint, Callback) when is_record(Callback, callback) ->
|
||||
case add(HookPoint, Callback) of
|
||||
do_add(HookPoint, Callback) ->
|
||||
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity).
|
||||
|
||||
%% @doc `put/3,4` updates the existing hook, add it if not exists.
|
||||
-spec put(hookpoint(), action(), integer()) -> ok.
|
||||
put(HookPoint, Action, Priority) when is_integer(Priority) ->
|
||||
do_put(HookPoint, #callback{action = Action, priority = Priority}).
|
||||
|
||||
-spec put(hookpoint(), action(), integer(), filter()) -> ok.
|
||||
put(HookPoint, Action, Priority, Filter) when is_integer(Priority) ->
|
||||
do_put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
|
||||
|
||||
do_put(HookPoint, Callback) ->
|
||||
case do_add(HookPoint, Callback) of
|
||||
ok -> ok;
|
||||
{error, already_exists} -> gen_server:call(?SERVER, {put, HookPoint, Callback}, infinity)
|
||||
end;
|
||||
put(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
|
||||
?MODULE:put(HookPoint, #callback{action = Action, priority = 0}).
|
||||
|
||||
-spec put(hookpoint(), action(), filter() | integer() | list()) -> ok.
|
||||
put(HookPoint, Action, {_M, _F, _A} = Filter) ->
|
||||
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = 0});
|
||||
put(HookPoint, Action, Priority) when is_integer(Priority) ->
|
||||
?MODULE:put(HookPoint, #callback{action = Action, priority = Priority}).
|
||||
|
||||
-spec put(hookpoint(), action(), filter(), integer()) -> ok.
|
||||
put(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
|
||||
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
|
||||
end.
|
||||
|
||||
%% @doc Unregister a callback.
|
||||
-spec del(hookpoint(), action() | {module(), atom()}) -> ok.
|
||||
|
@ -296,7 +284,7 @@ insert_hook(HookPoint, Callbacks) ->
|
|||
ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}),
|
||||
ok.
|
||||
update_hook(HookPoint, Callbacks) ->
|
||||
Ms = ets:fun2ms(fun({hook, K, V}) when K =:= HookPoint -> {hook, K, Callbacks} end),
|
||||
Ms = ets:fun2ms(fun({hook, K, _V}) when K =:= HookPoint -> {hook, K, Callbacks} end),
|
||||
ets:select_replace(emqx_hooks, Ms),
|
||||
ok.
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
-include("emqx.hrl").
|
||||
-include("types.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("emqx_hooks.hrl").
|
||||
|
||||
-export([
|
||||
start_link/0,
|
||||
|
@ -190,7 +191,7 @@ load_event_hooks(Events) ->
|
|||
ok;
|
||||
({K, true}) ->
|
||||
{HookPoint, Fun} = hook_and_fun(K),
|
||||
emqx_hooks:put(HookPoint, {?MODULE, Fun, []})
|
||||
emqx_hooks:put(HookPoint, {?MODULE, Fun, []}, ?HP_SYS_MSGS)
|
||||
end,
|
||||
Events
|
||||
).
|
||||
|
|
|
@ -95,52 +95,54 @@ t_emqx_pubsub_api(_) ->
|
|||
?assertEqual([], emqx:topics()).
|
||||
|
||||
t_hook_unhook(_) ->
|
||||
ok = emqx:hook(test_hook, {?MODULE, hook_fun1, []}),
|
||||
ok = emqx:hook(test_hook, {?MODULE, hook_fun2, []}),
|
||||
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun1, []}, 0),
|
||||
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0),
|
||||
?assertEqual(
|
||||
{error, already_exists},
|
||||
emqx:hook(test_hook, {?MODULE, hook_fun2, []})
|
||||
emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0)
|
||||
),
|
||||
ok = emqx:unhook(test_hook, {?MODULE, hook_fun1}),
|
||||
ok = emqx:unhook(test_hook, {?MODULE, hook_fun2}),
|
||||
ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun1}),
|
||||
ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun2}),
|
||||
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun8, []}, 8),
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 2),
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun10, []}, 10),
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun9, []}, 9),
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}),
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, []}),
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, []}),
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, []}).
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun8, []}, 8),
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun2, []}, 2),
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun10, []}, 10),
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun9, []}, 9),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun2, []}),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun8, []}),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun9, []}),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun10, []}).
|
||||
|
||||
t_run_hook(_) ->
|
||||
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}),
|
||||
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun4, [init]}),
|
||||
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun5, [init]}),
|
||||
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun3, [init]}, 0),
|
||||
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun4, [init]}, 0),
|
||||
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun5, [init]}, 0),
|
||||
[r5, r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []),
|
||||
[] = emqx:run_fold_hook(unknown_hook, [], []),
|
||||
|
||||
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun9, []}),
|
||||
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
|
||||
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun9, []}, 0),
|
||||
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun10, []}, 0),
|
||||
[r10] = emqx:run_fold_hook(foldl_hook2, [arg], []),
|
||||
|
||||
ok = emqx:hook(foreach_hook, {?MODULE, hook_fun6, [initArg]}),
|
||||
{error, already_exists} = emqx:hook(foreach_hook, {?MODULE, hook_fun6, [initArg]}),
|
||||
ok = emqx:hook(foreach_hook, {?MODULE, hook_fun7, [initArg]}),
|
||||
ok = emqx:hook(foreach_hook, {?MODULE, hook_fun8, [initArg]}),
|
||||
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
|
||||
{error, already_exists} = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
|
||||
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun7, [initArg]}, 0),
|
||||
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun8, [initArg]}, 0),
|
||||
ok = emqx:run_hook(foreach_hook, [arg]),
|
||||
|
||||
ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
|
||||
ok = emqx_hooks:add(
|
||||
foreach_filter1_hook, {?MODULE, hook_fun1, []}, 0, {?MODULE, hook_filter1, []}
|
||||
),
|
||||
%% filter passed
|
||||
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])),
|
||||
%% filter failed
|
||||
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])),
|
||||
|
||||
ok = emqx:hook(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}
|
||||
ok = emqx_hooks:add(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2, []}, 0, {?MODULE, hook_filter2, [init_arg]}
|
||||
),
|
||||
ok = emqx:hook(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}
|
||||
ok = emqx_hooks:add(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, 0, {?MODULE, hook_filter2_1, [init_arg]}
|
||||
),
|
||||
?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
|
||||
?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)).
|
||||
|
|
|
@ -66,7 +66,7 @@ add_hook_order_prop() ->
|
|||
hooks(),
|
||||
try
|
||||
{ok, _} = emqx_hooks:start_link(),
|
||||
[ok = emqx:hook(prop_hook, {M, F, []}, Prio) || {Prio, M, F} <- Hooks],
|
||||
[ok = emqx_hooks:add(prop_hook, {M, F, []}, Prio) || {Prio, M, F} <- Hooks],
|
||||
Callbacks = emqx_hooks:lookup(prop_hook),
|
||||
Order = [{Prio, M, F} || {callback, {M, F, _}, _Filter, Prio} <- Callbacks],
|
||||
?assertEqual(
|
||||
|
@ -93,11 +93,11 @@ hooks() ->
|
|||
|
||||
t_add_put_del_hook(_) ->
|
||||
{ok, _} = emqx_hooks:start_link(),
|
||||
ok = emqx:hook(test_hook, {?MODULE, hook_fun1, []}),
|
||||
ok = emqx:hook(test_hook, {?MODULE, hook_fun2, []}),
|
||||
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun1, []}, 0),
|
||||
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0),
|
||||
?assertEqual(
|
||||
{error, already_exists},
|
||||
emqx:hook(test_hook, {?MODULE, hook_fun2, []})
|
||||
emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0)
|
||||
),
|
||||
Callbacks0 = [
|
||||
{callback, {?MODULE, hook_fun1, []}, undefined, 0},
|
||||
|
@ -105,23 +105,23 @@ t_add_put_del_hook(_) ->
|
|||
],
|
||||
?assertEqual(Callbacks0, emqx_hooks:lookup(test_hook)),
|
||||
|
||||
ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun1, [test]}),
|
||||
ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun2, [test]}),
|
||||
ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun1, [test]}, 0),
|
||||
ok = emqx_hooks:put(test_hook, {?MODULE, hook_fun2, [test]}, 0),
|
||||
Callbacks1 = [
|
||||
{callback, {?MODULE, hook_fun1, [test]}, undefined, 0},
|
||||
{callback, {?MODULE, hook_fun2, [test]}, undefined, 0}
|
||||
],
|
||||
?assertEqual(Callbacks1, emqx_hooks:lookup(test_hook)),
|
||||
|
||||
ok = emqx:unhook(test_hook, {?MODULE, hook_fun1}),
|
||||
ok = emqx:unhook(test_hook, {?MODULE, hook_fun2}),
|
||||
ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun1}),
|
||||
ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun2}),
|
||||
timer:sleep(200),
|
||||
?assertEqual([], emqx_hooks:lookup(test_hook)),
|
||||
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun8, []}, 8),
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 2),
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun10, []}, 10),
|
||||
ok = emqx:hook(emqx_hook, {?MODULE, hook_fun9, []}, 9),
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun8, []}, 8),
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun2, []}, 2),
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun10, []}, 10),
|
||||
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun9, []}, 9),
|
||||
Callbacks2 = [
|
||||
{callback, {?MODULE, hook_fun10, []}, undefined, 10},
|
||||
{callback, {?MODULE, hook_fun9, []}, undefined, 9},
|
||||
|
@ -142,43 +142,45 @@ t_add_put_del_hook(_) ->
|
|||
],
|
||||
?assertEqual(Callbacks3, emqx_hooks:lookup(emqx_hook)),
|
||||
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, [test]}),
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, [test]}),
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, [test]}),
|
||||
ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, [test]}),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun2, [test]}),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun8, [test]}),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun9, [test]}),
|
||||
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun10, [test]}),
|
||||
timer:sleep(200),
|
||||
?assertEqual([], emqx_hooks:lookup(emqx_hook)).
|
||||
|
||||
t_run_hooks(_) ->
|
||||
{ok, _} = emqx_hooks:start_link(),
|
||||
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun3, [init]}),
|
||||
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun4, [init]}),
|
||||
ok = emqx:hook(foldl_hook, {?MODULE, hook_fun5, [init]}),
|
||||
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun3, [init]}, 0),
|
||||
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun4, [init]}, 0),
|
||||
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun5, [init]}, 0),
|
||||
[r5, r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []),
|
||||
[] = emqx:run_fold_hook(unknown_hook, [], []),
|
||||
|
||||
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun9, []}),
|
||||
ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
|
||||
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun9, []}, 0),
|
||||
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun10, []}, 0),
|
||||
%% Note: 10 is _less_ than 9 per lexicographic order
|
||||
[r10] = emqx:run_fold_hook(foldl_hook2, [arg], []),
|
||||
|
||||
ok = emqx:hook(foreach_hook, {?MODULE, hook_fun6, [initArg]}),
|
||||
{error, already_exists} = emqx:hook(foreach_hook, {?MODULE, hook_fun6, [initArg]}),
|
||||
ok = emqx:hook(foreach_hook, {?MODULE, hook_fun7, [initArg]}),
|
||||
ok = emqx:hook(foreach_hook, {?MODULE, hook_fun8, [initArg]}),
|
||||
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
|
||||
{error, already_exists} = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
|
||||
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun7, [initArg]}, 0),
|
||||
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun8, [initArg]}, 0),
|
||||
ok = emqx:run_hook(foreach_hook, [arg]),
|
||||
|
||||
ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
|
||||
ok = emqx_hooks:add(
|
||||
foreach_filter1_hook, {?MODULE, hook_fun1, []}, 0, {?MODULE, hook_filter1, []}
|
||||
),
|
||||
%% filter passed
|
||||
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])),
|
||||
%% filter failed
|
||||
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])),
|
||||
|
||||
ok = emqx:hook(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}
|
||||
ok = emqx_hooks:add(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2, []}, 0, {?MODULE, hook_filter2, [init_arg]}
|
||||
),
|
||||
ok = emqx:hook(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}
|
||||
ok = emqx_hooks:add(
|
||||
foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, 0, {?MODULE, hook_filter2_1, [init_arg]}
|
||||
),
|
||||
?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
|
||||
?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)).
|
||||
|
|
|
@ -114,7 +114,7 @@ do_mock(mria_mnesia) ->
|
|||
do_mock(emqx_metrics) ->
|
||||
meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end);
|
||||
do_mock(emqx_hooks) ->
|
||||
meck:expect(emqx_hooks, put, fun(_HookPoint, _MFA) -> ok end),
|
||||
meck:expect(emqx_hooks, put, fun(_HookPoint, _MFA, _) -> ok end),
|
||||
meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end);
|
||||
do_mock(emqx_config_handler) ->
|
||||
meck:expect(emqx_config_handler, add_handler, fun(_, _) -> ok end).
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
-include("emqx_authz.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-compile(export_all).
|
||||
|
@ -100,7 +101,7 @@ init() ->
|
|||
Sources = emqx_conf:get(?CONF_KEY_PATH, []),
|
||||
ok = check_dup_types(Sources),
|
||||
NSources = create_sources(Sources),
|
||||
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1).
|
||||
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [NSources]}, ?HP_AUTHZ).
|
||||
|
||||
deinit() ->
|
||||
ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}),
|
||||
|
@ -501,7 +502,7 @@ get_source_by_type(Type, Sources) ->
|
|||
|
||||
%% @doc put hook with (maybe) initialized new source and old sources
|
||||
update_authz_chain(Actions) ->
|
||||
emqx_hooks:put('client.authorize', {?MODULE, authorize, [Actions]}, -1).
|
||||
emqx_hooks:put('client.authorize', {?MODULE, authorize, [Actions]}, ?HP_AUTHZ).
|
||||
|
||||
check_acl_file_rules(RawRules) ->
|
||||
%% TODO: make sure the bin rules checked
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_auto_subscribe).
|
||||
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-define(HOOK_POINT, 'client.connected').
|
||||
|
||||
-define(MAX_AUTO_SUBSCRIBE, 20).
|
||||
|
@ -114,5 +116,7 @@ update_hook() ->
|
|||
|
||||
update_hook(Config) ->
|
||||
{TopicHandler, Options} = emqx_auto_subscribe_handler:init(Config),
|
||||
emqx_hooks:put(?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}),
|
||||
emqx_hooks:put(
|
||||
?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}, ?HP_AUTO_SUB
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(emqx_config_handler).
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-export([post_config_update/5]).
|
||||
|
@ -105,7 +106,7 @@ load_hook(Bridges) ->
|
|||
|
||||
do_load_hook(#{local_topic := _} = Conf) ->
|
||||
case maps:get(direction, Conf, egress) of
|
||||
egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
|
||||
egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
|
||||
ingress -> ok
|
||||
end;
|
||||
do_load_hook(_Conf) ->
|
||||
|
|
|
@ -317,7 +317,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
unload_exhooks() ->
|
||||
[
|
||||
emqx:unhook(Name, {M, F})
|
||||
emqx_hooks:del(Name, {M, F})
|
||||
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
|
||||
].
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
-include("emqx_exhook.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
%% The exhook proto version should be fixed as `v2` in EMQX v5.x
|
||||
%% to make sure the exhook proto version is compatible
|
||||
|
@ -255,7 +256,7 @@ ensure_hooks(HookSpecs) ->
|
|||
false ->
|
||||
?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
|
||||
{Hookpoint, {M, F, A}} ->
|
||||
emqx_hooks:put(Hookpoint, {M, F, A}),
|
||||
emqx_hooks:put(Hookpoint, {M, F, A}, ?HP_EXHOOK),
|
||||
ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
|
||||
end
|
||||
end,
|
||||
|
|
|
@ -330,8 +330,8 @@ t_clients_get_subscription_api(_) ->
|
|||
|
||||
t_on_offline_event(_) ->
|
||||
Fun = fun(Channel) ->
|
||||
emqx_hooks:add('client.connected', {emqx_sys, on_client_connected, []}),
|
||||
emqx_hooks:add('client.disconnected', {emqx_sys, on_client_disconnected, []}),
|
||||
emqx_hooks:add('client.connected', {emqx_sys, on_client_connected, []}, 1000),
|
||||
emqx_hooks:add('client.disconnected', {emqx_sys, on_client_disconnected, []}, 1000),
|
||||
|
||||
ConnectedSub = <<"$SYS/brokers/+/gateway/coap/clients/+/connected">>,
|
||||
emqx_broker:subscribe(ConnectedSub),
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-import(
|
||||
emqx_exproto_echo_svr,
|
||||
[
|
||||
|
@ -262,8 +264,8 @@ t_hook_connected_disconnected(Cfg) ->
|
|||
ConnAckBin = frame_connack(0),
|
||||
|
||||
Parent = self(),
|
||||
emqx:hook('client.connected', {?MODULE, hook_fun1, [Parent]}),
|
||||
emqx:hook('client.disconnected', {?MODULE, hook_fun2, [Parent]}),
|
||||
emqx_hooks:add('client.connected', {?MODULE, hook_fun1, [Parent]}, 1000),
|
||||
emqx_hooks:add('client.disconnected', {?MODULE, hook_fun2, [Parent]}, 1000),
|
||||
|
||||
send(Sock, ConnBin),
|
||||
{ok, ConnAckBin} = recv(Sock, 5000),
|
||||
|
@ -287,8 +289,8 @@ t_hook_connected_disconnected(Cfg) ->
|
|||
begin
|
||||
{error, closed} = recv(Sock, 5000)
|
||||
end,
|
||||
emqx:unhook('client.connected', {?MODULE, hook_fun1}),
|
||||
emqx:unhook('client.disconnected', {?MODULE, hook_fun2}).
|
||||
emqx_hooks:del('client.connected', {?MODULE, hook_fun1}),
|
||||
emqx_hooks:del('client.disconnected', {?MODULE, hook_fun2}).
|
||||
|
||||
t_hook_session_subscribed_unsubscribed(Cfg) ->
|
||||
SockType = proplists:get_value(listener_type, Cfg),
|
||||
|
@ -308,8 +310,8 @@ t_hook_session_subscribed_unsubscribed(Cfg) ->
|
|||
{ok, ConnAckBin} = recv(Sock, 5000),
|
||||
|
||||
Parent = self(),
|
||||
emqx:hook('session.subscribed', {?MODULE, hook_fun3, [Parent]}),
|
||||
emqx:hook('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}),
|
||||
emqx_hooks:add('session.subscribed', {?MODULE, hook_fun3, [Parent]}, 1000),
|
||||
emqx_hooks:add('session.unsubscribed', {?MODULE, hook_fun4, [Parent]}, 1000),
|
||||
|
||||
SubBin = frame_subscribe(<<"t/#">>, 1),
|
||||
SubAckBin = frame_suback(0),
|
||||
|
@ -336,8 +338,8 @@ t_hook_session_subscribed_unsubscribed(Cfg) ->
|
|||
end,
|
||||
|
||||
close(Sock),
|
||||
emqx:unhook('session.subscribed', {?MODULE, hook_fun3}),
|
||||
emqx:unhook('session.unsubscribed', {?MODULE, hook_fun4}).
|
||||
emqx_hooks:del('session.subscribed', {?MODULE, hook_fun3}),
|
||||
emqx_hooks:del('session.unsubscribed', {?MODULE, hook_fun4}).
|
||||
|
||||
t_hook_message_delivered(Cfg) ->
|
||||
SockType = proplists:get_value(listener_type, Cfg),
|
||||
|
@ -362,14 +364,14 @@ t_hook_message_delivered(Cfg) ->
|
|||
send(Sock, SubBin),
|
||||
{ok, SubAckBin} = recv(Sock, 5000),
|
||||
|
||||
emqx:hook('message.delivered', {?MODULE, hook_fun5, []}),
|
||||
emqx_hooks:add('message.delivered', {?MODULE, hook_fun5, []}, 1000),
|
||||
|
||||
emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)),
|
||||
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
|
||||
{ok, PubBin1} = recv(Sock, 5000),
|
||||
|
||||
close(Sock),
|
||||
emqx:unhook('message.delivered', {?MODULE, hook_fun5}).
|
||||
emqx_hooks:del('message.delivered', {?MODULE, hook_fun5}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Utils
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
-include_lib("emqx/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
-export([mnesia/1]).
|
||||
|
@ -379,7 +380,7 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
|
|||
delayed_count() -> mnesia:table_info(?TAB, size).
|
||||
|
||||
do_load_or_unload(true, State) ->
|
||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
|
||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
|
||||
State;
|
||||
do_load_or_unload(false, #{publish_timer := PubTimer} = State) ->
|
||||
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([
|
||||
|
@ -76,9 +77,9 @@ register_hook([]) ->
|
|||
unregister_hook();
|
||||
register_hook(Rules) ->
|
||||
{PubRules, SubRules, ErrRules} = compile(Rules),
|
||||
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
|
||||
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
|
||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}),
|
||||
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, ?HP_REWRITE),
|
||||
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, ?HP_REWRITE),
|
||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, ?HP_REWRITE),
|
||||
case ErrRules of
|
||||
[] ->
|
||||
ok;
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-export([
|
||||
on_message_publish/1,
|
||||
|
@ -106,9 +107,9 @@ max_limit() ->
|
|||
?MAX_TOPICS.
|
||||
|
||||
enable() ->
|
||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
|
||||
emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}),
|
||||
emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}).
|
||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_TOPIC_METRICS),
|
||||
emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}, ?HP_TOPIC_METRICS),
|
||||
emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}, ?HP_TOPIC_METRICS).
|
||||
|
||||
disable() ->
|
||||
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||
|
|
|
@ -65,6 +65,7 @@ end_per_testcase(_Case, _Config) ->
|
|||
|
||||
t_enable_disable_case(_) ->
|
||||
emqx_delayed:unload(),
|
||||
timer:sleep(100),
|
||||
Hooks = emqx_hooks:lookup('message.publish'),
|
||||
MFA = {emqx_delayed, on_message_publish, []},
|
||||
?assertEqual(false, lists:keyfind(MFA, 2, Hooks)),
|
||||
|
@ -81,6 +82,7 @@ t_enable_disable_case(_) ->
|
|||
?assertMatch(#{data := Datas} when Datas =/= [], emqx_delayed:list(#{})),
|
||||
|
||||
emqx_delayed:unload(),
|
||||
timer:sleep(100),
|
||||
?assertEqual(false, lists:keyfind(MFA, 2, Hooks)),
|
||||
?assertMatch(#{data := []}, emqx_delayed:list(#{})),
|
||||
ok.
|
||||
|
@ -188,6 +190,7 @@ t_unknown_messages(_) ->
|
|||
).
|
||||
|
||||
t_get_basic_usage_info(_Config) ->
|
||||
emqx:update_config([delayed, max_delayed_messages], 10000),
|
||||
?assertEqual(#{delayed_message_count => 0}, emqx_delayed:get_basic_usage_info()),
|
||||
lists:foreach(
|
||||
fun(N) ->
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0-rc.1").
|
||||
-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0.0-rc.3").
|
||||
-define(EMQX_ELIXIR_PLUGIN_TEMPLATE_VSN, "0.1.0").
|
||||
-define(PACKAGE_SUFFIX, ".tar.gz").
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-export([
|
||||
load/0,
|
||||
|
@ -85,10 +86,10 @@ mnesia(boot) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
load() ->
|
||||
emqx:hook('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup, []}).
|
||||
ok = emqx_hooks:put('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup, []}, ?HP_PSK).
|
||||
|
||||
unload() ->
|
||||
emqx:unhook('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup}).
|
||||
ok = emqx_hooks:del('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup}).
|
||||
|
||||
on_psk_lookup(PSKIdentity, _UserState) ->
|
||||
case mnesia:dirty_read(?TAB, PSKIdentity) of
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
-include("emqx_retainer.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
|
@ -428,13 +429,15 @@ close_resource(_) ->
|
|||
|
||||
-spec load(context()) -> ok.
|
||||
load(Context) ->
|
||||
_ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}),
|
||||
_ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}),
|
||||
ok = emqx_hooks:put(
|
||||
'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER
|
||||
),
|
||||
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER),
|
||||
emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
|
||||
ok.
|
||||
|
||||
unload() ->
|
||||
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
|
||||
emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
|
||||
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||
ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}),
|
||||
emqx_stats:cancel_update(emqx_retainer_stats),
|
||||
ok.
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-include("rule_engine.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-export([
|
||||
reload/0,
|
||||
|
@ -87,7 +88,9 @@ reload() ->
|
|||
|
||||
load(Topic) ->
|
||||
HookPoint = event_name(Topic),
|
||||
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}).
|
||||
emqx_hooks:put(
|
||||
HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
|
||||
).
|
||||
|
||||
unload() ->
|
||||
lists:foreach(
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||
|
||||
-export([
|
||||
start_link/0,
|
||||
|
@ -212,7 +213,7 @@ load(State) ->
|
|||
threshold := Threshold
|
||||
} = emqx:get_config([slow_subs]),
|
||||
MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
|
||||
_ = emqx:hook(
|
||||
ok = emqx_hooks:put(
|
||||
'delivery.completed',
|
||||
{?MODULE, on_delivery_completed, [
|
||||
#{
|
||||
|
@ -220,14 +221,15 @@ load(State) ->
|
|||
stats_type => StatsType,
|
||||
threshold => Threshold
|
||||
}
|
||||
]}
|
||||
]},
|
||||
?HP_SLOW_SUB
|
||||
),
|
||||
|
||||
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
||||
State1#{enable := true, last_tick_at => ?NOW}.
|
||||
|
||||
unload(#{expire_timer := ExpireTimer} = State) ->
|
||||
emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}),
|
||||
emqx_hooks:del('delivery.completed', {?MODULE, on_delivery_completed}),
|
||||
State#{
|
||||
enable := false,
|
||||
expire_timer := cancel_timer(ExpireTimer)
|
||||
|
|
|
@ -27,6 +27,11 @@
|
|||
|
||||
-define(CONF_KEY_PATH, [license]).
|
||||
|
||||
%% Give the license app the highest priority.
|
||||
%% We don't define it in the emqx_hooks.hrl becasue that is an opensource code
|
||||
%% and can be changed by the communitiy.
|
||||
-define(HP_LICENSE, 2000).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% API
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -114,7 +119,7 @@ post_config_update(_Path, _Cmd, NewConf, _Old, _AppEnvs) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
add_license_hook() ->
|
||||
ok = emqx_hooks:put('client.connect', {?MODULE, check, []}).
|
||||
ok = emqx_hooks:put('client.connect', {?MODULE, check, []}, ?HP_LICENSE).
|
||||
|
||||
del_license_hook() ->
|
||||
_ = emqx_hooks:del('client.connect', {?MODULE, check, []}),
|
||||
|
|
Loading…
Reference in New Issue