From 14983ec14a66e4bc2e4aef853ffc167e53a01fa7 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 15 Sep 2023 13:22:39 +0300 Subject: [PATCH] chore(hooks): validate hookpoints and document hook callbacks Co-authored-by: Thales Macedo Garitezi --- apps/emqx/src/emqx.erl | 15 +- apps/emqx/src/emqx_access_control.erl | 6 +- apps/emqx/src/emqx_channel.erl | 2 +- apps/emqx/src/emqx_hookpoints.erl | 232 ++++++++++++++++++ apps/emqx/src/emqx_hooks.erl | 5 + apps/emqx/src/emqx_session.erl | 2 +- apps/emqx/test/emqx_SUITE.erl | 86 ------- apps/emqx/test/emqx_cm_locker_SUITE.erl | 2 +- apps/emqx/test/emqx_hooks_SUITE.erl | 60 +++-- .../src/emqx_bridge_gcp_pubsub.app.src | 2 +- ...emqx_bridge_gcp_pubsub_consumer_worker.erl | 2 +- .../src/emqx_bridge_kafka_impl_consumer.erl | 2 +- .../src/emqx_bridge_mqtt.app.src | 2 +- .../src/emqx_bridge_mqtt_connector.erl | 2 +- .../src/emqx_coap_channel.erl | 2 +- .../src/emqx_gateway_lwm2m.app.src | 2 +- .../src/emqx_lwm2m_channel.erl | 4 +- .../src/emqx_gateway_stomp.app.src | 2 +- .../src/emqx_stomp_channel.erl | 4 +- .../test/emqx_mgmt_api_plugins_SUITE.erl | 4 +- apps/emqx_plugins/test/emqx_plugins_SUITE.erl | 27 +- 21 files changed, 305 insertions(+), 160 deletions(-) create mode 100644 apps/emqx/src/emqx_hookpoints.erl diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 47ff384c9..288ab9886 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -53,12 +53,6 @@ subscribed/2 ]). -%% Hooks API --export([ - run_hook/2, - run_fold_hook/3 -]). - %% Configs APIs -export([ get_config/1, @@ -172,15 +166,8 @@ subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) -> emqx_broker:subscribed(SubId, iolist_to_binary(Topic)). %%-------------------------------------------------------------------- -%% Hooks API +%% Config API %%-------------------------------------------------------------------- --spec run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop. -run_hook(HookPoint, Args) -> - emqx_hooks:run(HookPoint, Args). - --spec run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any(). -run_fold_hook(HookPoint, Args, Acc) -> - emqx_hooks:run_fold(HookPoint, Args, Acc). -spec get_config(emqx_utils_maps:config_key_path()) -> term(). get_config(KeyPath) -> diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 82604710a..8aaa44eaa 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -138,7 +138,7 @@ check_authorization_cache(ClientInfo, Action, Topic) -> emqx_authz_cache:put_authz_cache(Action, Topic, AuthzResult), AuthzResult; AuthzResult -> - emqx:run_hook( + emqx_hooks:run( 'client.check_authz_complete', [ClientInfo, Action, Topic, AuthzResult, cache] ), @@ -152,7 +152,7 @@ do_authorize(ClientInfo, Action, Topic) -> case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of AuthzResult = #{result := Result} when Result == allow; Result == deny -> From = maps:get(from, AuthzResult, unknown), - emqx:run_hook( + emqx_hooks:run( 'client.check_authz_complete', [ClientInfo, Action, Topic, Result, From] ), @@ -163,7 +163,7 @@ do_authorize(ClientInfo, Action, Topic) -> expected_example => "#{result => allow, from => default}", got => Other }), - emqx:run_hook( + emqx_hooks:run( 'client.check_authz_complete', [ClientInfo, Action, Topic, deny, unknown_return_format] ), diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8669aea8e..880226031 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -76,7 +76,7 @@ ] ). --export_type([channel/0, opts/0, conn_state/0]). +-export_type([channel/0, opts/0, conn_state/0, reply/0, replies/0]). -record(channel, { %% MQTT ConnInfo diff --git a/apps/emqx/src/emqx_hookpoints.erl b/apps/emqx/src/emqx_hookpoints.erl new file mode 100644 index 000000000..1a1452a57 --- /dev/null +++ b/apps/emqx/src/emqx_hookpoints.erl @@ -0,0 +1,232 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_hookpoints). + +-type callback_result() :: stop | any(). +-type fold_callback_result(Acc) :: {stop, Acc} | {ok, Acc} | stop | any(). + +-export_type([ + fold_callback_result/1, + callback_result/0 +]). + +-export([ + default_hookpoints/0, + register_hookpoints/0, + register_hookpoints/1, + verify_hookpoint/1 +]). + +%%----------------------------------------------------------------------------- +%% Hookpoints +%%----------------------------------------------------------------------------- + +-define(HOOKPOINTS, [ + 'client.connect', + 'client.connack', + 'client.connected', + 'client.disconnected', + 'client.authorize', + 'client.check_authz_complete', + 'client.authenticate', + 'client.subscribe', + 'client.unsubscribe', + 'client.timeout', + 'client.monitored_process_down', + 'session.created', + 'session.subscribed', + 'session.unsubscribed', + 'session.resumed', + 'session.discarded', + 'session.takenover', + 'session.terminated', + 'message.publish', + 'message.puback', + 'message.dropped', + 'message.delivered', + 'message.acked', + 'delivery.dropped', + 'delivery.completed', + 'cm.channel.unregistered', + 'tls_handshake.psk_lookup', + + %% This is a deprecated hookpoint renamed to 'client.authorize' + %% However, our template plugin used this hookpoint before its 5.1.0 version, + %% so we keep it here + 'client.check_acl' +]). + +%%----------------------------------------------------------------------------- +%% Callbacks +%%----------------------------------------------------------------------------- + +%% Callback definitions are given for documentation purposes. +%% Each hook callback implementation can also accept any number of custom arguments +%% after the mandatory ones. +%% +%% By default, callbacks are executed in the channel process context. + +-callback 'client.connect'(emqx_types:conninfo(), Props) -> + fold_callback_result(Props) +when + Props :: emqx_types:properties(). + +-callback 'client.connack'(emqx_types:conninfo(), _Reason :: atom(), Props) -> + fold_callback_result(Props) +when + Props :: emqx_types:properties(). + +-callback 'client.connected'(emqx_types:clientinfo(), emqx_types:conninfo()) -> callback_result(). + +-callback 'client.disconnected'(emqx_types:clientinfo(), _Reason :: atom(), emqx_types:conninfo()) -> + callback_result(). + +-callback 'client.authorize'( + emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), allow | deny +) -> + fold_callback_result(#{result := allow | deny, from => term()}). + +-callback 'client.check_authz_complete'( + emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), allow | deny, _From :: term() +) -> + callback_result(). + +-callback 'client.authenticate'(emqx_types:clientinfo(), ignore) -> + fold_callback_result( + ignore + | ok + | {ok, map()} + | {ok, map(), binary()} + | {continue, map()} + | {continue, binary(), map()} + | {error, term()} + ). + +-callback 'client.subscribe'(emqx_types:clientinfo(), emqx_types:properties(), TopicFilters) -> + fold_callback_result(TopicFilters) +when + TopicFilters :: list({emqx_topic:topic(), map()}). + +-callback 'client.unsubscribe'(emqx_types:clientinfo(), emqx_types:properties(), TopicFilters) -> + fold_callback_result(TopicFilters) +when + TopicFilters :: list({emqx_topic:topic(), map()}). + +-callback 'client.timeout'(_TimerReference :: reference(), _Msg :: term(), Replies) -> + fold_callback_result(Replies) +when + Replies :: emqx_channel:replies(). + +-callback 'client.monitored_process_down'( + _MonitorRef :: reference(), _Pid :: pid(), _Reason :: term(), Replies +) -> + fold_callback_result(Replies) +when + Replies :: emqx_channel:replies(). + +-callback 'session.created'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) -> + callback_result(). + +-callback 'session.subscribed'(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) -> + callback_result(). + +-callback 'session.unsubscribed'(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) -> + callback_result(). + +-callback 'session.resumed'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) -> + callback_result(). + +-callback 'session.discarded'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) -> + callback_result(). + +-callback 'session.takenover'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) -> + callback_result(). + +-callback 'session.terminated'( + emqx_types:clientinfo(), _Reason :: atom(), _SessionInfo :: emqx_types:infos() +) -> callback_result(). + +-callback 'message.publish'(Msg) -> + fold_callback_result(Msg) +when + Msg :: emqx_types:message(). + +-callback 'message.puback'( + emqx_types:packet_id(), + emqx_types:message(), + emqx_types:publish_result(), + emqx_types:reason_code() +) -> + fold_callback_result(undefined | emqx_types:reason_code()). + +-callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) -> + callback_result(). + +-callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when + Msg :: emqx_types:message(). + +-callback 'message.acked'(emqx_types:clientinfo(), emqx_types:message()) -> callback_result(). + +-callback 'delivery.dropped'(emqx_types:clientinfo(), emqx_types:message(), _Reason :: atom()) -> + callback_result(). + +-callback 'delivery.completed'(emqx_types:message(), #{ + session_birth_time := emqx_utils_calendar:epoch_millisecond(), clientid := emqx_types:clientid() +}) -> + callback_result(). + +%% NOTE +%% Executed out of channel process context +-callback 'cm.channel.unregistered'(_ChanPid :: pid()) -> callback_result(). + +%% NOTE +%% Executed out of channel process context +-callback 'tls_handshake.psk_lookup'(emqx_tls_psk:psk_identity(), normal) -> + fold_callback_result( + {ok, _SharedSecret :: binary()} + | {error, term()} + | normal + ). + +%%----------------------------------------------------------------------------- +%% API +%%----------------------------------------------------------------------------- + +default_hookpoints() -> + ?HOOKPOINTS. + +register_hookpoints() -> + register_hookpoints(default_hookpoints()). + +register_hookpoints(HookPoints) -> + persistent_term:put(?MODULE, maps:from_keys(HookPoints, true)). + +verify_hookpoint(HookPoint) when is_binary(HookPoint) -> ok; +verify_hookpoint(HookPoint) -> + case maps:is_key(HookPoint, registered_hookpoints()) of + true -> + ok; + false -> + error({invalid_hookpoint, HookPoint}) + end. + +%%----------------------------------------------------------------------------- +%% Internal API +%%----------------------------------------------------------------------------- + +registered_hookpoints() -> + persistent_term:get(?MODULE, #{}). diff --git a/apps/emqx/src/emqx_hooks.erl b/apps/emqx/src/emqx_hooks.erl index 0b8dc0941..c3e9c2230 100644 --- a/apps/emqx/src/emqx_hooks.erl +++ b/apps/emqx/src/emqx_hooks.erl @@ -131,6 +131,7 @@ add(HookPoint, Action, Priority, Filter) when is_integer(Priority) -> do_add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). do_add(HookPoint, Callback) -> + ok = emqx_hookpoints:verify_hookpoint(HookPoint), gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity). %% @doc `put/3,4` updates the existing hook, add it if not exists. @@ -143,6 +144,7 @@ put(HookPoint, Action, Priority, Filter) when is_integer(Priority) -> do_put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). do_put(HookPoint, Callback) -> + ok = emqx_hookpoints:verify_hookpoint(HookPoint), case do_add(HookPoint, Callback) of ok -> ok; {error, already_exists} -> gen_server:call(?SERVER, {put, HookPoint, Callback}, infinity) @@ -156,11 +158,13 @@ del(HookPoint, Action) -> %% @doc Run hooks. -spec run(hookpoint(), list(Arg :: term())) -> ok. run(HookPoint, Args) -> + ok = emqx_hookpoints:verify_hookpoint(HookPoint), do_run(lookup(HookPoint), Args). %% @doc Run hooks with Accumulator. -spec run_fold(hookpoint(), list(Arg :: term()), Acc :: term()) -> Acc :: term(). run_fold(HookPoint, Args, Acc) -> + ok = emqx_hookpoints:verify_hookpoint(HookPoint), do_run_fold(lookup(HookPoint), Args, Acc). do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) -> @@ -230,6 +234,7 @@ lookup(HookPoint) -> init([]) -> ok = emqx_utils_ets:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), + ok = emqx_hookpoints:register_hookpoints(), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 092c4483a..759ecab58 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -496,7 +496,7 @@ stats(Session) -> %%-------------------------------------------------------------------- on_delivery_completed(Msg, #{clientid := ClientId}, Session) -> - emqx:run_hook( + emqx_hooks:run( 'delivery.completed', [ Msg, diff --git a/apps/emqx/test/emqx_SUITE.erl b/apps/emqx/test/emqx_SUITE.erl index cfabff401..d2e26232f 100644 --- a/apps/emqx/test/emqx_SUITE.erl +++ b/apps/emqx/test/emqx_SUITE.erl @@ -95,59 +95,6 @@ t_emqx_pubsub_api(_) -> ct:sleep(20), ?assertEqual([], emqx:topics()). -t_hook_unhook(_) -> - ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun1, []}, 0), - ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0), - ?assertEqual( - {error, already_exists}, - emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0) - ), - ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun1}), - ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun2}), - - ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun8, []}, 8), - ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun2, []}, 2), - ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun10, []}, 10), - ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun9, []}, 9), - ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun2, []}), - ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun8, []}), - ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun9, []}), - ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun10, []}). - -t_run_hook(_) -> - ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun3, [init]}, 0), - ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun4, [init]}, 0), - ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun5, [init]}, 0), - [r5, r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []), - [] = emqx:run_fold_hook(unknown_hook, [], []), - - ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun9, []}, 0), - ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun10, []}, 0), - [r10] = emqx:run_fold_hook(foldl_hook2, [arg], []), - - ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0), - {error, already_exists} = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0), - ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun7, [initArg]}, 0), - ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun8, [initArg]}, 0), - ok = emqx:run_hook(foreach_hook, [arg]), - - ok = emqx_hooks:add( - foreach_filter1_hook, {?MODULE, hook_fun1, []}, 0, {?MODULE, hook_filter1, []} - ), - %% filter passed - ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])), - %% filter failed - ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])), - - ok = emqx_hooks:add( - foldl_filter2_hook, {?MODULE, hook_fun2, []}, 0, {?MODULE, hook_filter2, [init_arg]} - ), - ok = emqx_hooks:add( - foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, 0, {?MODULE, hook_filter2_1, [init_arg]} - ), - ?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)), - ?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)). - t_cluster_nodes(_) -> Expected = [node()], ?assertEqual(Expected, emqx:running_nodes()), @@ -169,36 +116,3 @@ t_get_config_default_2(_) -> NonAtomPathRes = emqx:get_config(["doesnotexist", <<"db_backend">>], undefined), ?assertEqual(undefined, NonAtomPathRes), ?assertEqual(undefined, AtomPathRes). -%%-------------------------------------------------------------------- -%% Hook fun -%%-------------------------------------------------------------------- - -hook_fun1(arg) -> ok; -hook_fun1(_) -> error. - -hook_fun2(arg) -> ok; -hook_fun2(_) -> error. - -hook_fun2(_, Acc) -> {ok, Acc + 1}. -hook_fun2_1(_, Acc) -> {ok, Acc + 1}. - -hook_fun3(arg1, arg2, _Acc, init) -> ok. -hook_fun4(arg1, arg2, Acc, init) -> {ok, [r4 | Acc]}. -hook_fun5(arg1, arg2, Acc, init) -> {ok, [r5 | Acc]}. - -hook_fun6(arg, initArg) -> ok. -hook_fun7(arg, initArg) -> ok. -hook_fun8(arg, initArg) -> ok. - -hook_fun9(arg, Acc) -> {stop, [r9 | Acc]}. -hook_fun10(arg, Acc) -> {stop, [r10 | Acc]}. - -hook_filter1(arg) -> true; -hook_filter1(_) -> false. - -hook_filter2(arg, _Acc, init_arg) -> true; -hook_filter2(_, _Acc, _IntArg) -> false. - -hook_filter2_1(arg, _Acc, init_arg) -> true; -hook_filter2_1(arg1, _Acc, init_arg) -> true; -hook_filter2_1(_, _Acc, _IntArg) -> false. diff --git a/apps/emqx/test/emqx_cm_locker_SUITE.erl b/apps/emqx/test/emqx_cm_locker_SUITE.erl index 23bfe5ff3..3dfb6e5ad 100644 --- a/apps/emqx/test/emqx_cm_locker_SUITE.erl +++ b/apps/emqx/test/emqx_cm_locker_SUITE.erl @@ -38,7 +38,7 @@ t_trans(_) -> ok = emqx_cm_locker:trans(undefined, fun(_) -> ok end, []), ok = emqx_cm_locker:trans(<<"clientid">>, fun(_) -> ok end). -t_lock_unlocak(_) -> +t_lock_unlock(_) -> {true, _} = emqx_cm_locker:lock(<<"clientid">>), {true, _} = emqx_cm_locker:lock(<<"clientid">>), {true, _} = emqx_cm_locker:unlock(<<"clientid">>), diff --git a/apps/emqx/test/emqx_hooks_SUITE.erl b/apps/emqx/test/emqx_hooks_SUITE.erl index 8b631b5b1..c5135026b 100644 --- a/apps/emqx/test/emqx_hooks_SUITE.erl +++ b/apps/emqx/test/emqx_hooks_SUITE.erl @@ -25,26 +25,24 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_testcase(_, Config) -> + {ok, _} = emqx_hooks:start_link(), + ok = emqx_hookpoints:register_hookpoints( + [ + test_hook, + emqx_hook, + foldl_hook, + foldl_hook2, + foreach_hook, + foreach_filter1_hook, + foldl_filter2_hook + ] + ), Config. end_per_testcase(_) -> + ok = emqx_hookpoints:register_hookpoints(), catch emqx_hooks:stop(). -% t_lookup(_) -> -% error('TODO'). - -% t_run_fold(_) -> -% error('TODO'). - -% t_run(_) -> -% error('TODO'). - -% t_del(_) -> -% error('TODO'). - -% t_add(_) -> -% error('TODO'). - t_add_hook_order(_) -> ?assert( proper:quickcheck( @@ -65,7 +63,8 @@ add_hook_order_prop() -> Hooks, hooks(), try - {ok, _} = emqx_hooks:start_link(), + _ = emqx_hooks:start_link(), + ok = emqx_hookpoints:register_hookpoints([prop_hook]), [ok = emqx_hooks:add(prop_hook, {M, F, []}, Prio) || {Prio, M, F} <- Hooks], Callbacks = emqx_hooks:lookup(prop_hook), Order = [{Prio, M, F} || {callback, {M, F, _}, _Filter, Prio} <- Callbacks], @@ -92,7 +91,6 @@ hooks() -> ). t_add_put_del_hook(_) -> - {ok, _} = emqx_hooks:start_link(), ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun1, []}, 0), ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0), ?assertEqual( @@ -150,31 +148,40 @@ t_add_put_del_hook(_) -> ?assertEqual([], emqx_hooks:lookup(emqx_hook)). t_run_hooks(_) -> - {ok, _} = emqx_hooks:start_link(), ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun3, [init]}, 0), ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun4, [init]}, 0), ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun5, [init]}, 0), - [r5, r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []), - [] = emqx:run_fold_hook(unknown_hook, [], []), + [r5, r4] = emqx_hooks:run_fold(foldl_hook, [arg1, arg2], []), + + ?assertException( + error, + {invalid_hookpoint, unknown_hook}, + emqx_hooks:run_fold(unknown_hook, [], []) + ), + ?assertException( + error, + {invalid_hookpoint, unknown_hook}, + emqx_hooks:add(unknown_hook, {?MODULE, hook_fun5, [init]}, 0) + ), ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun9, []}, 0), ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun10, []}, 0), %% Note: 10 is _less_ than 9 per lexicographic order - [r10] = emqx:run_fold_hook(foldl_hook2, [arg], []), + [r10] = emqx_hooks:run_fold(foldl_hook2, [arg], []), ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0), {error, already_exists} = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0), ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun7, [initArg]}, 0), ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun8, [initArg]}, 0), - ok = emqx:run_hook(foreach_hook, [arg]), + ok = emqx_hooks:run(foreach_hook, [arg]), ok = emqx_hooks:add( foreach_filter1_hook, {?MODULE, hook_fun1, []}, 0, {?MODULE, hook_filter1, []} ), %% filter passed - ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])), + ?assertEqual(ok, emqx_hooks:run(foreach_filter1_hook, [arg])), %% filter failed - ?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])), + ?assertEqual(ok, emqx_hooks:run(foreach_filter1_hook, [arg1])), ok = emqx_hooks:add( foldl_filter2_hook, {?MODULE, hook_fun2, []}, 0, {?MODULE, hook_filter2, [init_arg]} @@ -182,11 +189,10 @@ t_run_hooks(_) -> ok = emqx_hooks:add( foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, 0, {?MODULE, hook_filter2_1, [init_arg]} ), - ?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)), - ?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)). + ?assertEqual(3, emqx_hooks:run_fold(foldl_filter2_hook, [arg], 1)), + ?assertEqual(2, emqx_hooks:run_fold(foldl_filter2_hook, [arg1], 1)). t_uncovered_func(_) -> - {ok, _} = emqx_hooks:start_link(), Pid = erlang:whereis(emqx_hooks), gen_server:call(Pid, test), gen_server:cast(Pid, test), diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index 9afc0f05e..d0821ea83 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl index 291ace7e0..84a4e6d13 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl @@ -749,7 +749,7 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess Payload = render(FullMessage, PayloadTemplate), MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload), _ = emqx:publish(MQTTMessage), - emqx:run_hook(Hookpoint, [FullMessage]), + emqx_hooks:run(Hookpoint, [FullMessage]), emqx_resource_metrics:received_inc(InstanceId), ok end diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index b16f163fb..89cb9a78f 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -252,7 +252,7 @@ do_handle_message(Message, State) -> MQTTTopic = render(FullMessage, MQTTTopicTemplate), MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), _ = emqx:publish(MQTTMessage), - emqx:run_hook(Hookpoint, [FullMessage]), + emqx_hooks:run(Hookpoint, [FullMessage]), emqx_resource_metrics:received_inc(ResourceId), %% note: just `ack' does not commit the offset to the %% kafka consumer group. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 052b271f8..e39c4df69 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index a1cfe687f..eb81c4b6e 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -41,7 +41,7 @@ %% if the bridge received msgs from the remote broker. on_message_received(Msg, HookPoint, ResId) -> emqx_resource_metrics:received_inc(ResId), - emqx:run_hook(HookPoint, [Msg]). + emqx_hooks:run(HookPoint, [Msg]). %% =================================================================== callback_mode() -> async_if_possible. diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index 0c0b7310d..467ac20a2 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -286,7 +286,7 @@ handle_call( ok = emqx_broker:unsubscribe(MountedTopic), _ = run_hooks( Ctx, - 'session.unsubscribe', + 'session.unsubscribed', [ClientInfo, MountedTopic, #{}] ), diff --git a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src index db7cd665f..e5afd7871 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src +++ b/apps/emqx_gateway_lwm2m/src/emqx_gateway_lwm2m.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_lwm2m, [ {description, "LwM2M Gateway"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]}, {env, []}, diff --git a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl index e187b3fb7..0f225b59b 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl @@ -264,7 +264,7 @@ handle_call( ok = emqx_broker:unsubscribe(MountedTopic), _ = run_hooks( Ctx, - 'session.unsubscribe', + 'session.unsubscribed', [ClientInfo, MountedTopic, #{}] ), %% modify session state @@ -331,7 +331,7 @@ terminate(Reason, #channel{ session = Session }) -> MountedTopic = emqx_lwm2m_session:on_close(Session), - _ = run_hooks(Ctx, 'session.unsubscribe', [ClientInfo, MountedTopic, #{}]), + _ = run_hooks(Ctx, 'session.unsubscribed', [ClientInfo, MountedTopic, #{}]), run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]). %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src index bcc018ad4..22dd4efde 100644 --- a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src +++ b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_stomp, [ {description, "Stomp Gateway"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 3ae928ba3..8e9be8359 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -549,7 +549,7 @@ handle_in( ok = emqx_broker:unsubscribe(MountedTopic), _ = run_hooks( Ctx, - 'session.unsubscribe', + 'session.unsubscribed', [ClientInfo, MountedTopic, #{}] ), {ok, Channel#channel{subscriptions = lists:keydelete(SubId, 1, Subs)}}; @@ -869,7 +869,7 @@ handle_call( ok = emqx_broker:unsubscribe(MountedTopic), _ = run_hooks( Ctx, - 'session.unsubscribe', + 'session.unsubscribed', [ClientInfo, MountedTopic, #{}] ), reply( diff --git a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl index dbf034bf8..4909f8ce8 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_plugins_SUITE.erl @@ -21,8 +21,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(EMQX_PLUGIN_TEMPLATE_NAME, "emqx_plugin_template"). --define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0.0"). +-define(EMQX_PLUGIN_TEMPLATE_NAME, "my_emqx_plugin"). +-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.1.0"). -define(PACKAGE_SUFFIX, ".tar.gz"). -define(CLUSTER_API_SERVER(PORT), ("http://127.0.0.1:" ++ (integer_to_list(PORT)))). diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index bf359374e..5680aa047 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -22,12 +22,13 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, "emqx_plugin_template"). +-define(EMQX_PLUGIN_APP_NAME, my_emqx_plugin). +-define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, atom_to_list(?EMQX_PLUGIN_APP_NAME)). -define(EMQX_PLUGIN_TEMPLATE_URL, "https://github.com/emqx/emqx-plugin-template/releases/download/" ). --define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0.0"). --define(EMQX_PLUGIN_TEMPLATE_TAG, "5.0.0"). +-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.1.0"). +-define(EMQX_PLUGIN_TEMPLATE_TAG, "5.1.0"). -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_RELEASE_NAME, "elixir_plugin_template"). -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_URL, "https://github.com/emqx/emqx-elixir-plugin/releases/download/" @@ -153,11 +154,11 @@ t_demo_install_start_stop_uninstall(Config) -> ?assertEqual([maps:without([readme], Info)], emqx_plugins:list()), %% start ok = emqx_plugins:ensure_started(NameVsn), - ok = assert_app_running(emqx_plugin_template, true), + ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, true), ok = assert_app_running(map_sets, true), %% start (idempotent) ok = emqx_plugins:ensure_started(bin(NameVsn)), - ok = assert_app_running(emqx_plugin_template, true), + ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, true), ok = assert_app_running(map_sets, true), %% running app can not be un-installed @@ -168,11 +169,11 @@ t_demo_install_start_stop_uninstall(Config) -> %% stop ok = emqx_plugins:ensure_stopped(NameVsn), - ok = assert_app_running(emqx_plugin_template, false), + ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, false), ok = assert_app_running(map_sets, false), %% stop (idempotent) ok = emqx_plugins:ensure_stopped(bin(NameVsn)), - ok = assert_app_running(emqx_plugin_template, false), + ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, false), ok = assert_app_running(map_sets, false), %% still listed after stopped ReleaseNameBin = list_to_binary(ReleaseName), @@ -257,9 +258,9 @@ t_start_restart_and_stop(Config) -> %% fake a disabled plugin in config ok = ensure_state(Bar2, front, false), - assert_app_running(emqx_plugin_template, false), + assert_app_running(?EMQX_PLUGIN_APP_NAME, false), ok = emqx_plugins:ensure_started(), - assert_app_running(emqx_plugin_template, true), + assert_app_running(?EMQX_PLUGIN_APP_NAME, true), %% fake enable bar-2 ok = ensure_state(Bar2, rear, true), @@ -269,18 +270,18 @@ t_start_restart_and_stop(Config) -> emqx_plugins:ensure_started() ), %% but demo plugin should still be running - assert_app_running(emqx_plugin_template, true), + assert_app_running(?EMQX_PLUGIN_APP_NAME, true), %% stop all ok = emqx_plugins:ensure_stopped(), - assert_app_running(emqx_plugin_template, false), + assert_app_running(?EMQX_PLUGIN_APP_NAME, false), ok = ensure_state(Bar2, rear, false), ok = emqx_plugins:restart(NameVsn), - assert_app_running(emqx_plugin_template, true), + assert_app_running(?EMQX_PLUGIN_APP_NAME, true), %% repeat ok = emqx_plugins:restart(NameVsn), - assert_app_running(emqx_plugin_template, true), + assert_app_running(?EMQX_PLUGIN_APP_NAME, true), ok = emqx_plugins:ensure_stopped(), ok = emqx_plugins:ensure_disabled(NameVsn),