From d0f88b1af9aec7b5838278c7e94b443e042c397f Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Aug 2022 18:18:14 +0800 Subject: [PATCH 1/4] fix(exhook): ignore hook exection for `no_matched` topic For message hooks, if the topics do not match, the hook should continue to be executed. --- apps/emqx_exhook/src/emqx_exhook.erl | 47 +++++++++++++------- apps/emqx_exhook/src/emqx_exhook_handler.erl | 19 +++++--- apps/emqx_exhook/src/emqx_exhook_server.erl | 36 ++++++++------- 3 files changed, 64 insertions(+), 38 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl index 551d0a126..ca8b37ec5 100644 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -71,44 +71,59 @@ cast(Hookpoint, Req) -> cast(_, _, []) -> ok; cast(Hookpoint, Req, [ServerName|More]) -> - %% XXX: Need a real asynchronous running + %% TODO: Need a real asynchronous running _ = emqx_exhook_server:call(Hookpoint, Req, emqx_exhook_mngr:server(ServerName)), cast(Hookpoint, Req, More). -spec call_fold(atom(), term(), function()) -> {ok, term()} - | {stop, term()}. + | {stop, term()} + | ignore. call_fold(Hookpoint, Req, AccFun) -> FailedAction = emqx_exhook_mngr:get_request_failed_action(), ServerNames = emqx_exhook_mngr:running(), case ServerNames == [] andalso FailedAction == deny of true -> + ?LOG(warning, "No available Server for hook: ~p . Stop hook chain execution with `request_failed_action=deny`.", [Hookpoint]), {stop, deny_action_result(Hookpoint, Req)}; _ -> + %% `Req` (includede message.. etc.) as `InitAcc` for `emqx_hook` call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames) end. -call_fold(_, Req, _, _, []) -> - {ok, Req}; -call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) -> +-define(LOG_CALL_RESULT(ServerName, Res, Fmt), + ?LOG(debug, "ExHook server: ~p respond type: ~p. " ++ Fmt, [ServerName, Resp])). + +call_fold(_, Acc, _, _, []) -> + {ok, Acc}; +call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, [ServerName | More]) -> Server = emqx_exhook_mngr:server(ServerName), - case emqx_exhook_server:call(Hookpoint, Req, Server) of + case emqx_exhook_server:call(Hookpoint, Acc, Server) of + ignore -> + %% Server is not mounted / or does not care about this hook + %% See emqx_exhook_server:need_call/3 + ignore; {ok, Resp} -> - case AccFun(Req, Resp) of - {stop, NReq} -> - {stop, NReq}; - {ok, NReq} -> - call_fold(Hookpoint, NReq, FailedAction, AccFun, More); - _ -> - call_fold(Hookpoint, Req, FailedAction, AccFun, More) + case MergeAccFun(Acc, Resp) of + {stop, NewAcc} -> + ?LOG_CALL_RESULT(ServerName, "'STOP_AND_RETURN'", "Stop hook chain execution"), + {stop, NewAcc}; + {ok, NewAccAsNReq} -> + ?LOG_CALL_RESULT(ServerName, "'CONTINUE'", "Continue calling remaining ExHook servers."), + call_fold(Hookpoint, NewAccAsNReq, FailedAction, MergeAccFun, More); + ignore -> + ?LOG_CALL_RESULT(ServerName, "'IGNORE'", "Continue calling remaining ExHook servers."), + call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More) end; - _ -> + {error, _Reason} -> case FailedAction of deny -> - {stop, deny_action_result(Hookpoint, Req)}; + ?LOG(error, "Call server: ~p for hook: ~p failed. Stop hook chain execution with `request_failed_action=deny`.", + [ServerName, Hookpoint]), + {stop, deny_action_result(Hookpoint, Acc)}; _ -> - call_fold(Hookpoint, Req, FailedAction, AccFun, More) + call_fold(Hookpoint, Acc, FailedAction, MergeAccFun, More) end end. diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 1809d81ee..6ec21e033 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -62,6 +62,9 @@ , call_fold/3 ]). +-define(STOP_OR_OK(Res), + (Res =:= ok orelse Res =:= stop)). + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- @@ -102,10 +105,11 @@ on_client_authenticate(ClientInfo, AuthResult) -> case call_fold('client.authenticate', Req, fun merge_responsed_bool/2) of - {StopOrOk, #{result := Result0}} when is_boolean(Result0) -> + {StopOrOk, #{result := Result0}} + when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) -> Result = case Result0 of true -> success; _ -> not_authorized end, {StopOrOk, AuthResult#{auth_result => Result, anonymous => false}}; - _ -> + ignore -> {ok, AuthResult} end. @@ -122,10 +126,11 @@ on_client_check_acl(ClientInfo, PubSub, Topic, Result) -> }, case call_fold('client.check_acl', Req, fun merge_responsed_bool/2) of - {StopOrOk, #{result := Result0}} when is_boolean(Result0) -> + {StopOrOk, #{result := Result0}} + when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) -> NResult = case Result0 of true -> allow; _ -> deny end, {StopOrOk, NResult}; - _ -> {ok, Result} + ignore -> {ok, Result} end. on_client_subscribe(ClientInfo, Props, TopicFilters) -> @@ -190,9 +195,11 @@ on_message_publish(Message) -> Req = #{message => message(Message)}, case call_fold('message.publish', Req, fun emqx_exhook_handler:merge_responsed_message/2) of - {StopOrOk, #{message := NMessage}} -> + {StopOrOk, #{message := NMessage}} + when ?STOP_OR_OK(StopOrOk) -> {StopOrOk, assign_to_message(NMessage, Message)}; - _ -> {ok, Message} + ignore -> + {ok, Message} end. on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 5ce602535..9a8468700 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -240,22 +240,12 @@ name(#server{name = Name}) -> | {error, term()}. call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, hookspec = Hooks, prefix = Prefix}) -> - GrpcFunc = hk2func(Hookpoint), - case maps:get(Hookpoint, Hooks, undefined) of - undefined -> ignore; - Opts -> - NeedCall = case lists:member(Hookpoint, message_hooks()) of - false -> true; - _ -> - #{message := #{topic := Topic}} = Req, - match_topic_filter(Topic, maps:get(topics, Opts, [])) - end, - case NeedCall of - false -> ignore; - _ -> - inc_metrics(Prefix, Hookpoint), - do_call(ChannName, GrpcFunc, Req, ReqOpts) - end + case need_call(Hookpoint, Req, Hooks) of + true -> + inc_metrics(Prefix, Hookpoint), + do_call(ChannName, hk2func(Hookpoint), Req, ReqOpts); + false -> + ignore end. %% @private @@ -266,6 +256,20 @@ inc_metrics(IncFun, Name) when is_function(IncFun) -> inc_metrics(Prefix, Name) when is_list(Prefix) -> emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). +need_call(Hookpoint, Req, Hooks) -> + case maps:get(Hookpoint, Hooks, undefined) of + undefined -> + false; %% Hookpoint is not mounted on this server + Opts -> + case lists:member(Hookpoint, message_hooks()) of + false -> + true; + _ -> + #{message := #{topic := Topic}} = Req, + match_topic_filter(Topic, maps:get(topics, Opts, [])) + end + end. + -compile({inline, [match_topic_filter/2]}). match_topic_filter(_, []) -> true; From b083a1cd2d7793ad7be1d7f53dd3038117e35560 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Thu, 25 Aug 2022 18:31:14 +0800 Subject: [PATCH 2/4] chore: appup.src and CHANGES.md --- CHANGES-4.3.md | 10 ++-- apps/emqx_exhook/src/emqx_exhook.appup.src | 55 +++++++++++----------- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 2c7fce55f..42148dc6a 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -14,13 +14,14 @@ File format: ### Enhancements -- Improve error message for LwM2M plugin when object ID is not valid [#8654](https://github.com/emqx/emqx/pull/8654). +- Improve error message for LwM2M plugin when object ID is not valid. [#8654](https://github.com/emqx/emqx/pull/8654). - Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671) -- Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597) +- Add node evacuation and cluster rebalancing features. [#8597](https://github.com/emqx/emqx/pull/8597) - Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737) +- Increases the latency interval for MQTT Bridge test connections to improve compatibility in high-latency environments. [#8745](https://github.com/emqx/emqx/pull/8745) - Close ExProto client process immediately if it's keepalive timeouted. [#8725](https://github.com/emqx/emqx/pull/8725) - Upgrade grpc-erl driver to 0.6.7 to support batch operation in sending stream. [#8725](https://github.com/emqx/emqx/pull/8725) -- Improved jwt authentication module initialization process.[#8736](https://github.com/emqx/emqx/pull/8736) +- Improved jwt authentication module initialization process. [#8736](https://github.com/emqx/emqx/pull/8736) ### Bug fixes @@ -30,7 +31,8 @@ File format: The `foo` variable is a null value, so `clientid != foo` should be evaluated as true. - Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655) - Add an idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628) -- Fix ExHook can't be un-hooked if the grpc service stop first. [#8725](//github.com/emqx/emqx/pull/8725) +- Fix ExHook can't be un-hooked if the grpc service stop first. [#8725](https://github.com/emqx/emqx/pull/8725) +- Fix the problem that ExHook cannot continue hook chains execution for mismatched topics. [#8807](https://github.com/emqx/emqx/pull/8807) - Fix GET `/listeners/` crashes when listener is not ready. [#8752](https://github.com/emqx/emqx/pull/8752) diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index eada3146e..8aec1b9d4 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,30 +1,31 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{<<"4\\.3\\.[5-6]">>, - [{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]}, - {"4.3.4", - [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, - {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, - {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, - {<<".*">>,[]}], - [{<<"4\\.3\\.[5-6]">>, - [{load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]}, - {"4.3.5", - [{load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}]}, - {"4.3.4", - [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, - {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, - {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, - {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, - {<<".*">>,[]}]}. + [{<<"4\\.3\\.[5-6]">>, + [{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}]}, + {"4.3.4", + [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, + {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, + {<<".*">>,[]}], + [{<<"4\\.3\\.[5-6]">>, + [{load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_mngr,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}]}, + {"4.3.4", + [{load_module,emqx_exhook_sup,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_server,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook_pb,brutal_purge,soft_purge,[]}, + {load_module,emqx_exhook,brutal_purge,soft_purge,[]}, + {update,emqx_exhook_mngr,{advanced,["4.3.4"]}}]}, + {<<"4\\.3\\.[0-3]">>,[{restart_application,emqx_exhook}]}, + {<<".*">>,[]}]}. From 2657b78c442d86e852769f83792f2f2b7fce4b77 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Aug 2022 03:15:49 +0800 Subject: [PATCH 3/4] test(exhook): ExHook message hooks with topic filters --- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 65 +++++++++++++++++++ .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 21 ++++-- 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index a8bd34b5d..d6e11cbc8 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -27,6 +27,12 @@ -define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster). -define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster"). +-define(TEST_PUBLISH_FILTERS_ATOM, test_message_filters). +-define(TEST_PUBLISH_FILTERS_STRING, "test_message_filters"). + +-define(BEFORE, <<"before_hardcoded">>). +-define(AFTER, <<"after_hardcoded">>). + %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -136,6 +142,50 @@ t_cluster_name(_) -> [Callback | _] = emqx_hooks:lookup('client.connected'), ?assertEqual(1, emqx_hooks:callback_priority(Callback)). +t_message_topic_filters(_) -> + %% ========== Prepare hooks + Priority = 2, + SetEnvFun = + fun(emqx) -> + set_special_cfgs(emqx), + application:set_env(ekka, cluster_name, ?TEST_PUBLISH_FILTERS_ATOM); + (emqx_exhook) -> + application:set_env(emqx_exhook, hook_priority, Priority) + end, + + emqx_ct_helpers:stop_apps([emqx, emqx_exhook]), + emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun), + + ?assertEqual(?TEST_PUBLISH_FILTERS_STRING, emqx_sys:cluster_name()), + + emqx_exhook:disable(default), + ok = emqx_exhook:enable(default), + %% See emqx_exhook_demo_svr:on_provider_loaded/2 + + [Callback | _] = emqx_hooks:lookup('message.publish'), + ?assertEqual(Priority, emqx_hooks:callback_priority(Callback)), + + %% ========== Test topic filters + {ok, C1} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C1), + {ok, C2} = emqtt:start_link([{clientid, <<"test_filter_client">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C2), + + {ok, _, _} = emqtt:subscribe(C1,[{<<"t/1">>, qos0}, {<<"t/2">>, qos0}]), + + ok = emqtt:publish(C2, <<"t/1">>, ?BEFORE, 0), + [Msg1 | _] = receive_messages(1), + ct:pal("~p", [Msg1]), + %% server only handle topic `t/1`, rewrite topic `t/1` => `t/2`, rewrite payload hardcoded. + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)), + + ok = emqtt:publish(C2, <<"t/2">>, ?BEFORE, 0), + [Msg2 | _] = receive_messages(1), + ct:pal("~p", [Msg2]), + %% `t/2` not matched, no gRPC call, no handled + ?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- @@ -170,3 +220,18 @@ meck_print() -> unmeck_print() -> meck:unload(emqx_ctl). + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + receive_messages(Count-1, [Msg|Msgs]); + _Other -> + receive_messages(Count, Msgs) + after 1000 -> + Msgs + end. diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index 636de0d86..cdba81c1a 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -53,6 +53,8 @@ -define(NAME, ?MODULE). -define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>). -define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>). +-define(TEST_PUBLISH_FILTERS_BIN, <<"test_message_filters">>). +-define(AFTER_HARDCODED_PAYLOAD, <<"after_hardcoded">>). %%-------------------------------------------------------------------- %% Server APIs @@ -134,16 +136,21 @@ on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) -> #{name => <<"session.discarded">>}, #{name => <<"session.takeovered">>}, #{name => <<"session.terminated">>}], + PublishWithFilter = + [#{name => <<"message.publish">>, topics => [<<"t/1">>]}], + PublishWithOutFilter = + [#{name => <<"message.publish">>}], HooksMessage = - [#{name => <<"message.publish">>}, - #{name => <<"message.delivered">>}, + [#{name => <<"message.delivered">>}, #{name => <<"message.acked">>}, #{name => <<"message.dropped">>}], case Name of ?DEFAULT_CLUSTER_NAME -> - {ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md}; + {ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithOutFilter ++ HooksMessage}, Md}; ?OTHER_CLUSTER_NAME_BIN -> - {ok, #{hooks => HooksClient}, Md} + {ok, #{hooks => HooksClient}, Md}; + ?TEST_PUBLISH_FILTERS_BIN -> + {ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithFilter ++ HooksMessage}, Md} end. -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} @@ -319,6 +326,12 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> payload => From}, {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; + <<"test_filter_client">> -> + %% rewrite topic and payload + NMsg = Msg#{topic => <<"t/2">>, + payload => ?AFTER_HARDCODED_PAYLOAD}, + {ok, #{type => 'STOP_AND_RETURN', + value => {message, NMsg}}, Md}; _ -> {ok, #{type => 'IGNORE'}, Md} end. From 820e848909ba498f3e380b4f8c413c7110207f88 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Aug 2022 11:13:44 +0800 Subject: [PATCH 4/4] test(exhook): more case for topic filter wildcards/level and different qos --- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 66 ++++++++++++++++--- .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 4 +- 2 files changed, 60 insertions(+), 10 deletions(-) diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index d6e11cbc8..b4e096be7 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -169,22 +169,72 @@ t_message_topic_filters(_) -> {ok, C1} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C1), {ok, C2} = emqtt:start_link([{clientid, <<"test_filter_client">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C2), - {ok, _, _} = emqtt:subscribe(C1,[{<<"t/1">>, qos0}, {<<"t/2">>, qos0}]), + {ok, _, _} = emqtt:subscribe(C1,[{<<"exhook/hardcoded">>, qos0}, + {<<"t/1">>, qos0}, + {<<"t/2">>, qos0}, + {<<"a/1">>, qos1}, + {<<"a/2">>, qos2}, + {<<"b/1">>, qos1}, + {<<"b/2">>, qos2}, + {<<"b/3/4">>, qos1}, + {<<"b/3/4/5">>, qos2} + ]), + %% server only handle topic `t/1`, rewrite all topic to `exhook/hardcoded`, + %% rewrite all payload to <<"after_hardcoded">> + %% See emqx_exhook_demo_svr:on_message_publish/2 + 'test_t/1_topic'(C1, C2), + 'test_a/#_topic'(C1, C2), + 'test_b/+_topic'(C1, C2), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + + +'test_t/1_topic'(_C1, C2) -> ok = emqtt:publish(C2, <<"t/1">>, ?BEFORE, 0), [Msg1 | _] = receive_messages(1), - ct:pal("~p", [Msg1]), - %% server only handle topic `t/1`, rewrite topic `t/1` => `t/2`, rewrite payload hardcoded. ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)), ok = emqtt:publish(C2, <<"t/2">>, ?BEFORE, 0), [Msg2 | _] = receive_messages(1), - ct:pal("~p", [Msg2]), - %% `t/2` not matched, no gRPC call, no handled - ?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)), + ?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)). + + +'test_a/#_topic'(_C1, C2) -> + {ok, _} = emqtt:publish(C2, <<"a/1">>, ?BEFORE, 1), + [Msg1 | _] = receive_messages(1), + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)), + + {ok, _} = emqtt:publish(C2, <<"a/2">>, ?BEFORE, 1), + [Msg2 | _] = receive_messages(1), + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg2)), + + {ok, _} = emqtt:publish(C2, <<"a/3/4">>, ?BEFORE, 1), + [Msg3 | _] = receive_messages(1), + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg3)), + + {ok, _} = emqtt:publish(C2, <<"a/3/4/5">>, ?BEFORE, 1), + [Msg4 | _] = receive_messages(1), + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg4)). + +'test_b/+_topic'(_C1, C2) -> + {ok, _} = emqtt:publish(C2, <<"b/1">>, ?BEFORE, 1), + [Msg1 | _] = receive_messages(1), + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)), + + {ok, _} = emqtt:publish(C2, <<"b/2">>, ?BEFORE, 1), + [Msg2 | _] = receive_messages(1), + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg2)), + + {ok, _} = emqtt:publish(C2, <<"b/3/4">>, ?BEFORE, 1), + [Msg3 | _] = receive_messages(1), + ?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg3)), + + {ok, _} = emqtt:publish(C2, <<"b/3/4/5">>, ?BEFORE, 2), + [Msg4 | _] = receive_messages(1), + ?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg4)). - ok = emqtt:disconnect(C1), - ok = emqtt:disconnect(C2). %%-------------------------------------------------------------------- %% Utils diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index cdba81c1a..1636beb2a 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -137,7 +137,7 @@ on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) -> #{name => <<"session.takeovered">>}, #{name => <<"session.terminated">>}], PublishWithFilter = - [#{name => <<"message.publish">>, topics => [<<"t/1">>]}], + [#{name => <<"message.publish">>, topics => [<<"t/1">>, <<"a/#">>, <<"b/+">>]}], PublishWithOutFilter = [#{name => <<"message.publish">>}], HooksMessage = @@ -328,7 +328,7 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> value => {message, NMsg}}, Md}; <<"test_filter_client">> -> %% rewrite topic and payload - NMsg = Msg#{topic => <<"t/2">>, + NMsg = Msg#{topic => <<"exhook/hardcoded">>, payload => ?AFTER_HARDCODED_PAYLOAD}, {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md};