diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index a8bd34b5d..d6e11cbc8 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -27,6 +27,12 @@ -define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster). -define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster"). +-define(TEST_PUBLISH_FILTERS_ATOM, test_message_filters). +-define(TEST_PUBLISH_FILTERS_STRING, "test_message_filters"). + +-define(BEFORE, <<"before_hardcoded">>). +-define(AFTER, <<"after_hardcoded">>). + %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -136,6 +142,50 @@ t_cluster_name(_) -> [Callback | _] = emqx_hooks:lookup('client.connected'), ?assertEqual(1, emqx_hooks:callback_priority(Callback)). +t_message_topic_filters(_) -> + %% ========== Prepare hooks + Priority = 2, + SetEnvFun = + fun(emqx) -> + set_special_cfgs(emqx), + application:set_env(ekka, cluster_name, ?TEST_PUBLISH_FILTERS_ATOM); + (emqx_exhook) -> + application:set_env(emqx_exhook, hook_priority, Priority) + end, + + emqx_ct_helpers:stop_apps([emqx, emqx_exhook]), + emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun), + + ?assertEqual(?TEST_PUBLISH_FILTERS_STRING, emqx_sys:cluster_name()), + + emqx_exhook:disable(default), + ok = emqx_exhook:enable(default), + %% See emqx_exhook_demo_svr:on_provider_loaded/2 + + [Callback | _] = emqx_hooks:lookup('message.publish'), + ?assertEqual(Priority, emqx_hooks:callback_priority(Callback)), + + %% ========== Test topic filters + {ok, C1} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C1), + {ok, C2} = emqtt:start_link([{clientid, <<"test_filter_client">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C2), + + {ok, _, _} = emqtt:subscribe(C1,[{<<"t/1">>, qos0}, {<<"t/2">>, qos0}]), + + ok = emqtt:publish(C2, <<"t/1">>, ?BEFORE, 0), + [Msg1 | _] = receive_messages(1), + ct:pal("~p", [Msg1]), + %% server only handle topic `t/1`, rewrite topic `t/1` => `t/2`, rewrite payload hardcoded. + ?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)), + + ok = emqtt:publish(C2, <<"t/2">>, ?BEFORE, 0), + [Msg2 | _] = receive_messages(1), + ct:pal("~p", [Msg2]), + %% `t/2` not matched, no gRPC call, no handled + ?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- @@ -170,3 +220,18 @@ meck_print() -> unmeck_print() -> meck:unload(emqx_ctl). + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + receive_messages(Count-1, [Msg|Msgs]); + _Other -> + receive_messages(Count, Msgs) + after 1000 -> + Msgs + end. diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index 636de0d86..cdba81c1a 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -53,6 +53,8 @@ -define(NAME, ?MODULE). -define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>). -define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>). +-define(TEST_PUBLISH_FILTERS_BIN, <<"test_message_filters">>). +-define(AFTER_HARDCODED_PAYLOAD, <<"after_hardcoded">>). %%-------------------------------------------------------------------- %% Server APIs @@ -134,16 +136,21 @@ on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) -> #{name => <<"session.discarded">>}, #{name => <<"session.takeovered">>}, #{name => <<"session.terminated">>}], + PublishWithFilter = + [#{name => <<"message.publish">>, topics => [<<"t/1">>]}], + PublishWithOutFilter = + [#{name => <<"message.publish">>}], HooksMessage = - [#{name => <<"message.publish">>}, - #{name => <<"message.delivered">>}, + [#{name => <<"message.delivered">>}, #{name => <<"message.acked">>}, #{name => <<"message.dropped">>}], case Name of ?DEFAULT_CLUSTER_NAME -> - {ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md}; + {ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithOutFilter ++ HooksMessage}, Md}; ?OTHER_CLUSTER_NAME_BIN -> - {ok, #{hooks => HooksClient}, Md} + {ok, #{hooks => HooksClient}, Md}; + ?TEST_PUBLISH_FILTERS_BIN -> + {ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithFilter ++ HooksMessage}, Md} end. -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} @@ -319,6 +326,12 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> payload => From}, {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; + <<"test_filter_client">> -> + %% rewrite topic and payload + NMsg = Msg#{topic => <<"t/2">>, + payload => ?AFTER_HARDCODED_PAYLOAD}, + {ok, #{type => 'STOP_AND_RETURN', + value => {message, NMsg}}, Md}; _ -> {ok, #{type => 'IGNORE'}, Md} end.