test(exhook): ExHook message hooks with topic filters
This commit is contained in:
parent
b083a1cd2d
commit
2657b78c44
|
@ -27,6 +27,12 @@
|
|||
-define(OTHER_CLUSTER_NAME_ATOM, test_emqx_cluster).
|
||||
-define(OTHER_CLUSTER_NAME_STRING, "test_emqx_cluster").
|
||||
|
||||
-define(TEST_PUBLISH_FILTERS_ATOM, test_message_filters).
|
||||
-define(TEST_PUBLISH_FILTERS_STRING, "test_message_filters").
|
||||
|
||||
-define(BEFORE, <<"before_hardcoded">>).
|
||||
-define(AFTER, <<"after_hardcoded">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -136,6 +142,50 @@ t_cluster_name(_) ->
|
|||
[Callback | _] = emqx_hooks:lookup('client.connected'),
|
||||
?assertEqual(1, emqx_hooks:callback_priority(Callback)).
|
||||
|
||||
t_message_topic_filters(_) ->
|
||||
%% ========== Prepare hooks
|
||||
Priority = 2,
|
||||
SetEnvFun =
|
||||
fun(emqx) ->
|
||||
set_special_cfgs(emqx),
|
||||
application:set_env(ekka, cluster_name, ?TEST_PUBLISH_FILTERS_ATOM);
|
||||
(emqx_exhook) ->
|
||||
application:set_env(emqx_exhook, hook_priority, Priority)
|
||||
end,
|
||||
|
||||
emqx_ct_helpers:stop_apps([emqx, emqx_exhook]),
|
||||
emqx_ct_helpers:start_apps([emqx, emqx_exhook], SetEnvFun),
|
||||
|
||||
?assertEqual(?TEST_PUBLISH_FILTERS_STRING, emqx_sys:cluster_name()),
|
||||
|
||||
emqx_exhook:disable(default),
|
||||
ok = emqx_exhook:enable(default),
|
||||
%% See emqx_exhook_demo_svr:on_provider_loaded/2
|
||||
|
||||
[Callback | _] = emqx_hooks:lookup('message.publish'),
|
||||
?assertEqual(Priority, emqx_hooks:callback_priority(Callback)),
|
||||
|
||||
%% ========== Test topic filters
|
||||
{ok, C1} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C1),
|
||||
{ok, C2} = emqtt:start_link([{clientid, <<"test_filter_client">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C2),
|
||||
|
||||
{ok, _, _} = emqtt:subscribe(C1,[{<<"t/1">>, qos0}, {<<"t/2">>, qos0}]),
|
||||
|
||||
ok = emqtt:publish(C2, <<"t/1">>, ?BEFORE, 0),
|
||||
[Msg1 | _] = receive_messages(1),
|
||||
ct:pal("~p", [Msg1]),
|
||||
%% server only handle topic `t/1`, rewrite topic `t/1` => `t/2`, rewrite payload hardcoded.
|
||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
||||
|
||||
ok = emqtt:publish(C2, <<"t/2">>, ?BEFORE, 0),
|
||||
[Msg2 | _] = receive_messages(1),
|
||||
ct:pal("~p", [Msg2]),
|
||||
%% `t/2` not matched, no gRPC call, no handled
|
||||
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)),
|
||||
|
||||
ok = emqtt:disconnect(C1),
|
||||
ok = emqtt:disconnect(C2).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Utils
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -170,3 +220,18 @@ meck_print() ->
|
|||
|
||||
unmeck_print() ->
|
||||
meck:unload(emqx_ctl).
|
||||
|
||||
receive_messages(Count) ->
|
||||
receive_messages(Count, []).
|
||||
|
||||
receive_messages(0, Msgs) ->
|
||||
Msgs;
|
||||
receive_messages(Count, Msgs) ->
|
||||
receive
|
||||
{publish, Msg} ->
|
||||
receive_messages(Count-1, [Msg|Msgs]);
|
||||
_Other ->
|
||||
receive_messages(Count, Msgs)
|
||||
after 1000 ->
|
||||
Msgs
|
||||
end.
|
||||
|
|
|
@ -53,6 +53,8 @@
|
|||
-define(NAME, ?MODULE).
|
||||
-define(DEFAULT_CLUSTER_NAME, <<"emqxcl">>).
|
||||
-define(OTHER_CLUSTER_NAME_BIN, <<"test_emqx_cluster">>).
|
||||
-define(TEST_PUBLISH_FILTERS_BIN, <<"test_message_filters">>).
|
||||
-define(AFTER_HARDCODED_PAYLOAD, <<"after_hardcoded">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Server APIs
|
||||
|
@ -134,16 +136,21 @@ on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) ->
|
|||
#{name => <<"session.discarded">>},
|
||||
#{name => <<"session.takeovered">>},
|
||||
#{name => <<"session.terminated">>}],
|
||||
PublishWithFilter =
|
||||
[#{name => <<"message.publish">>, topics => [<<"t/1">>]}],
|
||||
PublishWithOutFilter =
|
||||
[#{name => <<"message.publish">>}],
|
||||
HooksMessage =
|
||||
[#{name => <<"message.publish">>},
|
||||
#{name => <<"message.delivered">>},
|
||||
[#{name => <<"message.delivered">>},
|
||||
#{name => <<"message.acked">>},
|
||||
#{name => <<"message.dropped">>}],
|
||||
case Name of
|
||||
?DEFAULT_CLUSTER_NAME ->
|
||||
{ok, #{hooks => HooksClient ++ HooksSession ++ HooksMessage}, Md};
|
||||
{ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithOutFilter ++ HooksMessage}, Md};
|
||||
?OTHER_CLUSTER_NAME_BIN ->
|
||||
{ok, #{hooks => HooksClient}, Md}
|
||||
{ok, #{hooks => HooksClient}, Md};
|
||||
?TEST_PUBLISH_FILTERS_BIN ->
|
||||
{ok, #{hooks => HooksClient ++ HooksSession ++ PublishWithFilter ++ HooksMessage}, Md}
|
||||
end.
|
||||
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
|
||||
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
|
||||
|
@ -319,6 +326,12 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
|
|||
payload => From},
|
||||
{ok, #{type => 'STOP_AND_RETURN',
|
||||
value => {message, NMsg}}, Md};
|
||||
<<"test_filter_client">> ->
|
||||
%% rewrite topic and payload
|
||||
NMsg = Msg#{topic => <<"t/2">>,
|
||||
payload => ?AFTER_HARDCODED_PAYLOAD},
|
||||
{ok, #{type => 'STOP_AND_RETURN',
|
||||
value => {message, NMsg}}, Md};
|
||||
_ ->
|
||||
{ok, #{type => 'IGNORE'}, Md}
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue