From 39b1b2050681a537527ac22dc636b03e64f5ffde Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 15 Jun 2022 19:03:40 +0800 Subject: [PATCH] feat: fix the hook priorities --- apps/emqx/include/emqx_hooks.hrl | 38 +++++++++++++++++++ apps/emqx/src/emqx_authentication.erl | 6 +-- apps/emqx/src/emqx_hooks.erl | 6 +-- apps/emqx/src/emqx_sys.erl | 3 +- apps/emqx_authz/src/emqx_authz.erl | 5 ++- .../src/emqx_auto_subscribe.erl | 6 ++- apps/emqx_bridge/src/emqx_bridge.erl | 3 +- apps/emqx_exhook/src/emqx_exhook_mgr.erl | 2 +- apps/emqx_exhook/src/emqx_exhook_server.erl | 3 +- apps/emqx_modules/src/emqx_delayed.erl | 3 +- apps/emqx_modules/src/emqx_rewrite.erl | 7 ++-- apps/emqx_modules/src/emqx_topic_metrics.erl | 7 ++-- apps/emqx_psk/src/emqx_psk.erl | 5 ++- apps/emqx_retainer/src/emqx_retainer.erl | 11 ++++-- .../emqx_rule_engine/src/emqx_rule_events.erl | 5 ++- apps/emqx_slow_subs/src/emqx_slow_subs.erl | 8 ++-- 16 files changed, 88 insertions(+), 30 deletions(-) create mode 100644 apps/emqx/include/emqx_hooks.hrl diff --git a/apps/emqx/include/emqx_hooks.hrl b/apps/emqx/include/emqx_hooks.hrl new file mode 100644 index 000000000..08fa07bc8 --- /dev/null +++ b/apps/emqx/include/emqx_hooks.hrl @@ -0,0 +1,38 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% Definitions for Hook Priorities + +%% == Highest Priority +-define(HP_PSK, 1000). +-define(HP_REWRITE, 1000). +-define(HP_AUTHN, 990). +-define(HP_AUTHZ, 980). +-define(HP_SYS_MSGS, 960). +-define(HP_TOPIC_METRICS, 950). +-define(HP_RETAINER, 940). +-define(HP_AUTO_SUB, 930). + +-define(HP_RULE_ENGINE, 900). + +%% apps that can work with the republish action +-define(HP_SLOW_SUB, 980). +-define(HP_BRIDGE, 970). +-define(HP_DELAY_PUB, 960). + +%% apps that can stop the hooks +-define(HP_EXHOOK, 100). +%% == Lowest Priority diff --git a/apps/emqx/src/emqx_authentication.erl b/apps/emqx/src/emqx_authentication.erl index 101be754d..b46e1faa8 100644 --- a/apps/emqx/src/emqx_authentication.erl +++ b/apps/emqx/src/emqx_authentication.erl @@ -25,7 +25,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("emqx_authentication.hrl"). - +-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). -define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM). @@ -696,7 +696,7 @@ maybe_hook(#{hooked := false} = State) -> ) of true -> - _ = emqx:hook('client.authenticate', {?MODULE, authenticate, []}), + ok = emqx_hooks:put('client.authenticate', {?MODULE, authenticate, []}, ?HP_AUTHN), State#{hooked => true}; false -> State @@ -715,7 +715,7 @@ maybe_unhook(#{hooked := true} = State) -> ) of true -> - _ = emqx:unhook('client.authenticate', {?MODULE, authenticate, []}), + ok = emqx_hooks:del('client.authenticate', {?MODULE, authenticate, []}), State#{hooked => false}; false -> State diff --git a/apps/emqx/src/emqx_hooks.erl b/apps/emqx/src/emqx_hooks.erl index 72319bae0..632257c4f 100644 --- a/apps/emqx/src/emqx_hooks.erl +++ b/apps/emqx/src/emqx_hooks.erl @@ -121,7 +121,7 @@ callback_priority(#callback{priority = P}) -> P. %% Hooks API %%-------------------------------------------------------------------- -%% @doc Register a callback +%% @doc `add/2,3,4` add a new hook, returns 'already_exists' if the hook exists. -spec add(hookpoint(), action() | callback()) -> ok_or_error(already_exists). add(HookPoint, Callback) when is_record(Callback, callback) -> gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); @@ -140,7 +140,7 @@ add(HookPoint, Action, Priority) when is_integer(Priority) -> add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). -%% @doc Like add/2, it register a callback, discard 'already_exists' error. +%% @doc `put/2,3,4` updates the existing hook, add it if not exists. -spec put(hookpoint(), action() | callback()) -> ok. put(HookPoint, Callback) when is_record(Callback, callback) -> case add(HookPoint, Callback) of @@ -296,7 +296,7 @@ insert_hook(HookPoint, Callbacks) -> ets:insert(?TAB, #hook{name = HookPoint, callbacks = Callbacks}), ok. update_hook(HookPoint, Callbacks) -> - Ms = ets:fun2ms(fun({hook, K, V}) when K =:= HookPoint -> {hook, K, Callbacks} end), + Ms = ets:fun2ms(fun({hook, K, _V}) when K =:= HookPoint -> {hook, K, Callbacks} end), ets:select_replace(emqx_hooks, Ms), ok. diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 2b1f87746..284fefac2 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("types.hrl"). -include("logger.hrl"). +-include("emqx_hooks.hrl"). -export([ start_link/0, @@ -190,7 +191,7 @@ load_event_hooks(Events) -> ok; ({K, true}) -> {HookPoint, Fun} = hook_and_fun(K), - emqx_hooks:put(HookPoint, {?MODULE, Fun, []}) + emqx_hooks:put(HookPoint, {?MODULE, Fun, []}, ?HP_SYS_MSGS) end, Events ). diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index e41ef71ce..410920f97 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -19,6 +19,7 @@ -include("emqx_authz.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -ifdef(TEST). -compile(export_all). @@ -100,7 +101,7 @@ init() -> Sources = emqx_conf:get(?CONF_KEY_PATH, []), ok = check_dup_types(Sources), NSources = create_sources(Sources), - ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1). + ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [NSources]}, ?HP_AUTHZ). deinit() -> ok = emqx_hooks:del('client.authorize', {?MODULE, authorize}), @@ -501,7 +502,7 @@ get_source_by_type(Type, Sources) -> %% @doc put hook with (maybe) initialized new source and old sources update_authz_chain(Actions) -> - emqx_hooks:put('client.authorize', {?MODULE, authorize, [Actions]}, -1). + emqx_hooks:put('client.authorize', {?MODULE, authorize, [Actions]}, ?HP_AUTHZ). check_acl_file_rules(RawRules) -> %% TODO: make sure the bin rules checked diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index f7f06b13c..44d3d04c5 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -16,6 +16,8 @@ -module(emqx_auto_subscribe). +-include_lib("emqx/include/emqx_hooks.hrl"). + -define(HOOK_POINT, 'client.connected'). -define(MAX_AUTO_SUBSCRIBE, 20). @@ -114,5 +116,7 @@ update_hook() -> update_hook(Config) -> {TopicHandler, Options} = emqx_auto_subscribe_handler:init(Config), - emqx_hooks:put(?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}), + emqx_hooks:put( + ?HOOK_POINT, {?MODULE, on_client_connected, [{TopicHandler, Options}]}, ?HP_AUTO_SUB + ), ok. diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index c4a657a0c..200c8c2a9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -17,6 +17,7 @@ -behaviour(emqx_config_handler). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([post_config_update/5]). @@ -105,7 +106,7 @@ load_hook(Bridges) -> do_load_hook(#{local_topic := _} = Conf) -> case maps:get(direction, Conf, egress) of - egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}); + egress -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_BRIDGE); ingress -> ok end; do_load_hook(_Conf) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index c18dd6239..cf83e8eb9 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -317,7 +317,7 @@ code_change(_OldVsn, State, _Extra) -> unload_exhooks() -> [ - emqx:unhook(Name, {M, F}) + emqx_hooks:del(Name, {M, F}) || {Name, {M, F, _A}} <- ?ENABLED_HOOKS ]. diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 88aee8e7d..b0c9b8454 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -18,6 +18,7 @@ -include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). %% The exhook proto version should be fixed as `v2` in EMQX v5.x %% to make sure the exhook proto version is compatible @@ -255,7 +256,7 @@ ensure_hooks(HookSpecs) -> false -> ?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint}); {Hookpoint, {M, F, A}} -> - emqx_hooks:put(Hookpoint, {M, F, A}), + emqx_hooks:put(Hookpoint, {M, F, A}, ?HP_EXHOOK), ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) end end, diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 99ae97dd3..99bd62022 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). %% Mnesia bootstrap -export([mnesia/1]). @@ -379,7 +380,7 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now -> delayed_count() -> mnesia:table_info(?TAB, size). do_load_or_unload(true, State) -> - emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB), State; do_load_or_unload(false, #{publish_timer := PubTimer} = State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index edad6dfc4..79465045d 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -19,6 +19,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -ifdef(TEST). -export([ @@ -76,9 +77,9 @@ register_hook([]) -> unregister_hook(); register_hook(Rules) -> {PubRules, SubRules, ErrRules} = compile(Rules), - emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), - emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), - emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}), + emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}, ?HP_REWRITE), + emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}, ?HP_REWRITE), + emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}, ?HP_REWRITE), case ErrRules of [] -> ok; diff --git a/apps/emqx_modules/src/emqx_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl index 76d0c6e14..5a0925771 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -export([ on_message_publish/1, @@ -106,9 +107,9 @@ max_limit() -> ?MAX_TOPICS. enable() -> - emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), - emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}), - emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}). + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_TOPIC_METRICS), + emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}, ?HP_TOPIC_METRICS), + emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}, ?HP_TOPIC_METRICS). disable() -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 294a969c2..99354d230 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -export([ load/0, @@ -85,10 +86,10 @@ mnesia(boot) -> %%------------------------------------------------------------------------------ load() -> - emqx:hook('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup, []}). + ok = emqx_hooks:put('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup, []}, ?HP_PSK). unload() -> - emqx:unhook('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup}). + ok = emqx_hooks:del('tls_handshake.psk_lookup', {?MODULE, on_psk_lookup}). on_psk_lookup(PSKIdentity, _UserState) -> case mnesia:dirty_read(?TAB, PSKIdentity) of diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 6764f5e82..c15b0f5bd 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -20,6 +20,7 @@ -include("emqx_retainer.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -export([start_link/0]). @@ -423,13 +424,15 @@ close_resource(_) -> -spec load(context()) -> ok. load(Context) -> - _ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, [Context]}), - _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Context]}), + ok = emqx_hooks:put( + 'session.subscribed', {?MODULE, on_session_subscribed, [Context]}, ?HP_RETAINER + ), + ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, [Context]}, ?HP_RETAINER), emqx_stats:update_interval(emqx_retainer_stats, fun ?MODULE:stats_fun/0), ok. unload() -> - emqx:unhook('message.publish', {?MODULE, on_message_publish}), - emqx:unhook('session.subscribed', {?MODULE, on_session_subscribed}), + ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), + ok = emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}), emqx_stats:cancel_update(emqx_retainer_stats), ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 9aa7d0aa0..a25ad0e2e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -19,6 +19,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -export([ reload/0, @@ -87,7 +88,9 @@ reload() -> load(Topic) -> HookPoint = event_name(Topic), - emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}). + emqx_hooks:put( + HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}, ?HP_RULE_ENGINE + ). unload() -> lists:foreach( diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index f0605baca..2a07518b3 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). -export([ start_link/0, @@ -212,7 +213,7 @@ load(State) -> threshold := Threshold } = emqx:get_config([slow_subs]), MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE), - _ = emqx:hook( + ok = emqx_hooks:put( 'delivery.completed', {?MODULE, on_delivery_completed, [ #{ @@ -220,14 +221,15 @@ load(State) -> stats_type => StatsType, threshold => Threshold } - ]} + ]}, + ?HP_SLOW_SUB ), State1 = start_timer(expire_timer, fun expire_tick/0, State), State1#{enable := true, last_tick_at => ?NOW}. unload(#{expire_timer := ExpireTimer} = State) -> - emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}), + emqx_hooks:del('delivery.completed', {?MODULE, on_delivery_completed}), State#{ enable := false, expire_timer := cancel_timer(ExpireTimer)