From 111b66121cad3f3b7d29e8bd1c131727eb45931a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 13 Mar 2019 22:24:19 +0800 Subject: [PATCH] Support TLS/DTLS PSK (#2297) * Improve filter functions for emqx-hook * Add PSK hook * Reset hook args for filter functions --- etc/emqx.conf | 30 +++++++++++++++--- priv/emqx.schema | 64 ++++++++++++++++++++++++++++++++++----- src/emqx_hooks.erl | 47 ++++++++++++++++------------ src/emqx_psk.erl | 36 ++++++++++++++++++++++ test/emqx_hooks_SUITE.erl | 55 +++++++++++++++++++++++---------- 5 files changed, 186 insertions(+), 46 deletions(-) create mode 100644 src/emqx_psk.erl diff --git a/etc/emqx.conf b/etc/emqx.conf index f119b05ea..cb2d0bb00 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1119,6 +1119,12 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## Value: Ciphers listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA +## Ciphers for TLS PSK. +## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#listener.ssl.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA + ## SSL parameter renegotiation is a feature that allows a client and a server ## to renegotiate the parameters of the SSL connection on the fly. ## RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, @@ -1474,7 +1480,13 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## See: listener.ssl.$name.ciphers ## ## Value: Ciphers -## listener.wss.external.ciphers = +listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA + +## Ciphers for TLS PSK. +## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## See: listener.ssl.$name.secure_renegotiate ## @@ -1545,8 +1557,6 @@ listener.wss.external.send_timeout_close = on ## Value: true | false ## listener.wss.external.nodelay = true -listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA - ##-------------------------------------------------------------------- ## Bridges ##-------------------------------------------------------------------- @@ -1627,7 +1637,13 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## SSL Ciphers used by the bridge. ## ## Value: String -## bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +#bridge.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## Ciphers for TLS PSK. +## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#bridge.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## Ping interval of a down bridge. ## @@ -1787,6 +1803,12 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Value: String ## bridge.azure.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 +## Ciphers for TLS PSK. +## Note that 'bridge.*.ciphers' and 'bridge.*.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#bridge.azure.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA + ## Ping interval of a down bridge. ## ## Value: Duration diff --git a/priv/emqx.schema b/priv/emqx.schema index 3cd8a5032..caa0a2258 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1086,6 +1086,10 @@ end}. {datatype, string} ]}. +{mapping, "listener.ssl.$name.psk_ciphers", "emqx.listeners", [ + {datatype, string} +]}. + {mapping, "listener.ssl.$name.handshake_timeout", "emqx.listeners", [ {default, "15s"}, {datatype, {duration, ms}} @@ -1348,6 +1352,10 @@ end}. {datatype, string} ]}. +{mapping, "listener.wss.$name.psk_ciphers", "emqx.listeners", [ + {datatype, string} +]}. + {mapping, "listener.wss.$name.keyfile", "emqx.listeners", [ {datatype, string} ]}. @@ -1437,14 +1445,40 @@ end}. end, SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, - + MapPSKCiphers = fun(PSKCiphers) -> + lists:map( + fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; + ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; + ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; + ("PSK-RC4-SHA") -> {psk, rc4_128, sha} + end, PSKCiphers) + end, SslOpts = fun(Prefix) -> - Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of + Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of undefined -> undefined; L -> [list_to_atom(V) || V <- L] end, + TLSCiphers = cuttlefish:conf_get(Prefix++".ciphers", Conf, undefined), + PSKCiphers = cuttlefish:conf_get(Prefix++".psk_ciphers", Conf, undefined), + Ciphers = + case {TLSCiphers, PSKCiphers} of + {undefined, undefined} -> + cuttlefish:invalid(Prefix++".ciphers or "++Prefix++".psk_ciphers is absent"); + {TLSCiphers, undefined} -> + SplitFun(TLSCiphers); + {undefined, PSKCiphers} -> + MapPSKCiphers(SplitFun(PSKCiphers)); + {_TLSCiphers, _PSKCiphers} -> + cuttlefish:invalid(Prefix++".ciphers and "++Prefix++".psk_ciphers cannot be configured at the same time") + end, + UserLookupFun = + case PSKCiphers of + undefined -> undefined; + _ -> {fun emqx_psk:lookup/3, <<>>} + end, Filter([{versions, Versions}, - {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, + {ciphers, Ciphers}, + {user_lookup_fun, UserLookupFun}, {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, @@ -1544,6 +1578,10 @@ end}. {datatype, string} ]}. +{mapping, "bridge.$name.psk_ciphers", "emqx.bridges", [ + {datatype, string} +]}. + {mapping, "bridge.$name.keepalive", "emqx.bridges", [ {default, "10s"}, {datatype, {duration, ms}} @@ -1599,22 +1637,34 @@ end}. ]}. {translation, "emqx.bridges", fun(Conf) -> + MapPSKCiphers = fun(PSKCiphers) -> + lists:map( + fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; + ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; + ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; + ("PSK-RC4-SHA") -> {psk, rc4_128, sha} + end, PSKCiphers) + end, + Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, IsSsl = fun(cacertfile) -> true; (certfile) -> true; (keyfile) -> true; (ciphers) -> true; + (psk_ciphers) -> true; (tls_versions) -> true; (_Opt) -> false end, Parse = fun(tls_versions, Vers) -> - {versions, [list_to_atom(S) || S <- Split(Vers)]}; + [{versions, [list_to_atom(S) || S <- Split(Vers)]}]; (ciphers, Ciphers) -> - {ciphers, Split(Ciphers)}; + [{ciphers, Split(Ciphers)}]; + (psk_ciphers, Ciphers) -> + [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}]; (Opt, Val) -> - {Opt, Val} + [{Opt, Val}] end, Merge = fun(forwards, Val, Opts) -> @@ -1622,7 +1672,7 @@ end}. (Opt, Val, Opts) -> case IsSsl(Opt) of true -> - SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], + SslOpts = Parse(Opt, Val) ++ [proplists:get_value(ssl_opts, Opts, [])], lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); false -> [{Opt, Val}|Opts] diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index b3ab3aa74..42989ec0c 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -28,6 +28,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% Multiple callbacks can be registered on a hookpoint. +%% The execution order depends on the priority value: +%% - Callbacks with greater priority values will be run before +%% the ones with lower priority values. e.g. A Callback with +%% priority = 2 precedes the callback with priority = 1. +%% - The execution order is the adding order of callbacks if they have +%% equal priority values. + -type(hookpoint() :: atom()). -type(action() :: function() | mfa()). -type(filter() :: function() | mfa()). @@ -56,14 +64,14 @@ stop() -> %%------------------------------------------------------------------------------ %% @doc Register a callback --spec(add(hookpoint(), action() | #callback{}) -> emqx_types:ok_or_error(already_exists)). +-spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)). add(HookPoint, Callback) when is_record(Callback, callback) -> gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); add(HookPoint, Action) when is_function(Action); is_tuple(Action) -> add(HookPoint, #callback{action = Action, priority = 0}). -spec(add(hookpoint(), action(), filter() | integer() | list()) - -> emqx_types:ok_or_error(already_exists)). + -> ok_or_error(already_exists)). add(HookPoint, Action, InitArgs) when is_function(Action), is_list(InitArgs) -> add(HookPoint, #callback{action = {Action, InitArgs}, priority = 0}); add(HookPoint, Action, Filter) when is_function(Filter); is_tuple(Filter) -> @@ -72,8 +80,8 @@ add(HookPoint, Action, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, priority = Priority}). -spec(add(hookpoint(), action(), filter(), integer()) - -> emqx_types:ok_or_error(already_exists)). -add(HookPoint, Action, Filter, Priority) -> + -> ok_or_error(already_exists)). +add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). %% @doc Unregister a callback. @@ -87,17 +95,17 @@ run(HookPoint, Args) -> run_(lookup(HookPoint), Args). %% @doc Run hooks with Accumulator. --spec(run(atom(), list(Arg :: any()), any()) -> any()). +-spec(run(atom(), list(Arg::any()), Acc::any()) -> {ok, Acc::any()} | {stop, Acc::any()}). run(HookPoint, Args, Acc) -> run_(lookup(HookPoint), Args, Acc). %% @private run_([#callback{action = Action, filter = Filter} | Callbacks], Args) -> - case filtered(Filter, Args) orelse execute(Action, Args) of - true -> run_(Callbacks, Args); - ok -> run_(Callbacks, Args); - stop -> stop; - _Any -> run_(Callbacks, Args) + case filter_passed(Filter, Args) andalso execute(Action, Args) of + false -> run_(Callbacks, Args); + ok -> run_(Callbacks, Args); + stop -> stop; + _Any -> run_(Callbacks, Args) end; run_([], _Args) -> ok. @@ -105,8 +113,8 @@ run_([], _Args) -> %% @private run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> Args1 = Args ++ [Acc], - case filtered(Filter, Args1) orelse execute(Action, Args1) of - true -> run_(Callbacks, Args, Acc); + case filter_passed(Filter, Args1) andalso execute(Action, Args1) of + false -> run_(Callbacks, Args, Acc); ok -> run_(Callbacks, Args, Acc); {ok, NewAcc} -> run_(Callbacks, Args, NewAcc); stop -> {stop, Acc}; @@ -116,13 +124,14 @@ run_([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) -> run_([], _Args, Acc) -> {ok, Acc}. -filtered(undefined, _Args) -> - false; -filtered(Filter, Args) -> +-spec(filter_passed(filter(), Args::term()) -> true | false). +filter_passed(undefined, _Args) -> true; +filter_passed(Filter, Args) -> execute(Filter, Args). -execute(Action, Args) when is_function(Action) -> - erlang:apply(Action, Args); +%% @doc execute a function. +execute(Fun, Args) when is_function(Fun) -> + erlang:apply(Fun, Args); execute({Fun, InitArgs}, Args) when is_function(Fun) -> erlang:apply(Fun, Args ++ InitArgs); execute({M, F, A}, Args) -> @@ -142,11 +151,11 @@ lookup(HookPoint) -> %%------------------------------------------------------------------------------ init([]) -> - ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), + ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}, protected]), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> - Reply = case lists:keymember(Action, 2, Callbacks = lookup(HookPoint)) of + Reply = case lists:keymember(Action, #callback.action, Callbacks = lookup(HookPoint)) of true -> {error, already_exists}; false -> diff --git a/src/emqx_psk.erl b/src/emqx_psk.erl new file mode 100644 index 000000000..8062274ce --- /dev/null +++ b/src/emqx_psk.erl @@ -0,0 +1,36 @@ +%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_psk). + +-include("logger.hrl"). + +%% SSL PSK Callbacks +-export([lookup/3]). + +-define(TAB, ?MODULE). + +-type psk_identity() :: string(). +-type psk_user_state() :: term(). + +-spec lookup(psk, psk_identity(), psk_user_state()) -> {ok, SharedSecret :: binary()} | error. +lookup(psk, ClientPSKID, UserState) -> + try emqx_hooks:run('tls_handshake.psk_lookup', [ClientPSKID], UserState) of + {ok, SharedSecret} -> {ok, SharedSecret}; + {stop, SharedSecret} -> {ok, SharedSecret} + catch + Except:Error:Stacktrace -> + ?LOG(error, "Lookup PSK failed, ~p: ~p", [{Except,Error}, Stacktrace]), + error + end. \ No newline at end of file diff --git a/test/emqx_hooks_SUITE.erl b/test/emqx_hooks_SUITE.erl index 864a3ab96..5fb7ed461 100644 --- a/test/emqx_hooks_SUITE.erl +++ b/test/emqx_hooks_SUITE.erl @@ -34,17 +34,23 @@ add_delete_hook(_) -> ?assertEqual(Callbacks, emqx_hooks:lookup(test_hook)), ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun1/1), ok = emqx:unhook(test_hook, fun ?MODULE:hook_fun2/1), - timer:sleep(1000), + timer:sleep(200), ?assertEqual([], emqx_hooks:lookup(test_hook)), - ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 8), - ok = emqx:hook(emqx_hook, {?MODULE, hook_fun1, []}, 9), - Callbacks2 = [{callback, {?MODULE, hook_fun1, []}, undefined, 9}, - {callback, {?MODULE, hook_fun2, []}, undefined, 8}], + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun8, []}, 8), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun2, []}, 2), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun10, []}, 10), + ok = emqx:hook(emqx_hook, {?MODULE, hook_fun9, []}, 9), + Callbacks2 = [{callback, {?MODULE, hook_fun10, []}, undefined, 10}, + {callback, {?MODULE, hook_fun9, []}, undefined, 9}, + {callback, {?MODULE, hook_fun8, []}, undefined, 8}, + {callback, {?MODULE, hook_fun2, []}, undefined, 2}], ?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}), - ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2}), - timer:sleep(1000), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun8, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun9, []}), + ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun10, []}), + timer:sleep(200), ?assertEqual([], emqx_hooks:lookup(emqx_hook)), ok = emqx_hooks:stop(). @@ -67,16 +73,27 @@ run_hooks(_) -> ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}), {stop, []} = emqx:run_hooks(foldl_hook2, [arg], []), - ok = emqx:hook(filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0), - ok = emqx:run_hooks(filter1_hook, [arg]), + %% foreach hook always returns 'ok' or 'stop' + ok = emqx:hook(foreach_filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0), + ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg])), %% filter passed + ?assertEqual(ok, emqx:run_hooks(foreach_filter1_hook, [arg1])), %% filter failed - ok = emqx:hook(filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, []}), - {ok, []} = emqx:run_hooks(filter2_hook, [arg], []), + %% foldl hook always returns {'ok', Acc} or {'stop', Acc} + ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, [init_arg]}), + ok = emqx:hook(foldl_filter2_hook, {?MODULE, hook_fun2_1, []}, {?MODULE, hook_filter2_1, [init_arg]}), + ?assertEqual({ok, 3}, emqx:run_hooks(foldl_filter2_hook, [arg], 1)), + ?assertEqual({ok, 2}, emqx:run_hooks(foldl_filter2_hook, [arg1], 1)), ok = emqx_hooks:stop(). -hook_fun1([]) -> ok. -hook_fun2([]) -> {ok, []}. +hook_fun1(arg) -> ok; +hook_fun1(_) -> stop. + +hook_fun2(arg) -> ok; +hook_fun2(_) -> stop. + +hook_fun2(_, Acc) -> {ok, Acc + 1}. +hook_fun2_1(_, Acc) -> {ok, Acc + 1}. hook_fun3(arg1, arg2, _Acc, init) -> ok. hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. @@ -89,6 +106,12 @@ hook_fun8(arg, initArg) -> stop. hook_fun9(arg, _Acc) -> any. hook_fun10(arg, _Acc) -> stop. -hook_filter1(arg) -> true. -hook_filter2(arg, _Acc) -> true. +hook_filter1(arg) -> true; +hook_filter1(_) -> false. +hook_filter2(arg, _Acc, init_arg) -> true; +hook_filter2(_, _Acc, _IntArg) -> false. + +hook_filter2_1(arg, _Acc, init_arg) -> true; +hook_filter2_1(arg1, _Acc, init_arg) -> true; +hook_filter2_1(_, _Acc, _IntArg) -> false.