feat: fix the hook priorities
This commit is contained in:
parent
4ef86b47c7
commit
39b1b20506
|
@ -0,0 +1,38 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% Definitions for Hook Priorities
|
||||||
|
|
||||||
|
%% == Highest Priority
|
||||||
|
-define(HP_PSK, 1000).
|
||||||
|
-define(HP_REWRITE, 1000).
|
||||||
|
-define(HP_AUTHN, 990).
|
||||||
|
-define(HP_AUTHZ, 980).
|
||||||
|
-define(HP_SYS_MSGS, 960).
|
||||||
|
-define(HP_TOPIC_METRICS, 950).
|
||||||
|
-define(HP_RETAINER, 940).
|
||||||
|
-define(HP_AUTO_SUB, 930).
|
||||||
|
|
||||||
|
-define(HP_RULE_ENGINE, 900).
|
||||||
|
|
||||||
|
%% apps that can work with the republish action
|
||||||
|
-define(HP_SLOW_SUB, 980).
|
||||||
|
-define(HP_BRIDGE, 970).
|
||||||
|
-define(HP_DELAY_PUB, 960).
|
||||||
|
|
||||||
|
%% apps that can stop the hooks
|
||||||
|
-define(HP_EXHOOK, 100).
|
||||||
|
%% == Lowest Priority
|
|
@ -25,7 +25,7 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("emqx_authentication.hrl").
|
-include("emqx_authentication.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
|
||||||
-define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
|
-define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
|
||||||
|
@ -696,7 +696,7 @@ maybe_hook(#{hooked := false} = State) ->
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
true ->
|
true ->
|
||||||
_ = emqx:hook('client.authenticate', {?MODULE, authenticate, []}),
|
ok = emqx_hooks:put('client.authenticate', {?MODULE, authenticate, []}, ?HP_AUTHN),
|
||||||
State#{hooked => true};
|
State#{hooked => true};
|
||||||
false ->
|
false ->
|
||||||
State
|
State
|
||||||
|
@ -715,7 +715,7 @@ maybe_unhook(#{hooked := true} = State) ->
|
||||||
)
|
)
|
||||||
of
|
of
|
||||||
true ->
|
true ->
|
||||||
_ = emqx:unhook('client.authenticate', {?MODULE, authenticate, []}),
|
ok = emqx_hooks:del('client.authenticate', {?MODULE, authenticate, []}),
|
||||||
State#{hooked => false};
|
State#{hooked => false};
|
||||||
false ->
|
false ->
|
||||||
State
|
State
|
||||||
|
|
|
@ -121,7 +121,7 @@ callback_priority(#callback{priority = P}) -> P.
|
||||||
%% Hooks API
|
%% Hooks API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Register a callback
|
%% @doc `add/2,3,4` add a new hook, returns 'already_exists' if the hook exists.
|
||||||
-spec add(hookpoint(), action() | callback()) -> ok_or_error(already_exists).
|
-spec add(hookpoint(), action() | callback()) -> ok_or_error(already_exists).
|
||||||
add(HookPoint, Callback) when is_record(Callback, callback) ->
|
add(HookPoint, Callback) when is_record(Callback, callback) ->
|
||||||
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
|
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
|
||||||
|
@ -140,7 +140,7 @@ add(HookPoint, Action, Priority) when is_integer(Priority) ->
|
||||||
add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
|
add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
|
||||||
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
|
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
|
||||||
|
|
||||||
%% @doc Like add/2, it register a callback, discard 'already_exists' error.
|
%% @doc `put/2,3,4` updates the existing hook, add it if not exists.
|
||||||
-spec put(hookpoint(), action() | callback()) -> ok.
|
-spec put(hookpoint(), action() | callback()) -> ok.
|
||||||
put(HookPoint, Callback) when is_record(Callback, callback) ->
|
put(HookPoint, Callback) when is_record(Callback, callback) ->
|
||||||
case add(HookPoint, Callback) of
|
case add(HookPoint, Callback) of
|
||||||
|
@ -296,7 +296,7 @@ insert_hook(HookPoint, Callbacks) ->
|
||||||
ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}),
|
ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}),
|
||||||
ok.
|
ok.
|
||||||
update_hook(HookPoint, Callbacks) ->
|
update_hook(HookPoint, Callbacks) ->
|
||||||
Ms = ets:fun2ms(fun({hook, K, V}) when K =:= HookPoint -> {hook, K, Callbacks} end),
|
Ms = ets:fun2ms(fun({hook, K, _V}) when K =:= HookPoint -> {hook, K, Callbacks} end),
|
||||||
ets:select_replace(emqx_hooks, Ms),
|
ets:select_replace(emqx_hooks, Ms),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
-include("emqx_hooks.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
start_link/0,
|
start_link/0,
|
||||||
|
@ -190,7 +191,7 @@ load_event_hooks(Events) ->
|
||||||
ok;
|
ok;
|
||||||
({K, true}) ->
|
({K, true}) ->
|
||||||
{HookPoint, Fun} = hook_and_fun(K),
|
{HookPoint, Fun} = hook_and_fun(K),
|
||||||
emqx_hooks:put(HookPoint, {?MODULE, Fun, []})
|
emqx_hooks:put(HookPoint, {?MODULE, Fun, []}, ?HP_SYS_MSGS)
|
||||||
end,
|
end,
|
||||||
Events
|
Events
|
||||||
).
|
).
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
-include("emqx_authz.hrl").
|
-include("emqx_authz.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
@ -100,7 +101,7 @@ init() ->
|
||||||
Sources = emqx_conf:get(?CONF_KEY_PATH, []),
|
Sources = emqx_conf:get(?CONF_KEY_PATH, []),
|
||||||
ok = check_dup_types(Sources),
|
ok = check_dup_types(Sources),
|
||||||
NSources = create_sources(Sources),
|
NSources = create_sources(Sources),
|
||||||
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1).
|
ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [NSources]}, ?HP_AUTHZ).
|
||||||
|
|
||||||
deinit() ->
|
deinit() ->
|
||||||
ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}),
|
ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}),
|
||||||
|
@ -501,7 +502,7 @@ get_source_by_type(Type, Sources) ->
|
||||||
|
|
||||||
%% @doc put hook with (maybe) initialized new source and old sources
|
%% @doc put hook with (maybe) initialized new source and old sources
|
||||||
update_authz_chain(Actions) ->
|
update_authz_chain(Actions) ->
|
||||||
emqx_hooks:put('client.authorize', {?MODULE, authorize, [Actions]}, -1).
|
emqx_hooks:put('client.authorize', {?MODULE, authorize, [Actions]}, ?HP_AUTHZ).
|
||||||
|
|
||||||
check_acl_file_rules(RawRules) ->
|
check_acl_file_rules(RawRules) ->
|
||||||
%% TODO: make sure the bin rules checked
|
%% TODO: make sure the bin rules checked
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
-module(emqx_auto_subscribe).
|
-module(emqx_auto_subscribe).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-define(HOOK_POINT, 'client.connected').
|
-define(HOOK_POINT, 'client.connected').
|
||||||
|
|
||||||
-define(MAX_AUTO_SUBSCRIBE, 20).
|
-define(MAX_AUTO_SUBSCRIBE, 20).
|
||||||
|
@ -114,5 +116,7 @@ update_hook() ->
|
||||||
|
|
||||||
update_hook(Config) ->
|
update_hook(Config) ->
|
||||||
{TopicHandler, Options} = emqx_auto_subscribe_handler:init(Config),
|
{TopicHandler, Options} = emqx_auto_subscribe_handler:init(Config),
|
||||||
emqx_hooks:put(?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}),
|
emqx_hooks:put(
|
||||||
|
?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}, ?HP_AUTO_SUB
|
||||||
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-behaviour(emqx_config_handler).
|
-behaviour(emqx_config_handler).
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-export([post_config_update/5]).
|
-export([post_config_update/5]).
|
||||||
|
@ -105,7 +106,7 @@ load_hook(Bridges) ->
|
||||||
|
|
||||||
do_load_hook(#{local_topic := _} = Conf) ->
|
do_load_hook(#{local_topic := _} = Conf) ->
|
||||||
case maps:get(direction, Conf, egress) of
|
case maps:get(direction, Conf, egress) of
|
||||||
egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
|
egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE);
|
||||||
ingress -> ok
|
ingress -> ok
|
||||||
end;
|
end;
|
||||||
do_load_hook(_Conf) ->
|
do_load_hook(_Conf) ->
|
||||||
|
|
|
@ -317,7 +317,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
unload_exhooks() ->
|
unload_exhooks() ->
|
||||||
[
|
[
|
||||||
emqx:unhook(Name, {M, F})
|
emqx_hooks:del(Name, {M, F})
|
||||||
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
|
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
-include("emqx_exhook.hrl").
|
-include("emqx_exhook.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
%% The exhook proto version should be fixed as `v2` in EMQX v5.x
|
%% The exhook proto version should be fixed as `v2` in EMQX v5.x
|
||||||
%% to make sure the exhook proto version is compatible
|
%% to make sure the exhook proto version is compatible
|
||||||
|
@ -255,7 +256,7 @@ ensure_hooks(HookSpecs) ->
|
||||||
false ->
|
false ->
|
||||||
?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
|
?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
|
||||||
{Hookpoint, {M, F, A}} ->
|
{Hookpoint, {M, F, A}} ->
|
||||||
emqx_hooks:put(Hookpoint, {M, F, A}),
|
emqx_hooks:put(Hookpoint, {M, F, A}, ?HP_EXHOOK),
|
||||||
ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
|
ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
-include_lib("emqx/include/types.hrl").
|
-include_lib("emqx/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
@ -379,7 +380,7 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
|
||||||
delayed_count() -> mnesia:table_info(?TAB, size).
|
delayed_count() -> mnesia:table_info(?TAB, size).
|
||||||
|
|
||||||
do_load_or_unload(true, State) ->
|
do_load_or_unload(true, State) ->
|
||||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
|
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
|
||||||
State;
|
State;
|
||||||
do_load_or_unload(false, #{publish_timer := PubTimer} = State) ->
|
do_load_or_unload(false, #{publish_timer := PubTimer} = State) ->
|
||||||
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([
|
-export([
|
||||||
|
@ -76,9 +77,9 @@ register_hook([]) ->
|
||||||
unregister_hook();
|
unregister_hook();
|
||||||
register_hook(Rules) ->
|
register_hook(Rules) ->
|
||||||
{PubRules, SubRules, ErrRules} = compile(Rules),
|
{PubRules, SubRules, ErrRules} = compile(Rules),
|
||||||
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
|
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, ?HP_REWRITE),
|
||||||
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
|
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, ?HP_REWRITE),
|
||||||
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}),
|
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, ?HP_REWRITE),
|
||||||
case ErrRules of
|
case ErrRules of
|
||||||
[] ->
|
[] ->
|
||||||
ok;
|
ok;
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
on_message_publish/1,
|
on_message_publish/1,
|
||||||
|
@ -106,9 +107,9 @@ max_limit() ->
|
||||||
?MAX_TOPICS.
|
?MAX_TOPICS.
|
||||||
|
|
||||||
enable() ->
|
enable() ->
|
||||||
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
|
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_TOPIC_METRICS),
|
||||||
emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}),
|
emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}, ?HP_TOPIC_METRICS),
|
||||||
emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}).
|
emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}, ?HP_TOPIC_METRICS).
|
||||||
|
|
||||||
disable() ->
|
disable() ->
|
||||||
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
load/0,
|
load/0,
|
||||||
|
@ -85,10 +86,10 @@ mnesia(boot) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
load() ->
|
load() ->
|
||||||
emqx:hook('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup, []}).
|
ok = emqx_hooks:put('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup, []}, ?HP_PSK).
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
emqx:unhook('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup}).
|
ok = emqx_hooks:del('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup}).
|
||||||
|
|
||||||
on_psk_lookup(PSKIdentity, _UserState) ->
|
on_psk_lookup(PSKIdentity, _UserState) ->
|
||||||
case mnesia:dirty_read(?TAB, PSKIdentity) of
|
case mnesia:dirty_read(?TAB, PSKIdentity) of
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-include("emqx_retainer.hrl").
|
-include("emqx_retainer.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
@ -423,13 +424,15 @@ close_resource(_) ->
|
||||||
|
|
||||||
-spec load(context()) -> ok.
|
-spec load(context()) -> ok.
|
||||||
load(Context) ->
|
load(Context) ->
|
||||||
_ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}),
|
ok = emqx_hooks:put(
|
||||||
_ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}),
|
'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER
|
||||||
|
),
|
||||||
|
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER),
|
||||||
emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
|
emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
|
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
||||||
emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}),
|
ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}),
|
||||||
emqx_stats:cancel_update(emqx_retainer_stats),
|
emqx_stats:cancel_update(emqx_retainer_stats),
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-include("rule_engine.hrl").
|
-include("rule_engine.hrl").
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
reload/0,
|
reload/0,
|
||||||
|
@ -87,7 +88,9 @@ reload() ->
|
||||||
|
|
||||||
load(Topic) ->
|
load(Topic) ->
|
||||||
HookPoint = event_name(Topic),
|
HookPoint = event_name(Topic),
|
||||||
emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}).
|
emqx_hooks:put(
|
||||||
|
HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
|
||||||
|
).
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
start_link/0,
|
start_link/0,
|
||||||
|
@ -212,7 +213,7 @@ load(State) ->
|
||||||
threshold := Threshold
|
threshold := Threshold
|
||||||
} = emqx:get_config([slow_subs]),
|
} = emqx:get_config([slow_subs]),
|
||||||
MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
|
MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
|
||||||
_ = emqx:hook(
|
ok = emqx_hooks:put(
|
||||||
'delivery.completed',
|
'delivery.completed',
|
||||||
{?MODULE, on_delivery_completed, [
|
{?MODULE, on_delivery_completed, [
|
||||||
#{
|
#{
|
||||||
|
@ -220,14 +221,15 @@ load(State) ->
|
||||||
stats_type => StatsType,
|
stats_type => StatsType,
|
||||||
threshold => Threshold
|
threshold => Threshold
|
||||||
}
|
}
|
||||||
]}
|
]},
|
||||||
|
?HP_SLOW_SUB
|
||||||
),
|
),
|
||||||
|
|
||||||
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
||||||
State1#{enable := true, last_tick_at => ?NOW}.
|
State1#{enable := true, last_tick_at => ?NOW}.
|
||||||
|
|
||||||
unload(#{expire_timer := ExpireTimer} = State) ->
|
unload(#{expire_timer := ExpireTimer} = State) ->
|
||||||
emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}),
|
emqx_hooks:del('delivery.completed', {?MODULE, on_delivery_completed}),
|
||||||
State#{
|
State#{
|
||||||
enable := false,
|
enable := false,
|
||||||
expire_timer := cancel_timer(ExpireTimer)
|
expire_timer := cancel_timer(ExpireTimer)
|
||||||
|
|
Loading…
Reference in New Issue