chore(hooks): validate hookpoints and document hook callbacks

Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
This commit is contained in:
Ilya Averyanov 2023-09-15 13:22:39 +03:00
parent 17206f8c75
commit 14983ec14a
21 changed files with 305 additions and 160 deletions

View File

@ -53,12 +53,6 @@
subscribed/2 subscribed/2
]). ]).
%% Hooks API
-export([
run_hook/2,
run_fold_hook/3
]).
%% Configs APIs %% Configs APIs
-export([ -export([
get_config/1, get_config/1,
@ -172,15 +166,8 @@ subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) ->
emqx_broker:subscribed(SubId, iolist_to_binary(Topic)). emqx_broker:subscribed(SubId, iolist_to_binary(Topic)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Hooks API %% Config API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec run_hook(emqx_hooks:hookpoint(), list(any())) -> ok | stop.
run_hook(HookPoint, Args) ->
emqx_hooks:run(HookPoint, Args).
-spec run_fold_hook(emqx_hooks:hookpoint(), list(any()), any()) -> any().
run_fold_hook(HookPoint, Args, Acc) ->
emqx_hooks:run_fold(HookPoint, Args, Acc).
-spec get_config(emqx_utils_maps:config_key_path()) -> term(). -spec get_config(emqx_utils_maps:config_key_path()) -> term().
get_config(KeyPath) -> get_config(KeyPath) ->

View File

@ -138,7 +138,7 @@ check_authorization_cache(ClientInfo, Action, Topic) ->
emqx_authz_cache:put_authz_cache(Action, Topic, AuthzResult), emqx_authz_cache:put_authz_cache(Action, Topic, AuthzResult),
AuthzResult; AuthzResult;
AuthzResult -> AuthzResult ->
emqx:run_hook( emqx_hooks:run(
'client.check_authz_complete', 'client.check_authz_complete',
[ClientInfo, Action, Topic, AuthzResult, cache] [ClientInfo, Action, Topic, AuthzResult, cache]
), ),
@ -152,7 +152,7 @@ do_authorize(ClientInfo, Action, Topic) ->
case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of case run_hooks('client.authorize', [ClientInfo, Action, Topic], Default) of
AuthzResult = #{result := Result} when Result == allow; Result == deny -> AuthzResult = #{result := Result} when Result == allow; Result == deny ->
From = maps:get(from, AuthzResult, unknown), From = maps:get(from, AuthzResult, unknown),
emqx:run_hook( emqx_hooks:run(
'client.check_authz_complete', 'client.check_authz_complete',
[ClientInfo, Action, Topic, Result, From] [ClientInfo, Action, Topic, Result, From]
), ),
@ -163,7 +163,7 @@ do_authorize(ClientInfo, Action, Topic) ->
expected_example => "#{result => allow, from => default}", expected_example => "#{result => allow, from => default}",
got => Other got => Other
}), }),
emqx:run_hook( emqx_hooks:run(
'client.check_authz_complete', 'client.check_authz_complete',
[ClientInfo, Action, Topic, deny, unknown_return_format] [ClientInfo, Action, Topic, deny, unknown_return_format]
), ),

View File

