test(exhook): more case for topic filter wildcards/level and different qos
This commit is contained in:
parent
2657b78c44
commit
820e848909
|
@ -169,22 +169,72 @@ t_message_topic_filters(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C1),
|
{ok, C1} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C1),
|
||||||
{ok, C2} = emqtt:start_link([{clientid, <<"test_filter_client">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C2),
|
{ok, C2} = emqtt:start_link([{clientid, <<"test_filter_client">>}, {username, <<"gooduser">>}]), {ok, _} = emqtt:connect(C2),
|
||||||
|
|
||||||
{ok, _, _} = emqtt:subscribe(C1,[{<<"t/1">>, qos0}, {<<"t/2">>, qos0}]),
|
{ok, _, _} = emqtt:subscribe(C1,[{<<"exhook/hardcoded">>, qos0},
|
||||||
|
{<<"t/1">>, qos0},
|
||||||
|
{<<"t/2">>, qos0},
|
||||||
|
{<<"a/1">>, qos1},
|
||||||
|
{<<"a/2">>, qos2},
|
||||||
|
{<<"b/1">>, qos1},
|
||||||
|
{<<"b/2">>, qos2},
|
||||||
|
{<<"b/3/4">>, qos1},
|
||||||
|
{<<"b/3/4/5">>, qos2}
|
||||||
|
]),
|
||||||
|
|
||||||
|
%% server only handle topic `t/1`, rewrite all topic to `exhook/hardcoded`,
|
||||||
|
%% rewrite all payload to <<"after_hardcoded">>
|
||||||
|
%% See emqx_exhook_demo_svr:on_message_publish/2
|
||||||
|
'test_t/1_topic'(C1, C2),
|
||||||
|
'test_a/#_topic'(C1, C2),
|
||||||
|
'test_b/+_topic'(C1, C2),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
ok = emqtt:disconnect(C2).
|
||||||
|
|
||||||
|
|
||||||
|
'test_t/1_topic'(_C1, C2) ->
|
||||||
ok = emqtt:publish(C2, <<"t/1">>, ?BEFORE, 0),
|
ok = emqtt:publish(C2, <<"t/1">>, ?BEFORE, 0),
|
||||||
[Msg1 | _] = receive_messages(1),
|
[Msg1 | _] = receive_messages(1),
|
||||||
ct:pal("~p", [Msg1]),
|
|
||||||
%% server only handle topic `t/1`, rewrite topic `t/1` => `t/2`, rewrite payload hardcoded.
|
|
||||||
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
||||||
|
|
||||||
ok = emqtt:publish(C2, <<"t/2">>, ?BEFORE, 0),
|
ok = emqtt:publish(C2, <<"t/2">>, ?BEFORE, 0),
|
||||||
[Msg2 | _] = receive_messages(1),
|
[Msg2 | _] = receive_messages(1),
|
||||||
ct:pal("~p", [Msg2]),
|
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)).
|
||||||
%% `t/2` not matched, no gRPC call, no handled
|
|
||||||
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg2)),
|
|
||||||
|
'test_a/#_topic'(_C1, C2) ->
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"a/1">>, ?BEFORE, 1),
|
||||||
|
[Msg1 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"a/2">>, ?BEFORE, 1),
|
||||||
|
[Msg2 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg2)),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"a/3/4">>, ?BEFORE, 1),
|
||||||
|
[Msg3 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg3)),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"a/3/4/5">>, ?BEFORE, 1),
|
||||||
|
[Msg4 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg4)).
|
||||||
|
|
||||||
|
'test_b/+_topic'(_C1, C2) ->
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"b/1">>, ?BEFORE, 1),
|
||||||
|
[Msg1 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg1)),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"b/2">>, ?BEFORE, 1),
|
||||||
|
[Msg2 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?AFTER}, maps:find(payload, Msg2)),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"b/3/4">>, ?BEFORE, 1),
|
||||||
|
[Msg3 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg3)),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(C2, <<"b/3/4/5">>, ?BEFORE, 2),
|
||||||
|
[Msg4 | _] = receive_messages(1),
|
||||||
|
?assertEqual({ok, ?BEFORE}, maps:find(payload, Msg4)).
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1),
|
|
||||||
ok = emqtt:disconnect(C2).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Utils
|
%% Utils
|
||||||
|
|
|
@ -137,7 +137,7 @@ on_provider_loaded(#{meta := #{cluster_name := Name}} = Req, Md) ->
|
||||||
#{name => <<"session.takeovered">>},
|
#{name => <<"session.takeovered">>},
|
||||||
#{name => <<"session.terminated">>}],
|
#{name => <<"session.terminated">>}],
|
||||||
PublishWithFilter =
|
PublishWithFilter =
|
||||||
[#{name => <<"message.publish">>, topics => [<<"t/1">>]}],
|
[#{name => <<"message.publish">>, topics => [<<"t/1">>, <<"a/#">>, <<"b/+">>]}],
|
||||||
PublishWithOutFilter =
|
PublishWithOutFilter =
|
||||||
[#{name => <<"message.publish">>}],
|
[#{name => <<"message.publish">>}],
|
||||||
HooksMessage =
|
HooksMessage =
|
||||||
|
@ -328,7 +328,7 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
|
||||||
value => {message, NMsg}}, Md};
|
value => {message, NMsg}}, Md};
|
||||||
<<"test_filter_client">> ->
|
<<"test_filter_client">> ->
|
||||||
%% rewrite topic and payload
|
%% rewrite topic and payload
|
||||||
NMsg = Msg#{topic => <<"t/2">>,
|
NMsg = Msg#{topic => <<"exhook/hardcoded">>,
|
||||||
payload => ?AFTER_HARDCODED_PAYLOAD},
|
payload => ?AFTER_HARDCODED_PAYLOAD},
|
||||||
{ok, #{type => 'STOP_AND_RETURN',
|
{ok, #{type => 'STOP_AND_RETURN',
|
||||||
value => {message, NMsg}}, Md};
|
value => {message, NMsg}}, Md};
|
||||||
|
|
Loading…
Reference in New Issue