test: rulesql select fields and select from event message

This commit is contained in:
JimMoen 2022-10-09 14:20:48 +08:00
parent 27e19da066
commit 5047211950
1 changed files with 188 additions and 109 deletions

View File

@ -21,6 +21,7 @@
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -51,15 +52,16 @@ groups() ->
, t_sqlselect_02
, t_sqlselect_1
, t_sqlselect_2
, t_sqlselect_2_1
, t_sqlselect_2_2
, t_sqlselect_2_3
, t_sqlselect_3
]},
{rulesql_select_events, [],
[ t_sqlparse_event_1
, t_sqlparse_event_2
, t_sqlparse_event_3
[ t_sqlparse_event_client_connected_01
, t_sqlparse_event_client_connected_02
, t_sqlparse_event_client_disconnected
, t_sqlparse_event_session_subscribed
, t_sqlparse_event_session_unsubscribed
, t_sqlparse_event_message_delivered
, t_sqlparse_event_message_acked
, t_sqlparse_event_message_dropped
]},
{rulesql_select_metadata, [],
[ t_sqlparse_select_matadata_1
@ -159,16 +161,35 @@ end_per_testcase(_TestCase, _Config) ->
t_sqlselect_0(_Config) ->
%% Verify SELECT with and without 'AS'
Sql = "select * "
"from \"t/#\" "
"where payload.cmd.info = 'tt'",
?assertMatch({ok,#{payload := <<"{\"cmd\": {\"info\":\"tt\"}}">>}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> =>
#{<<"payload">> =>
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
<<"topic">> => <<"t/a">>}})),
Sql1 = "SELECT * "
"FROM \"t/#\" "
"WHERE payload.cmd.info = 'tt'",
%% all available fields by `select` from message publish, selecte other key will be `undefined`
Topic = <<"t/a">>,
Payload = <<"{\"cmd\": {\"info\":\"tt\"}}">>,
{ok, #{username := <<"u_emqx">>,
topic := Topic,
timestamp := TimeStamp,
qos := 1,
publish_received_at := TimeStamp,
peerhost := <<"127.0.0.1">>,
payload := Payload,
node := 'test@127.0.0.1',
metadata := #{rule_id := TestRuleId},
id := MsgId,
flags := #{sys := true, event := true},
clientid := <<"c_emqx">>
}
} = emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql1,
<<"ctx">> =>
#{<<"payload">> => Payload,
<<"topic">> => Topic}}),
?assert(is_binary(TestRuleId)),
?assert(is_binary(MsgId)),
?assert(is_integer(TimeStamp)),
Sql2 = "select payload.cmd as cmd "
"from \"t/#\" "
"where cmd.info = 'tt'",
@ -179,6 +200,7 @@ t_sqlselect_0(_Config) ->
#{<<"payload">> =>
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
<<"topic">> => <<"t/a">>}})),
Sql3 = "select payload.cmd as cmd, cmd.info as info "
"from \"t/#\" "
"where cmd.info = 'tt' and info = 'tt'",
@ -205,14 +227,15 @@ t_sqlselect_0(_Config) ->
t_sqlselect_00(_Config) ->
%% Verify plus/subtract and unary_add_or_subtract
Sql = "select 1-1 as a "
Sql0 = "select 1 - 1 as a "
"from \"t/#\" ",
?assertMatch({ok,#{<<"a">> := 0}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
#{<<"rawsql">> => Sql0,
<<"ctx">> =>
#{<<"payload">> => <<"">>,
<<"topic">> => <<"t/a">>}})),
Sql1 = "select -1 + 1 as a "
"from \"t/#\" ",
?assertMatch({ok,#{<<"a">> := 0}},
@ -221,6 +244,7 @@ t_sqlselect_00(_Config) ->
<<"ctx">> =>
#{<<"payload">> => <<"">>,
<<"topic">> => <<"t/a">>}})),
Sql2 = "select 1 + 1 as a "
"from \"t/#\" ",
?assertMatch({ok,#{<<"a">> := 2}},
@ -229,6 +253,7 @@ t_sqlselect_00(_Config) ->
<<"ctx">> =>
#{<<"payload">> => <<"">>,
<<"topic">> => <<"t/a">>}})),
Sql3 = "select +1 as a "
"from \"t/#\" ",
?assertMatch({ok,#{<<"a">> := 1}},
@ -431,58 +456,93 @@ t_sqlselect_2(_Config) ->
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlselect_2_1(_Config) ->
%%------------------------------------------------------------------------------
%% Test cases for events
%%------------------------------------------------------------------------------
%% FROM $events/client_connected
t_sqlparse_event_client_connected_01(_Config) ->
Sql = "select *"
"from \"$events/client_connected\" ",
%% all available fields by `select` from message publish, selecte other key will be `undefined`
{ok, #{clientid := <<"c_emqx">>,
username := <<"u_emqx">>,
timestamp := TimeStamp,
connected_at := TimeStamp,
peername := <<"127.0.0.1:12345">>,
metadata := #{rule_id := RuleId},
%% default value
node := 'test@127.0.0.1',
sockname := <<"0.0.0.0:1883">>,
proto_name := <<"MQTT">>,
proto_ver := 5,
mountpoint := undefined,
keepalive := 60,
is_bridge := false,
expiry_interval := 3600,
event := 'client.connected',
clean_start := true
}
} = emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"clientid">> => <<"c_emqx">>, <<"peername">> => <<"127.0.0.1:12345">>, <<"username">> => <<"u_emqx">>}}),
?assert(is_binary(RuleId)),
?assert(is_integer(TimeStamp)).
t_sqlparse_event_client_connected_02(_Config) ->
ok = emqx_rule_engine:load_providers(),
%% recursively republish to t2, if the msg dropped
%% republish the client.connected msg
TopicRule = create_simple_repub_rule(
<<"t2">>,
<<"repub/to/connected">>,
"SELECT * "
"FROM \"$events/message_dropped\" "),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
"FROM \"$events/client_connected\" "
"WHERE username = 'emqx1'",
<<"{clientid: ${clientid}}">>),
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(Client),
emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 0),
Fun = fun() ->
receive {publish, #{topic := <<"t2">>, payload := _}} ->
received_t2
after 500 ->
received_nothing
end
end,
received_nothing = Fun(),
{ok, _, _} = emqtt:subscribe(Client, <<"repub/to/connected">>, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
{ok, _} = emqtt:connect(Client1),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"repub/to/connected">>, T),
?assertEqual(<<"{clientid: c_emqx1}">>, Payload)
after 1000 ->
ct:fail(wait_for_t2)
end,
%% it should not keep republishing "t2"
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
received_nothing = Fun(),
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
receive {publish, #{topic := <<"t2">>, payload := _}} ->
ct:fail(unexpected_t2)
after 1000 ->
ok
end,
emqtt:stop(Client),
emqtt:stop(Client), emqtt:stop(Client1),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlselect_2_2(_Config) ->
ok = emqx_rule_engine:load_providers(),
%% recursively republish to t2, if the msg acked
TopicRule = create_simple_repub_rule(
<<"t2">>,
"SELECT * "
"FROM \"$events/message_acked\" "),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 1),
emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 1),
Fun = fun() ->
receive {publish, #{topic := <<"t2">>, payload := _}} ->
received_t2
after 500 ->
received_nothing
end
end,
received_t2 = Fun(),
received_t2 = Fun(),
received_nothing = Fun(),
%% FROM $events/client_disconnected
t_sqlparse_event_client_disconnected(_Config) ->
%% TODO
ok.
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
%% FROM $events/session_subscribed
t_sqlparse_event_session_subscribed(_Config) ->
Sql = "select topic as tp "
"from \"$events/session_subscribed\" ",
?assertMatch({ok,#{<<"tp">> := <<"t/tt">>}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"topic">> => <<"t/tt">>}})).
t_sqlselect_2_3(_Config) ->
%% FROM $events/session_unsubscribed
t_sqlparse_event_session_unsubscribed(_Config) ->
%% TODO
ok.
%% FROM $events/message_delivered
t_sqlparse_event_message_delivered(_Config) ->
ok = emqx_rule_engine:load_providers(),
%% recursively republish to t2, if the msg delivered
TopicRule = create_simple_repub_rule(
@ -507,65 +567,84 @@ t_sqlselect_2_3(_Config) ->
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
t_sqlselect_3(_Config) ->
%% FROM $events/message_acked
t_sqlparse_event_message_acked(_Config) ->
ok = emqx_rule_engine:load_providers(),
%% republish the client.connected msg
%% republish to `repub/if/acked`, if the msg acked
TopicRule = create_simple_repub_rule(
<<"t2">>,
<<"repub/if/acked">>,
"SELECT * "
"FROM \"$events/client_connected\" "
"WHERE username = 'emqx1'",
<<"clientid=${clientid}">>),
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
"FROM \"$events/message_acked\" "),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
{ok, _} = emqtt:connect(Client1),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"clientid=c_emqx1">>, Payload)
after 1000 ->
ct:fail(wait_for_t2)
end,
{ok, _, _} = emqtt:subscribe(Client, <<"repub/if/acked">>, ?QOS_1),
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
receive {publish, #{topic := <<"t2">>, payload := _}} ->
ct:fail(unexpected_t2)
after 1000 ->
ok
end,
Fun = fun() ->
receive {publish, #{topic := <<"repub/if/acked">>, payload := _}} ->
received_acked
after 500 ->
received_nothing
end
end,
emqtt:stop(Client), emqtt:stop(Client1),
%% sub with max qos1 to generate ack packet
{ok, _, _} = emqtt:subscribe(Client, <<"any/topic">>, ?QOS_1),
Payload = <<"{\"x\":1,\"y\":144}">>,
%% even sub with qos1, but publish with qos0, no ack
emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_0),
received_nothing = Fun(),
%% sub and pub both are qos1, acked
emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_1),
received_acked = Fun(),
received_nothing = Fun(),
%% pub with qos2 but subscribed with qos1, acked
emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_2),
received_acked = Fun(),
received_nothing = Fun(),
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
%%------------------------------------------------------------------------------
%% Test cases for events
%%------------------------------------------------------------------------------
%% FROM $events/message_dropped
t_sqlparse_event_message_dropped(_Config) ->
ok = emqx_rule_engine:load_providers(),
%% republish to `repub/if/dropped`, if any msg dropped
TopicRule = create_simple_repub_rule(
<<"repub/if/dropped">>,
"SELECT * "
"FROM \"$events/message_dropped\" "),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
t_sqlparse_event_1(_Config) ->
Sql = "select topic as tp "
"from \"$events/session_subscribed\" ",
?assertMatch({ok,#{<<"tp">> := <<"t/tt">>}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"topic">> => <<"t/tt">>}})).
%% this message will be dropped and then repub to `repub/if/dropped`
emqtt:publish(Client, <<"any/topic/1">>, <<"{\"x\":1,\"y\":144}">>, 0),
t_sqlparse_event_2(_Config) ->
Sql = "select clientid "
"from \"$events/client_connected\" ",
?assertMatch({ok,#{<<"clientid">> := <<"abc">>}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"clientid">> => <<"abc">>}})).
Fun_t2 = fun() ->
receive {publish, #{topic := <<"repub/if/dropped">>, payload := _}} ->
received_repub
after 500 ->
received_nothing
end
end,
t_sqlparse_event_3(_Config) ->
Sql = "select clientid, topic as tp "
"from \"t/tt\", \"$events/client_connected\" ",
?assertMatch({ok,#{<<"clientid">> := <<"abc">>, <<"tp">> := <<"t/tt">>}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql,
<<"ctx">> => #{<<"clientid">> => <<"abc">>, <<"topic">> => <<"t/tt">>}})).
%% No client subscribed `any/topic`, triggered repub rule.
%% But havn't sub `repub/if/dropped`, so the repub message will also be dropped with recursively republish.
received_nothing = Fun_t2(),
{ok, _, _} = emqtt:subscribe(Client, <<"repub/if/dropped">>, 0),
%% this message will be dropped and then repub to `repub/to/t`
emqtt:publish(Client, <<"any/topic/2">>, <<"{\"x\":1,\"y\":144}">>, 0),
%% received subscribed `repub/to/t`
received_repub = Fun_t2(),
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
%%------------------------------------------------------------------------------
%% Test cases for `foreach`