@ -76,7 +76,7 @@
] ]
). ).
-export_type([channel/0, opts/0, conn_state/0]). -export_type([channel/0, opts/0, conn_state/0, reply/0, replies/0]).
-record(channel, { -record(channel, {
%% MQTT ConnInfo %% MQTT ConnInfo

View File

@ -0,0 +1,232 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_hookpoints).
-type callback_result() :: stop | any().
-type fold_callback_result(Acc) :: {stop, Acc} | {ok, Acc} | stop | any().
-export_type([
fold_callback_result/1,
callback_result/0
]).
-export([
default_hookpoints/0,
register_hookpoints/0,
register_hookpoints/1,
verify_hookpoint/1
]).
%%-----------------------------------------------------------------------------
%% Hookpoints
%%-----------------------------------------------------------------------------
-define(HOOKPOINTS, [
'client.connect',
'client.connack',
'client.connected',
'client.disconnected',
'client.authorize',
'client.check_authz_complete',
'client.authenticate',
'client.subscribe',
'client.unsubscribe',
'client.timeout',
'client.monitored_process_down',
'session.created',
'session.subscribed',
'session.unsubscribed',
'session.resumed',
'session.discarded',
'session.takenover',
'session.terminated',
'message.publish',
'message.puback',
'message.dropped',
'message.delivered',
'message.acked',
'delivery.dropped',
'delivery.completed',
'cm.channel.unregistered',
'tls_handshake.psk_lookup',
%% This is a deprecated hookpoint renamed to 'client.authorize'
%% However, our template plugin used this hookpoint before its 5.1.0 version,
%% so we keep it here
'client.check_acl'
]).
%%-----------------------------------------------------------------------------
%% Callbacks
%%-----------------------------------------------------------------------------
%% Callback definitions are given for documentation purposes.
%% Each hook callback implementation can also accept any number of custom arguments
%% after the mandatory ones.
%%
%% By default, callbacks are executed in the channel process context.
-callback 'client.connect'(emqx_types:conninfo(), Props) ->
fold_callback_result(Props)
when
Props :: emqx_types:properties().
-callback 'client.connack'(emqx_types:conninfo(), _Reason :: atom(), Props) ->
fold_callback_result(Props)
when
Props :: emqx_types:properties().
-callback 'client.connected'(emqx_types:clientinfo(), emqx_types:conninfo()) -> callback_result().
-callback 'client.disconnected'(emqx_types:clientinfo(), _Reason :: atom(), emqx_types:conninfo()) ->
callback_result().
-callback 'client.authorize'(
emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), allow | deny
) ->
fold_callback_result(#{result := allow | deny, from => term()}).
-callback 'client.check_authz_complete'(
emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic(), allow | deny, _From :: term()
) ->
callback_result().
-callback 'client.authenticate'(emqx_types:clientinfo(), ignore) ->
fold_callback_result(
ignore
| ok
| {ok, map()}
| {ok, map(), binary()}
| {continue, map()}
| {continue, binary(), map()}
| {error, term()}
).
-callback 'client.subscribe'(emqx_types:clientinfo(), emqx_types:properties(), TopicFilters) ->
fold_callback_result(TopicFilters)
when
TopicFilters :: list({emqx_topic:topic(), map()}).
-callback 'client.unsubscribe'(emqx_types:clientinfo(), emqx_types:properties(), TopicFilters) ->
fold_callback_result(TopicFilters)
when
TopicFilters :: list({emqx_topic:topic(), map()}).
-callback 'client.timeout'(_TimerReference :: reference(), _Msg :: term(), Replies) ->
fold_callback_result(Replies)
when
Replies :: emqx_channel:replies().
-callback 'client.monitored_process_down'(
_MonitorRef :: reference(), _Pid :: pid(), _Reason :: term(), Replies
) ->
fold_callback_result(Replies)
when
Replies :: emqx_channel:replies().
-callback 'session.created'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
callback_result().
-callback 'session.subscribed'(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) ->
callback_result().
-callback 'session.unsubscribed'(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) ->
callback_result().
-callback 'session.resumed'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
callback_result().
-callback 'session.discarded'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
callback_result().
-callback 'session.takenover'(emqx_types:clientinfo(), _SessionInfo :: emqx_types:infos()) ->
callback_result().
-callback 'session.terminated'(
emqx_types:clientinfo(), _Reason :: atom(), _SessionInfo :: emqx_types:infos()
) -> callback_result().
-callback 'message.publish'(Msg) ->
fold_callback_result(Msg)
when
Msg :: emqx_types:message().
-callback 'message.puback'(
emqx_types:packet_id(),
emqx_types:message(),
emqx_types:publish_result(),
emqx_types:reason_code()
) ->
fold_callback_result(undefined | emqx_types:reason_code()).
-callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) ->
callback_result().
-callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when
Msg :: emqx_types:message().
-callback 'message.acked'(emqx_types:clientinfo(), emqx_types:message()) -> callback_result().
-callback 'delivery.dropped'(emqx_types:clientinfo(), emqx_types:message(), _Reason :: atom()) ->
callback_result().
-callback 'delivery.completed'(emqx_types:message(), #{
session_birth_time := emqx_utils_calendar:epoch_millisecond(), clientid := emqx_types:clientid()
}) ->
callback_result().
%% NOTE
%% Executed out of channel process context
-callback 'cm.channel.unregistered'(_ChanPid :: pid()) -> callback_result().
%% NOTE
%% Executed out of channel process context
-callback 'tls_handshake.psk_lookup'(emqx_tls_psk:psk_identity(), normal) ->
fold_callback_result(
{ok, _SharedSecret :: binary()}
| {error, term()}
| normal
).
%%-----------------------------------------------------------------------------
%% API
%%-----------------------------------------------------------------------------
default_hookpoints() ->
?HOOKPOINTS.
register_hookpoints() ->
register_hookpoints(default_hookpoints()).
register_hookpoints(HookPoints) ->
persistent_term:put(?MODULE, maps:from_keys(HookPoints, true)).
verify_hookpoint(HookPoint) when is_binary(HookPoint) -> ok;
verify_hookpoint(HookPoint) ->
case maps:is_key(HookPoint, registered_hookpoints()) of
true ->
ok;
false ->
error({invalid_hookpoint, HookPoint})
end.
%%-----------------------------------------------------------------------------
%% Internal API
%%-----------------------------------------------------------------------------
registered_hookpoints() ->
persistent_term:get(?MODULE, #{}).

View File

@ -131,6 +131,7 @@ add(HookPoint, Action, Priority, Filter) when is_integer(Priority) ->
do_add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). do_add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
do_add(HookPoint, Callback) -> do_add(HookPoint, Callback) ->
ok = emqx_hookpoints:verify_hookpoint(HookPoint),
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity). gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity).
%% @doc `put/3,4` updates the existing hook, add it if not exists. %% @doc `put/3,4` updates the existing hook, add it if not exists.
@ -143,6 +144,7 @@ put(HookPoint, Action, Priority, Filter) when is_integer(Priority) ->
do_put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). do_put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
do_put(HookPoint, Callback) -> do_put(HookPoint, Callback) ->
ok = emqx_hookpoints:verify_hookpoint(HookPoint),
case do_add(HookPoint, Callback) of case do_add(HookPoint, Callback) of
ok -> ok; ok -> ok;
{error, already_exists} -> gen_server:call(?SERVER, {put, HookPoint, Callback}, infinity) {error, already_exists} -> gen_server:call(?SERVER, {put, HookPoint, Callback}, infinity)
@ -156,11 +158,13 @@ del(HookPoint, Action) ->
%% @doc Run hooks. %% @doc Run hooks.
-spec run(hookpoint(), list(Arg :: term())) -> ok. -spec run(hookpoint(), list(Arg :: term())) -> ok.
run(HookPoint, Args) -> run(HookPoint, Args) ->
ok = emqx_hookpoints:verify_hookpoint(HookPoint),
do_run(lookup(HookPoint), Args). do_run(lookup(HookPoint), Args).
%% @doc Run hooks with Accumulator. %% @doc Run hooks with Accumulator.
-spec run_fold(hookpoint(), list(Arg :: term()), Acc :: term()) -> Acc :: term(). -spec run_fold(hookpoint(), list(Arg :: term()), Acc :: term()) -> Acc :: term().
run_fold(HookPoint, Args, Acc) -> run_fold(HookPoint, Args, Acc) ->
ok = emqx_hookpoints:verify_hookpoint(HookPoint),
do_run_fold(lookup(HookPoint), Args, Acc). do_run_fold(lookup(HookPoint), Args, Acc).
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) -> do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
@ -230,6 +234,7 @@ lookup(HookPoint) ->
init([]) -> init([]) ->
ok = emqx_utils_ets:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), ok = emqx_utils_ets:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
ok = emqx_hookpoints:register_hookpoints(),
{ok, #{}}. {ok, #{}}.
handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) -> handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->

