diff --git a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl index 33de9b7dd..f4f329e87 100644 --- a/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rulesql_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx_rule_engine/include/rule_engine.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -51,15 +52,16 @@ groups() -> , t_sqlselect_02 , t_sqlselect_1 , t_sqlselect_2 - , t_sqlselect_2_1 - , t_sqlselect_2_2 - , t_sqlselect_2_3 - , t_sqlselect_3 ]}, {rulesql_select_events, [], - [ t_sqlparse_event_1 - , t_sqlparse_event_2 - , t_sqlparse_event_3 + [ t_sqlparse_event_client_connected_01 + , t_sqlparse_event_client_connected_02 + , t_sqlparse_event_client_disconnected + , t_sqlparse_event_session_subscribed + , t_sqlparse_event_session_unsubscribed + , t_sqlparse_event_message_delivered + , t_sqlparse_event_message_acked + , t_sqlparse_event_message_dropped ]}, {rulesql_select_metadata, [], [ t_sqlparse_select_matadata_1 @@ -159,16 +161,35 @@ end_per_testcase(_TestCase, _Config) -> t_sqlselect_0(_Config) -> %% Verify SELECT with and without 'AS' - Sql = "select * " - "from \"t/#\" " - "where payload.cmd.info = 'tt'", - ?assertMatch({ok,#{payload := <<"{\"cmd\": {\"info\":\"tt\"}}">>}}, - emqx_rule_sqltester:test( - #{<<"rawsql">> => Sql, - <<"ctx">> => - #{<<"payload">> => - <<"{\"cmd\": {\"info\":\"tt\"}}">>, - <<"topic">> => <<"t/a">>}})), + + Sql1 = "SELECT * " + "FROM \"t/#\" " + "WHERE payload.cmd.info = 'tt'", + %% all available fields by `select` from message publish, selecte other key will be `undefined` + Topic = <<"t/a">>, + Payload = <<"{\"cmd\": {\"info\":\"tt\"}}">>, + {ok, #{username := <<"u_emqx">>, + topic := Topic, + timestamp := TimeStamp, + qos := 1, + publish_received_at := TimeStamp, + peerhost := <<"127.0.0.1">>, + payload := Payload, + node := 'test@127.0.0.1', + metadata := #{rule_id := TestRuleId}, + id := MsgId, + flags := #{sys := true, event := true}, + clientid := <<"c_emqx">> + } + } = emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql1, + <<"ctx">> => + #{<<"payload">> => Payload, + <<"topic">> => Topic}}), + ?assert(is_binary(TestRuleId)), + ?assert(is_binary(MsgId)), + ?assert(is_integer(TimeStamp)), + Sql2 = "select payload.cmd as cmd " "from \"t/#\" " "where cmd.info = 'tt'", @@ -179,6 +200,7 @@ t_sqlselect_0(_Config) -> #{<<"payload">> => <<"{\"cmd\": {\"info\":\"tt\"}}">>, <<"topic">> => <<"t/a">>}})), + Sql3 = "select payload.cmd as cmd, cmd.info as info " "from \"t/#\" " "where cmd.info = 'tt' and info = 'tt'", @@ -205,14 +227,15 @@ t_sqlselect_0(_Config) -> t_sqlselect_00(_Config) -> %% Verify plus/subtract and unary_add_or_subtract - Sql = "select 1-1 as a " + Sql0 = "select 1 - 1 as a " "from \"t/#\" ", ?assertMatch({ok,#{<<"a">> := 0}}, emqx_rule_sqltester:test( - #{<<"rawsql">> => Sql, + #{<<"rawsql">> => Sql0, <<"ctx">> => #{<<"payload">> => <<"">>, <<"topic">> => <<"t/a">>}})), + Sql1 = "select -1 + 1 as a " "from \"t/#\" ", ?assertMatch({ok,#{<<"a">> := 0}}, @@ -221,6 +244,7 @@ t_sqlselect_00(_Config) -> <<"ctx">> => #{<<"payload">> => <<"">>, <<"topic">> => <<"t/a">>}})), + Sql2 = "select 1 + 1 as a " "from \"t/#\" ", ?assertMatch({ok,#{<<"a">> := 2}}, @@ -229,6 +253,7 @@ t_sqlselect_00(_Config) -> <<"ctx">> => #{<<"payload">> => <<"">>, <<"topic">> => <<"t/a">>}})), + Sql3 = "select +1 as a " "from \"t/#\" ", ?assertMatch({ok,#{<<"a">> := 1}}, @@ -431,58 +456,93 @@ t_sqlselect_2(_Config) -> emqtt:stop(Client), emqx_rule_registry:remove_rule(TopicRule). -t_sqlselect_2_1(_Config) -> +%%------------------------------------------------------------------------------ +%% Test cases for events +%%------------------------------------------------------------------------------ + +%% FROM $events/client_connected +t_sqlparse_event_client_connected_01(_Config) -> + Sql = "select *" + "from \"$events/client_connected\" ", + + %% all available fields by `select` from message publish, selecte other key will be `undefined` + {ok, #{clientid := <<"c_emqx">>, + username := <<"u_emqx">>, + timestamp := TimeStamp, + connected_at := TimeStamp, + peername := <<"127.0.0.1:12345">>, + metadata := #{rule_id := RuleId}, + %% default value + node := 'test@127.0.0.1', + sockname := <<"0.0.0.0:1883">>, + proto_name := <<"MQTT">>, + proto_ver := 5, + mountpoint := undefined, + keepalive := 60, + is_bridge := false, + expiry_interval := 3600, + event := 'client.connected', + clean_start := true + } + } = emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql, + <<"ctx">> => #{<<"clientid">> => <<"c_emqx">>, <<"peername">> => <<"127.0.0.1:12345">>, <<"username">> => <<"u_emqx">>}}), + ?assert(is_binary(RuleId)), + ?assert(is_integer(TimeStamp)). + +t_sqlparse_event_client_connected_02(_Config) -> ok = emqx_rule_engine:load_providers(), - %% recursively republish to t2, if the msg dropped + %% republish the client.connected msg TopicRule = create_simple_repub_rule( - <<"t2">>, + <<"repub/to/connected">>, "SELECT * " - "FROM \"$events/message_dropped\" "), - {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + "FROM \"$events/client_connected\" " + "WHERE username = 'emqx1'", + <<"{clientid: ${clientid}}">>), + {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), {ok, _} = emqtt:connect(Client), - emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 0), - Fun = fun() -> - receive {publish, #{topic := <<"t2">>, payload := _}} -> - received_t2 - after 500 -> - received_nothing - end - end, - received_nothing = Fun(), + {ok, _, _} = emqtt:subscribe(Client, <<"repub/to/connected">>, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]), + {ok, _} = emqtt:connect(Client1), + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(<<"repub/to/connected">>, T), + ?assertEqual(<<"{clientid: c_emqx1}">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, - %% it should not keep republishing "t2" - {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - received_nothing = Fun(), + emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0), + receive {publish, #{topic := <<"t2">>, payload := _}} -> + ct:fail(unexpected_t2) + after 1000 -> + ok + end, - emqtt:stop(Client), + emqtt:stop(Client), emqtt:stop(Client1), emqx_rule_registry:remove_rule(TopicRule). -t_sqlselect_2_2(_Config) -> - ok = emqx_rule_engine:load_providers(), - %% recursively republish to t2, if the msg acked - TopicRule = create_simple_repub_rule( - <<"t2">>, - "SELECT * " - "FROM \"$events/message_acked\" "), - {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), - {ok, _} = emqtt:connect(Client), - {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 1), - emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 1), - Fun = fun() -> - receive {publish, #{topic := <<"t2">>, payload := _}} -> - received_t2 - after 500 -> - received_nothing - end - end, - received_t2 = Fun(), - received_t2 = Fun(), - received_nothing = Fun(), +%% FROM $events/client_disconnected +t_sqlparse_event_client_disconnected(_Config) -> + %% TODO + ok. - emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule). +%% FROM $events/session_subscribed +t_sqlparse_event_session_subscribed(_Config) -> + Sql = "select topic as tp " + "from \"$events/session_subscribed\" ", + ?assertMatch({ok,#{<<"tp">> := <<"t/tt">>}}, + emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql, + <<"ctx">> => #{<<"topic">> => <<"t/tt">>}})). -t_sqlselect_2_3(_Config) -> +%% FROM $events/session_unsubscribed +t_sqlparse_event_session_unsubscribed(_Config) -> + %% TODO + ok. + +%% FROM $events/message_delivered +t_sqlparse_event_message_delivered(_Config) -> ok = emqx_rule_engine:load_providers(), %% recursively republish to t2, if the msg delivered TopicRule = create_simple_repub_rule( @@ -507,65 +567,84 @@ t_sqlselect_2_3(_Config) -> emqtt:stop(Client), emqx_rule_registry:remove_rule(TopicRule). -t_sqlselect_3(_Config) -> +%% FROM $events/message_acked +t_sqlparse_event_message_acked(_Config) -> ok = emqx_rule_engine:load_providers(), - %% republish the client.connected msg + %% republish to `repub/if/acked`, if the msg acked TopicRule = create_simple_repub_rule( - <<"t2">>, + <<"repub/if/acked">>, "SELECT * " - "FROM \"$events/client_connected\" " - "WHERE username = 'emqx1'", - <<"clientid=${clientid}">>), - {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), + "FROM \"$events/message_acked\" "), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, _} = emqtt:connect(Client), - {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - ct:sleep(200), - {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]), - {ok, _} = emqtt:connect(Client1), - receive {publish, #{topic := T, payload := Payload}} -> - ?assertEqual(<<"t2">>, T), - ?assertEqual(<<"clientid=c_emqx1">>, Payload) - after 1000 -> - ct:fail(wait_for_t2) - end, + {ok, _, _} = emqtt:subscribe(Client, <<"repub/if/acked">>, ?QOS_1), - emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0), - receive {publish, #{topic := <<"t2">>, payload := _}} -> - ct:fail(unexpected_t2) - after 1000 -> - ok - end, + Fun = fun() -> + receive {publish, #{topic := <<"repub/if/acked">>, payload := _}} -> + received_acked + after 500 -> + received_nothing + end + end, - emqtt:stop(Client), emqtt:stop(Client1), + %% sub with max qos1 to generate ack packet + {ok, _, _} = emqtt:subscribe(Client, <<"any/topic">>, ?QOS_1), + + Payload = <<"{\"x\":1,\"y\":144}">>, + + %% even sub with qos1, but publish with qos0, no ack + emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_0), + received_nothing = Fun(), + + %% sub and pub both are qos1, acked + emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_1), + received_acked = Fun(), + received_nothing = Fun(), + + %% pub with qos2 but subscribed with qos1, acked + emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_2), + received_acked = Fun(), + received_nothing = Fun(), + + emqtt:stop(Client), emqx_rule_registry:remove_rule(TopicRule). -%%------------------------------------------------------------------------------ -%% Test cases for events -%%------------------------------------------------------------------------------ +%% FROM $events/message_dropped +t_sqlparse_event_message_dropped(_Config) -> + ok = emqx_rule_engine:load_providers(), + %% republish to `repub/if/dropped`, if any msg dropped + TopicRule = create_simple_repub_rule( + <<"repub/if/dropped">>, + "SELECT * " + "FROM \"$events/message_dropped\" "), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client), -t_sqlparse_event_1(_Config) -> - Sql = "select topic as tp " - "from \"$events/session_subscribed\" ", - ?assertMatch({ok,#{<<"tp">> := <<"t/tt">>}}, - emqx_rule_sqltester:test( - #{<<"rawsql">> => Sql, - <<"ctx">> => #{<<"topic">> => <<"t/tt">>}})). + %% this message will be dropped and then repub to `repub/if/dropped` + emqtt:publish(Client, <<"any/topic/1">>, <<"{\"x\":1,\"y\":144}">>, 0), -t_sqlparse_event_2(_Config) -> - Sql = "select clientid " - "from \"$events/client_connected\" ", - ?assertMatch({ok,#{<<"clientid">> := <<"abc">>}}, - emqx_rule_sqltester:test( - #{<<"rawsql">> => Sql, - <<"ctx">> => #{<<"clientid">> => <<"abc">>}})). + Fun_t2 = fun() -> + receive {publish, #{topic := <<"repub/if/dropped">>, payload := _}} -> + received_repub + after 500 -> + received_nothing + end + end, -t_sqlparse_event_3(_Config) -> - Sql = "select clientid, topic as tp " - "from \"t/tt\", \"$events/client_connected\" ", - ?assertMatch({ok,#{<<"clientid">> := <<"abc">>, <<"tp">> := <<"t/tt">>}}, - emqx_rule_sqltester:test( - #{<<"rawsql">> => Sql, - <<"ctx">> => #{<<"clientid">> => <<"abc">>, <<"topic">> => <<"t/tt">>}})). + %% No client subscribed `any/topic`, triggered repub rule. + %% But havn't sub `repub/if/dropped`, so the repub message will also be dropped with recursively republish. + received_nothing = Fun_t2(), + + {ok, _, _} = emqtt:subscribe(Client, <<"repub/if/dropped">>, 0), + + %% this message will be dropped and then repub to `repub/to/t` + emqtt:publish(Client, <<"any/topic/2">>, <<"{\"x\":1,\"y\":144}">>, 0), + + %% received subscribed `repub/to/t` + received_repub = Fun_t2(), + + emqtt:stop(Client), + emqx_rule_registry:remove_rule(TopicRule). %%------------------------------------------------------------------------------ %% Test cases for `foreach`