View File

@ -496,7 +496,7 @@ stats(Session) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
on_delivery_completed(Msg, #{clientid := ClientId}, Session) -> on_delivery_completed(Msg, #{clientid := ClientId}, Session) ->
emqx:run_hook( emqx_hooks:run(
'delivery.completed', 'delivery.completed',
[ [
Msg, Msg,

View File

@ -95,59 +95,6 @@ t_emqx_pubsub_api(_) ->
ct:sleep(20), ct:sleep(20),
?assertEqual([], emqx:topics()). ?assertEqual([], emqx:topics()).
t_hook_unhook(_) ->
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun1, []}, 0),
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0),
?assertEqual(
{error, already_exists},
emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0)
),
ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun1}),
ok = emqx_hooks:del(test_hook, {?MODULE, hook_fun2}),
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun8, []}, 8),
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun2, []}, 2),
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun10, []}, 10),
ok = emqx_hooks:add(emqx_hook, {?MODULE, hook_fun9, []}, 9),
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun2, []}),
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun8, []}),
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun9, []}),
ok = emqx_hooks:del(emqx_hook, {?MODULE, hook_fun10, []}).
t_run_hook(_) ->
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun3, [init]}, 0),
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun4, [init]}, 0),
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun5, [init]}, 0),
[r5, r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []),
[] = emqx:run_fold_hook(unknown_hook, [], []),
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun9, []}, 0),
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun10, []}, 0),
[r10] = emqx:run_fold_hook(foldl_hook2, [arg], []),
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
{error, already_exists} = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun7, [initArg]}, 0),
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun8, [initArg]}, 0),
ok = emqx:run_hook(foreach_hook, [arg]),
ok = emqx_hooks:add(
foreach_filter1_hook, {?MODULE, hook_fun1, []}, 0, {?MODULE, hook_filter1, []}
),
%% filter passed
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])),
%% filter failed
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])),
ok = emqx_hooks:add(
foldl_filter2_hook, {?MODULE, hook_fun2, []}, 0, {?MODULE, hook_filter2, [init_arg]}
),
ok = emqx_hooks:add(
foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, 0, {?MODULE, hook_filter2_1, [init_arg]}
),
?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)),
?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)).
t_cluster_nodes(_) -> t_cluster_nodes(_) ->
Expected = [node()], Expected = [node()],
?assertEqual(Expected, emqx:running_nodes()), ?assertEqual(Expected, emqx:running_nodes()),
@ -169,36 +116,3 @@ t_get_config_default_2(_) ->
NonAtomPathRes = emqx:get_config(["doesnotexist", <<"db_backend">>], undefined), NonAtomPathRes = emqx:get_config(["doesnotexist", <<"db_backend">>], undefined),
?assertEqual(undefined, NonAtomPathRes), ?assertEqual(undefined, NonAtomPathRes),
?assertEqual(undefined, AtomPathRes). ?assertEqual(undefined, AtomPathRes).
%%--------------------------------------------------------------------
%% Hook fun
%%--------------------------------------------------------------------
hook_fun1(arg) -> ok;
hook_fun1(_) -> error.
hook_fun2(arg) -> ok;
hook_fun2(_) -> error.
hook_fun2(_, Acc) -> {ok, Acc + 1}.
hook_fun2_1(_, Acc) -> {ok, Acc + 1}.
hook_fun3(arg1, arg2, _Acc, init) -> ok.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r4 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {ok, [r5 | Acc]}.
hook_fun6(arg, initArg) -> ok.
hook_fun7(arg, initArg) -> ok.
hook_fun8(arg, initArg) -> ok.
hook_fun9(arg, Acc) -> {stop, [r9 | Acc]}.
hook_fun10(arg, Acc) -> {stop, [r10 | Acc]}.
hook_filter1(arg) -> true;
hook_filter1(_) -> false.
hook_filter2(arg, _Acc, init_arg) -> true;
hook_filter2(_, _Acc, _IntArg) -> false.
hook_filter2_1(arg, _Acc, init_arg) -> true;
hook_filter2_1(arg1, _Acc, init_arg) -> true;
hook_filter2_1(_, _Acc, _IntArg) -> false.

View File

@ -38,7 +38,7 @@ t_trans(_) ->
ok = emqx_cm_locker:trans(undefined, fun(_) -> ok end, []), ok = emqx_cm_locker:trans(undefined, fun(_) -> ok end, []),
ok = emqx_cm_locker:trans(<<"clientid">>, fun(_) -> ok end). ok = emqx_cm_locker:trans(<<"clientid">>, fun(_) -> ok end).
t_lock_unlocak(_) -> t_lock_unlock(_) ->
{true, _} = emqx_cm_locker:lock(<<"clientid">>), {true, _} = emqx_cm_locker:lock(<<"clientid">>),
{true, _} = emqx_cm_locker:lock(<<"clientid">>), {true, _} = emqx_cm_locker:lock(<<"clientid">>),
{true, _} = emqx_cm_locker:unlock(<<"clientid">>), {true, _} = emqx_cm_locker:unlock(<<"clientid">>),

View File

@ -25,26 +25,24 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_hooks:start_link(),
ok = emqx_hookpoints:register_hookpoints(
[
test_hook,
emqx_hook,
foldl_hook,
foldl_hook2,
foreach_hook,
foreach_filter1_hook,
foldl_filter2_hook
]
),
Config. Config.
end_per_testcase(_) -> end_per_testcase(_) ->
ok = emqx_hookpoints:register_hookpoints(),
catch emqx_hooks:stop(). catch emqx_hooks:stop().
% t_lookup(_) ->
% error('TODO').
% t_run_fold(_) ->
% error('TODO').
% t_run(_) ->
% error('TODO').
% t_del(_) ->
% error('TODO').
% t_add(_) ->
% error('TODO').
t_add_hook_order(_) -> t_add_hook_order(_) ->
?assert( ?assert(
proper:quickcheck( proper:quickcheck(
@ -65,7 +63,8 @@ add_hook_order_prop() ->
Hooks, Hooks,
hooks(), hooks(),
try try
{ok, _} = emqx_hooks:start_link(), _ = emqx_hooks:start_link(),
ok = emqx_hookpoints:register_hookpoints([prop_hook]),
[ok = emqx_hooks:add(prop_hook, {M, F, []}, Prio) || {Prio, M, F} <- Hooks], [ok = emqx_hooks:add(prop_hook, {M, F, []}, Prio) || {Prio, M, F} <- Hooks],
Callbacks = emqx_hooks:lookup(prop_hook), Callbacks = emqx_hooks:lookup(prop_hook),
Order = [{Prio, M, F} || {callback, {M, F, _}, _Filter, Prio} <- Callbacks], Order = [{Prio, M, F} || {callback, {M, F, _}, _Filter, Prio} <- Callbacks],
@ -92,7 +91,6 @@ hooks() ->
). ).
t_add_put_del_hook(_) -> t_add_put_del_hook(_) ->
{ok, _} = emqx_hooks:start_link(),
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun1, []}, 0), ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun1, []}, 0),
ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0), ok = emqx_hooks:add(test_hook, {?MODULE, hook_fun2, []}, 0),
?assertEqual( ?assertEqual(
@ -150,31 +148,40 @@ t_add_put_del_hook(_) ->
?assertEqual([], emqx_hooks:lookup(emqx_hook)). ?assertEqual([], emqx_hooks:lookup(emqx_hook)).
t_run_hooks(_) -> t_run_hooks(_) ->
{ok, _} = emqx_hooks:start_link(),
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun3, [init]}, 0), ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun3, [init]}, 0),
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun4, [init]}, 0), ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun4, [init]}, 0),
ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun5, [init]}, 0), ok = emqx_hooks:add(foldl_hook, {?MODULE, hook_fun5, [init]}, 0),
[r5, r4] = emqx:run_fold_hook(foldl_hook, [arg1, arg2], []), [r5, r4] = emqx_hooks:run_fold(foldl_hook, [arg1, arg2], []),
[] = emqx:run_fold_hook(unknown_hook, [], []),
?assertException(
error,
{invalid_hookpoint, unknown_hook},
emqx_hooks:run_fold(unknown_hook, [], [])
),
?assertException(
error,
{invalid_hookpoint, unknown_hook},
emqx_hooks:add(unknown_hook, {?MODULE, hook_fun5, [init]}, 0)
),
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun9, []}, 0), ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun9, []}, 0),
ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun10, []}, 0), ok = emqx_hooks:add(foldl_hook2, {?MODULE, hook_fun10, []}, 0),
%% Note: 10 is _less_ than 9 per lexicographic order %% Note: 10 is _less_ than 9 per lexicographic order
[r10] = emqx:run_fold_hook(foldl_hook2, [arg], []), [r10] = emqx_hooks:run_fold(foldl_hook2, [arg], []),
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0), ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
{error, already_exists} = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0), {error, already_exists} = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun6, [initArg]}, 0),
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun7, [initArg]}, 0), ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun7, [initArg]}, 0),
ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun8, [initArg]}, 0), ok = emqx_hooks:add(foreach_hook, {?MODULE, hook_fun8, [initArg]}, 0),
ok = emqx:run_hook(foreach_hook, [arg]), ok = emqx_hooks:run(foreach_hook, [arg]),
ok = emqx_hooks:add( ok = emqx_hooks:add(
foreach_filter1_hook, {?MODULE, hook_fun1, []}, 0, {?MODULE, hook_filter1, []} foreach_filter1_hook, {?MODULE, hook_fun1, []}, 0, {?MODULE, hook_filter1, []}
), ),
%% filter passed %% filter passed
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg])), ?assertEqual(ok, emqx_hooks:run(foreach_filter1_hook, [arg])),
%% filter failed %% filter failed
?assertEqual(ok, emqx:run_hook(foreach_filter1_hook, [arg1])), ?assertEqual(ok, emqx_hooks:run(foreach_filter1_hook, [arg1])),
ok = emqx_hooks:add( ok = emqx_hooks:add(
foldl_filter2_hook, {?MODULE, hook_fun2, []}, 0, {?MODULE, hook_filter2, [init_arg]} foldl_filter2_hook, {?MODULE, hook_fun2, []}, 0, {?MODULE, hook_filter2, [init_arg]}
@ -182,11 +189,10 @@ t_run_hooks(_) ->
ok = emqx_hooks:add( ok = emqx_hooks:add(
foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, 0, {?MODULE, hook_filter2_1, [init_arg]} foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, 0, {?MODULE, hook_filter2_1, [init_arg]}
), ),
?assertEqual(3, emqx:run_fold_hook(foldl_filter2_hook, [arg], 1)), ?assertEqual(3, emqx_hooks:run_fold(foldl_filter2_hook, [arg], 1)),
?assertEqual(2, emqx:run_fold_hook(foldl_filter2_hook, [arg1], 1)). ?assertEqual(2, emqx_hooks:run_fold(foldl_filter2_hook, [arg1], 1)).
t_uncovered_func(_) -> t_uncovered_func(_) ->
{ok, _} = emqx_hooks:start_link(),
Pid = erlang:whereis(emqx_hooks), Pid = erlang:whereis(emqx_hooks),
gen_server:call(Pid, test), gen_server:call(Pid, test),
gen_server:cast(Pid, test), gen_server:cast(Pid, test),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_gcp_pubsub, [ {application, emqx_bridge_gcp_pubsub, [
{description, "EMQX Enterprise GCP Pub/Sub Bridge"}, {description, "EMQX Enterprise GCP Pub/Sub Bridge"},
{vsn, "0.1.8"}, {vsn, "0.1.9"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -749,7 +749,7 @@ handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Mess
Payload = render(FullMessage, PayloadTemplate), Payload = render(FullMessage, PayloadTemplate),
MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload), MQTTMessage = emqx_message:make(InstanceId, MQTTQoS, MQTTTopic, Payload),
_ = emqx:publish(MQTTMessage), _ = emqx:publish(MQTTMessage),
emqx:run_hook(Hookpoint, [FullMessage]), emqx_hooks:run(Hookpoint, [FullMessage]),
emqx_resource_metrics:received_inc(InstanceId), emqx_resource_metrics:received_inc(InstanceId),
ok ok
end end

View File

@ -252,7 +252,7 @@ do_handle_message(Message, State) ->
MQTTTopic = render(FullMessage, MQTTTopicTemplate), MQTTTopic = render(FullMessage, MQTTTopicTemplate),
MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
_ = emqx:publish(MQTTMessage), _ = emqx:publish(MQTTMessage),
emqx:run_hook(Hookpoint, [FullMessage]), emqx_hooks:run(Hookpoint, [FullMessage]),
emqx_resource_metrics:received_inc(ResourceId), emqx_resource_metrics:received_inc(ResourceId),
%% note: just `ack' does not commit the offset to the %% note: just `ack' does not commit the offset to the
%% kafka consumer group. %% kafka consumer group.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [ {application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"}, {description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -41,7 +41,7 @@
%% if the bridge received msgs from the remote broker. %% if the bridge received msgs from the remote broker.
on_message_received(Msg, HookPoint, ResId) -> on_message_received(Msg, HookPoint, ResId) ->
emqx_resource_metrics:received_inc(ResId), emqx_resource_metrics:received_inc(ResId),
emqx:run_hook(HookPoint, [Msg]). emqx_hooks:run(HookPoint, [Msg]).
%% =================================================================== %% ===================================================================
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -286,7 +286,7 @@ handle_call(
ok = emqx_broker:unsubscribe(MountedTopic), ok = emqx_broker:unsubscribe(MountedTopic),
_ = run_hooks( _ = run_hooks(
Ctx, Ctx,
'session.unsubscribe', 'session.unsubscribed',
[ClientInfo, MountedTopic, #{}] [ClientInfo, MountedTopic, #{}]
), ),

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_lwm2m, [ {application, emqx_gateway_lwm2m, [
{description, "LwM2M Gateway"}, {description, "LwM2M Gateway"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]}, {applications, [kernel, stdlib, emqx, emqx_gateway, emqx_gateway_coap]},
{env, []}, {env, []},

View File

@ -264,7 +264,7 @@ handle_call(
ok = emqx_broker:unsubscribe(MountedTopic), ok = emqx_broker:unsubscribe(MountedTopic),
_ = run_hooks( _ = run_hooks(
Ctx, Ctx,
'session.unsubscribe', 'session.unsubscribed',
[ClientInfo, MountedTopic, #{}] [ClientInfo, MountedTopic, #{}]
), ),
%% modify session state %% modify session state
@ -331,7 +331,7 @@ terminate(Reason, #channel{
session = Session session = Session
}) -> }) ->
MountedTopic = emqx_lwm2m_session:on_close(Session), MountedTopic = emqx_lwm2m_session:on_close(Session),
_ = run_hooks(Ctx, 'session.unsubscribe', [ClientInfo, MountedTopic, #{}]), _ = run_hooks(Ctx, 'session.unsubscribed', [ClientInfo, MountedTopic, #{}]),
run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]). run_hooks(Ctx, 'session.terminated', [ClientInfo, Reason, Session]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_stomp, [ {application, emqx_gateway_stomp, [
{description, "Stomp Gateway"}, {description, "Stomp Gateway"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]}, {applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -549,7 +549,7 @@ handle_in(
ok = emqx_broker:unsubscribe(MountedTopic), ok = emqx_broker:unsubscribe(MountedTopic),
_ = run_hooks( _ = run_hooks(
Ctx, Ctx,
'session.unsubscribe', 'session.unsubscribed',
[ClientInfo, MountedTopic, #{}] [ClientInfo, MountedTopic, #{}]
), ),
{ok, Channel#channel{subscriptions = lists:keydelete(SubId, 1, Subs)}}; {ok, Channel#channel{subscriptions = lists:keydelete(SubId, 1, Subs)}};
@ -869,7 +869,7 @@ handle_call(
ok = emqx_broker:unsubscribe(MountedTopic), ok = emqx_broker:unsubscribe(MountedTopic),
_ = run_hooks( _ = run_hooks(
Ctx, Ctx,
'session.unsubscribe', 'session.unsubscribed',
[ClientInfo, MountedTopic, #{}] [ClientInfo, MountedTopic, #{}]
), ),
reply( reply(

View File

@ -21,8 +21,8 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(EMQX_PLUGIN_TEMPLATE_NAME, "emqx_plugin_template"). -define(EMQX_PLUGIN_TEMPLATE_NAME, "my_emqx_plugin").
-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0.0"). -define(EMQX_PLUGIN_TEMPLATE_VSN, "5.1.0").
-define(PACKAGE_SUFFIX, ".tar.gz"). -define(PACKAGE_SUFFIX, ".tar.gz").
-define(CLUSTER_API_SERVER(PORT), ("http://127.0.0.1:" ++ (integer_to_list(PORT)))). -define(CLUSTER_API_SERVER(PORT), ("http://127.0.0.1:" ++ (integer_to_list(PORT)))).

View File

@ -22,12 +22,13 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, "emqx_plugin_template"). -define(EMQX_PLUGIN_APP_NAME, my_emqx_plugin).
-define(EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, atom_to_list(?EMQX_PLUGIN_APP_NAME)).
-define(EMQX_PLUGIN_TEMPLATE_URL, -define(EMQX_PLUGIN_TEMPLATE_URL,
"https://github.com/emqx/emqx-plugin-template/releases/download/" "https://github.com/emqx/emqx-plugin-template/releases/download/"
). ).
-define(EMQX_PLUGIN_TEMPLATE_VSN, "5.0.0"). -define(EMQX_PLUGIN_TEMPLATE_VSN, "5.1.0").
-define(EMQX_PLUGIN_TEMPLATE_TAG, "5.0.0"). -define(EMQX_PLUGIN_TEMPLATE_TAG, "5.1.0").
-define(EMQX_ELIXIR_PLUGIN_TEMPLATE_RELEASE_NAME, "elixir_plugin_template"). -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_RELEASE_NAME, "elixir_plugin_template").
-define(EMQX_ELIXIR_PLUGIN_TEMPLATE_URL, -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_URL,
"https://github.com/emqx/emqx-elixir-plugin/releases/download/" "https://github.com/emqx/emqx-elixir-plugin/releases/download/"
@ -153,11 +154,11 @@ t_demo_install_start_stop_uninstall(Config) ->
?assertEqual([maps:without([readme], Info)], emqx_plugins:list()), ?assertEqual([maps:without([readme], Info)], emqx_plugins:list()),
%% start %% start
ok = emqx_plugins:ensure_started(NameVsn), ok = emqx_plugins:ensure_started(NameVsn),
ok = assert_app_running(emqx_plugin_template, true), ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
ok = assert_app_running(map_sets, true), ok = assert_app_running(map_sets, true),
%% start (idempotent) %% start (idempotent)
ok = emqx_plugins:ensure_started(bin(NameVsn)), ok = emqx_plugins:ensure_started(bin(NameVsn)),
ok = assert_app_running(emqx_plugin_template, true), ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
ok = assert_app_running(map_sets, true), ok = assert_app_running(map_sets, true),
%% running app can not be un-installed %% running app can not be un-installed
@ -168,11 +169,11 @@ t_demo_install_start_stop_uninstall(Config) ->
%% stop %% stop
ok = emqx_plugins:ensure_stopped(NameVsn), ok = emqx_plugins:ensure_stopped(NameVsn),
ok = assert_app_running(emqx_plugin_template, false), ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
ok = assert_app_running(map_sets, false), ok = assert_app_running(map_sets, false),
%% stop (idempotent) %% stop (idempotent)
ok = emqx_plugins:ensure_stopped(bin(NameVsn)), ok = emqx_plugins:ensure_stopped(bin(NameVsn)),
ok = assert_app_running(emqx_plugin_template, false), ok = assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
ok = assert_app_running(map_sets, false), ok = assert_app_running(map_sets, false),
%% still listed after stopped %% still listed after stopped
ReleaseNameBin = list_to_binary(ReleaseName), ReleaseNameBin = list_to_binary(ReleaseName),
@ -257,9 +258,9 @@ t_start_restart_and_stop(Config) ->
%% fake a disabled plugin in config %% fake a disabled plugin in config
ok = ensure_state(Bar2, front, false), ok = ensure_state(Bar2, front, false),
assert_app_running(emqx_plugin_template, false), assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
ok = emqx_plugins:ensure_started(), ok = emqx_plugins:ensure_started(),
assert_app_running(emqx_plugin_template, true), assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
%% fake enable bar-2 %% fake enable bar-2
ok = ensure_state(Bar2, rear, true), ok = ensure_state(Bar2, rear, true),
@ -269,18 +270,18 @@ t_start_restart_and_stop(Config) ->
emqx_plugins:ensure_started() emqx_plugins:ensure_started()
), ),
%% but demo plugin should still be running %% but demo plugin should still be running
assert_app_running(emqx_plugin_template, true), assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
%% stop all %% stop all
ok = emqx_plugins:ensure_stopped(), ok = emqx_plugins:ensure_stopped(),
assert_app_running(emqx_plugin_template, false), assert_app_running(?EMQX_PLUGIN_APP_NAME, false),
ok = ensure_state(Bar2, rear, false), ok = ensure_state(Bar2, rear, false),
ok = emqx_plugins:restart(NameVsn), ok = emqx_plugins:restart(NameVsn),
assert_app_running(emqx_plugin_template, true), assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
%% repeat %% repeat
ok = emqx_plugins:restart(NameVsn), ok = emqx_plugins:restart(NameVsn),
assert_app_running(emqx_plugin_template, true), assert_app_running(?EMQX_PLUGIN_APP_NAME, true),
ok = emqx_plugins:ensure_stopped(), ok = emqx_plugins:ensure_stopped(),
ok = emqx_plugins:ensure_disabled(NameVsn), ok = emqx_plugins:ensure_disabled(NameVsn),