1644 lines
62 KiB
Erlang
1644 lines
62 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rulesql_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
-include("emqx_rule_test.hrl").
|
|
|
|
all() ->
|
|
[ {group, rulesql_select}
|
|
, {group, rulesql_select_events}
|
|
, {group, rulesql_select_metadata}
|
|
, {group, rulesql_foreach}
|
|
, {group, rulesql_case_when}
|
|
, {group, rulesql_array_index}
|
|
, {group, rulesql_array_range}
|
|
, {group, rulesql_compare}
|
|
, {group, rulesql_boolean}
|
|
, {group, rulesql_others}
|
|
].
|
|
|
|
suite() ->
|
|
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
|
|
|
|
groups() ->
|
|
[{rulesql_select, [],
|
|
[ t_sqlselect_0
|
|
, t_sqlselect_00
|
|
, t_sqlselect_01
|
|
, t_sqlselect_02
|
|
, t_sqlselect_1
|
|
, t_sqlselect_2
|
|
]},
|
|
{rulesql_select_events, [],
|
|
[ t_sqlparse_event_client_connected_01
|
|
, t_sqlparse_event_client_connected_02
|
|
, t_sqlparse_event_client_disconnected_normal
|
|
, t_sqlparse_event_client_disconnected_kicked
|
|
, t_sqlparse_event_client_disconnected_discarded
|
|
, t_sqlparse_event_client_disconnected_takeovered
|
|
, t_sqlparse_event_session_subscribed
|
|
, t_sqlparse_event_session_unsubscribed
|
|
, t_sqlparse_event_message_delivered
|
|
, t_sqlparse_event_message_acked
|
|
, t_sqlparse_event_message_dropped
|
|
]},
|
|
{rulesql_select_metadata, [],
|
|
[ t_sqlparse_select_matadata_1
|
|
]},
|
|
{rulesql_foreach, [],
|
|
[ t_sqlparse_foreach_1
|
|
, t_sqlparse_foreach_2
|
|
, t_sqlparse_foreach_3
|
|
, t_sqlparse_foreach_4
|
|
, t_sqlparse_foreach_5
|
|
, t_sqlparse_foreach_6
|
|
, t_sqlparse_foreach_7
|
|
, t_sqlparse_foreach_8
|
|
]},
|
|
{rulesql_case_when, [],
|
|
[ t_sqlparse_case_when_1
|
|
, t_sqlparse_case_when_2
|
|
, t_sqlparse_case_when_3
|
|
]},
|
|
{rulesql_array_index, [],
|
|
[ t_sqlparse_array_index_1
|
|
, t_sqlparse_array_index_2
|
|
, t_sqlparse_array_index_3
|
|
, t_sqlparse_array_index_4
|
|
, t_sqlparse_array_index_5
|
|
]},
|
|
{rulesql_array_range, [],
|
|
[ t_sqlparse_array_range_1
|
|
, t_sqlparse_array_range_2
|
|
]},
|
|
{rulesql_compare, [],
|
|
[ t_sqlparse_compare_undefined
|
|
, t_sqlparse_compare_null_null
|
|
, t_sqlparse_compare_null_notnull
|
|
, t_sqlparse_compare_notnull_null
|
|
, t_sqlparse_compare
|
|
]},
|
|
{rulesql_boolean, [],
|
|
[ t_sqlparse_true_false
|
|
]},
|
|
{rulesql_others, [],
|
|
[ t_sqlparse_new_map
|
|
, t_sqlparse_invalid_json
|
|
]}
|
|
].
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Overall setup/teardown
|
|
%%------------------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
ok = ekka_mnesia:start(),
|
|
ok = emqx_rule_registry:mnesia(boot),
|
|
start_apps(),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
stop_apps(),
|
|
ok.
|
|
|
|
on_resource_create(_id, _) -> #{}.
|
|
on_resource_destroy(_id, _) -> ok.
|
|
on_get_resource_status(_id, _) -> #{is_alive => true}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Group specific setup/teardown
|
|
%%------------------------------------------------------------------------------
|
|
|
|
group(_Groupname) ->
|
|
[].
|
|
|
|
init_per_group(registry, Config) ->
|
|
Config;
|
|
init_per_group(_Groupname, Config) ->
|
|
Config.
|
|
|
|
end_per_group(_Groupname, _Config) ->
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Testcase specific setup/teardown
|
|
%%------------------------------------------------------------------------------
|
|
|
|
init_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
|
|
application:set_env(emqx, client_disconnect_discarded, true),
|
|
Config;
|
|
init_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
|
|
application:set_env(emqx, client_disconnect_takeovered, true),
|
|
Config;
|
|
init_per_testcase(_TestCase, Config) ->
|
|
init_events_counters(),
|
|
ok = emqx_rule_registry:register_resource_types(
|
|
[make_simple_debug_resource_type()]),
|
|
%ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]),
|
|
Config.
|
|
|
|
end_per_testcase(t_sqlparse_event_client_disconnected_takeovered, Config) ->
|
|
application:set_env(emqx, client_disconnect_takeovered, false), %% back to default
|
|
Config;
|
|
end_per_testcase(t_sqlparse_event_client_disconnected_discarded, Config) ->
|
|
application:set_env(emqx, client_disconnect_discarded, false), %% back to default
|
|
Config;
|
|
end_per_testcase(_TestCase, _Config) ->
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for `select`
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_sqlselect_0(_Config) ->
|
|
%% Verify SELECT with and without 'AS'
|
|
|
|
Sql1 = "SELECT * "
|
|
"FROM \"t/#\" "
|
|
"WHERE payload.cmd.info = 'tt'",
|
|
%% all available fields by `select` from message publish, selecte other key will be `undefined`
|
|
Topic = <<"t/a">>,
|
|
Payload = <<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
{ok, #{username := <<"u_emqx">>,
|
|
topic := Topic,
|
|
timestamp := TimeStamp,
|
|
qos := 1,
|
|
publish_received_at := TimeStamp,
|
|
peerhost := <<"127.0.0.1">>,
|
|
payload := Payload,
|
|
node := 'test@127.0.0.1',
|
|
metadata := #{rule_id := TestRuleId},
|
|
id := MsgId,
|
|
flags := #{sys := true, event := true},
|
|
clientid := <<"c_emqx">>
|
|
}
|
|
} = emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql1,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => Payload,
|
|
<<"topic">> => Topic}}),
|
|
?assert(is_binary(TestRuleId)),
|
|
?assert(is_binary(MsgId)),
|
|
?assert(is_integer(TimeStamp)),
|
|
|
|
Sql2 = "select payload.cmd as cmd "
|
|
"from \"t/#\" "
|
|
"where cmd.info = 'tt'",
|
|
?assertMatch({ok,#{<<"cmd">> := #{<<"info">> := <<"tt">>}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
Sql3 = "select payload.cmd as cmd, cmd.info as info "
|
|
"from \"t/#\" "
|
|
"where cmd.info = 'tt' and info = 'tt'",
|
|
?assertMatch({ok,#{<<"cmd">> := #{<<"info">> := <<"tt">>},
|
|
<<"info">> := <<"tt">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql3,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% cascaded as
|
|
Sql4 = "select payload.cmd as cmd, cmd.info as meta.info "
|
|
"from \"t/#\" "
|
|
"where cmd.info = 'tt' and meta.info = 'tt'",
|
|
?assertMatch({ok,#{<<"cmd">> := #{<<"info">> := <<"tt">>},
|
|
<<"meta">> := #{<<"info">> := <<"tt">>}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql4,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlselect_00(_Config) ->
|
|
%% Verify plus/subtract and unary_add_or_subtract
|
|
Sql0 = "select 1 - 1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 0}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql0,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
Sql1 = "select -1 + 1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 0}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql1,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
Sql2 = "select 1 + 1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 2}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
Sql3 = "select +1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 1}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql3,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
Sql4 = "select payload.msg1 + payload.msg2 as msg "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"msg">> := <<"hello world">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql4,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"msg1\": \"hello\", \"msg2\": \" world\"}">>,
|
|
<<"topic">> => <<"t/1">>}})),
|
|
|
|
Sql5 = "select payload.msg1 + payload.msg2 as msg "
|
|
"from \"t/#\" ",
|
|
?assertMatch({error, {select_and_transform_error, {error, unsupported_type_implicit_conversion, _ST}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql5,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"msg1\": \"hello\", \"msg2\": 1}">>,
|
|
<<"topic">> => <<"t/1">>}})).
|
|
|
|
%% Verify SELECT with and with 'WHERE'
|
|
t_sqlselect_01(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
TopicRule1 = create_simple_repub_rule(
|
|
<<"t2">>,
|
|
"SELECT json_decode(payload) as p, payload "
|
|
"FROM \"t3/#\", \"t1\" "
|
|
"WHERE p.x = 1"),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
|
ct:sleep(100),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
ct:fail(unexpected_t2)
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
|
|
receive {publish, #{topic := T3, payload := Payload3}} ->
|
|
?assertEqual(<<"t2">>, T3),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload3)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule1).
|
|
|
|
t_sqlselect_02(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
TopicRule1 = create_simple_repub_rule(
|
|
<<"t2">>,
|
|
"SELECT * "
|
|
"FROM \"t3/#\", \"t1\" "
|
|
"WHERE payload.x = 1"),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
|
ct:sleep(100),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := Payload0}} ->
|
|
ct:fail({unexpected_t2, Payload0})
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
|
|
receive {publish, #{topic := T3, payload := Payload3}} ->
|
|
?assertEqual(<<"t2">>, T3),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload3)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule1).
|
|
|
|
t_sqlselect_03(_Config) ->
|
|
%% Verify SELECT with and with 'WHERE' and `+` `=` and `or` condition in 'WHERE' clause
|
|
ok = emqx_rule_engine:load_providers(),
|
|
TopicRule1 = create_simple_repub_rule(
|
|
<<"t2">>,
|
|
"SELECT * "
|
|
"FROM \"t3/#\", \"t1\" "
|
|
"WHERE payload.x + payload.y = 2 or payload.x + payload.y = \"11\""),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1, \"y\":1}">>, 0),
|
|
ct:sleep(100),
|
|
receive {publish, #{topic := T1, payload := Payload0}} ->
|
|
?assertEqual(<<"t2">>, T1),
|
|
?assertEqual(<<"{\"x\":1, \"y\":1}">>, Payload0)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
receive {publish, #{topic := T2, payload := Payload1}} ->
|
|
?assertEqual(<<"t2">>, T2),
|
|
?assertEqual(<<"{\"x\":\"1\", \"y\":\"1\"}">>, Payload1)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1, \"y\":2}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := Payload2}} ->
|
|
ct:fail({unexpected_t2, Payload2})
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1, \"y\":1}">>, 0),
|
|
receive {publish, #{topic := T3, payload := Payload3}} ->
|
|
?assertEqual(<<"t2">>, T3),
|
|
?assertEqual(<<"{\"x\":1, \"y\":1}">>, Payload3)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule1).
|
|
|
|
t_sqlselect_1(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
TopicRule = create_simple_repub_rule(
|
|
<<"t2">>,
|
|
"SELECT json_decode(payload) as p, payload "
|
|
"FROM \"t1\" "
|
|
"WHERE p.x = 1 and p.y = 2"),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
ct:sleep(200),
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
ct:fail(unexpected_t2)
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
t_sqlselect_2(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
%% recursively republish to t2
|
|
TopicRule = create_simple_repub_rule(
|
|
<<"t2">>,
|
|
"SELECT * "
|
|
"FROM \"t2\" "),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
|
|
emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 0),
|
|
Fun = fun() ->
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
received_t2
|
|
after 500 ->
|
|
received_nothing
|
|
end
|
|
end,
|
|
received_t2 = Fun(),
|
|
received_t2 = Fun(),
|
|
received_nothing = Fun(),
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for events
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% FROM $events/client_connected
|
|
t_sqlparse_event_client_connected_01(_Config) ->
|
|
Sql = "select *"
|
|
"from \"$events/client_connected\" ",
|
|
|
|
%% all available fields by `select` from message publish, selecte other key will be `undefined`
|
|
{ok, #{clientid := <<"c_emqx">>,
|
|
username := <<"u_emqx">>,
|
|
timestamp := TimeStamp,
|
|
connected_at := TimeStamp,
|
|
peername := <<"127.0.0.1:12345">>,
|
|
metadata := #{rule_id := RuleId},
|
|
%% default value
|
|
node := 'test@127.0.0.1',
|
|
sockname := <<"0.0.0.0:1883">>,
|
|
proto_name := <<"MQTT">>,
|
|
proto_ver := 5,
|
|
mountpoint := undefined,
|
|
keepalive := 60,
|
|
is_bridge := false,
|
|
expiry_interval := 3600,
|
|
event := 'client.connected',
|
|
clean_start := true
|
|
}
|
|
} = emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"clientid">> => <<"c_emqx">>, <<"peername">> => <<"127.0.0.1:12345">>, <<"username">> => <<"u_emqx">>}}),
|
|
?assert(is_binary(RuleId)),
|
|
?assert(is_integer(TimeStamp)).
|
|
|
|
t_sqlparse_event_client_connected_02(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
%% republish the client.connected msg
|
|
TopicRule = create_simple_repub_rule(
|
|
<<"repub/to/connected">>,
|
|
"SELECT * "
|
|
"FROM \"$events/client_connected\" "
|
|
"WHERE username = 'emqx1'",
|
|
<<"{clientid: ${clientid}}">>),
|
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"repub/to/connected">>, 0),
|
|
ct:sleep(200),
|
|
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
|
|
{ok, _} = emqtt:connect(Client1),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"repub/to/connected">>, T),
|
|
?assertEqual(<<"{clientid: c_emqx1}">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
ct:fail(unexpected_t2)
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:stop(Client), emqtt:stop(Client1),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
%% FROM $events/client_disconnected
|
|
t_sqlparse_event_client_disconnected_normal(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
Sql = "select * "
|
|
"from \"$events/client_disconnected\" ",
|
|
RepubT = <<"repub/to/disconnected/normal">>,
|
|
|
|
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
|
|
|
{ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
|
ct:sleep(200),
|
|
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client1),
|
|
emqtt:disconnect(Client1),
|
|
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(RepubT, T),
|
|
?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
|
|
after 1000 ->
|
|
ct:fail(wait_for_repub_disconnected_normal)
|
|
end,
|
|
emqtt:stop(Client),
|
|
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
t_sqlparse_event_client_disconnected_kicked(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
Sql = "select * "
|
|
"from \"$events/client_disconnected\" ",
|
|
RepubT = <<"repub/to/disconnected/kicked">>,
|
|
|
|
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
|
|
|
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
|
{ok, _} = emqtt:connect(ClientRecvRepub),
|
|
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
|
ct:sleep(200),
|
|
|
|
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client1),
|
|
emqx_cm:kick_session(<<"emqx">>),
|
|
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
|
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(RepubT, T),
|
|
?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps]))
|
|
after 1000 ->
|
|
ct:fail(wait_for_repub_disconnected_kicked)
|
|
end,
|
|
emqtt:stop(ClientRecvRepub),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
t_sqlparse_event_client_disconnected_discarded(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
Sql = "select * "
|
|
"from \"$events/client_disconnected\" ",
|
|
RepubT = <<"repub/to/disconnected/discarded">>,
|
|
|
|
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
|
|
|
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
|
{ok, _} = emqtt:connect(ClientRecvRepub),
|
|
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
|
ct:sleep(200),
|
|
|
|
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client1),
|
|
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
|
|
|
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}]),
|
|
{ok, _} = emqtt:connect(Client2),
|
|
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(RepubT, T),
|
|
?assertMatch(#{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps]))
|
|
after 1000 ->
|
|
ct:fail(wait_for_repub_disconnected_discarded)
|
|
end,
|
|
|
|
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
t_sqlparse_event_client_disconnected_takeovered(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
Sql = "select * "
|
|
"from \"$events/client_disconnected\" ",
|
|
RepubT = <<"repub/to/disconnected/takeovered">>,
|
|
|
|
TopicRule = create_simple_repub_rule(RepubT, Sql, <<>>),
|
|
|
|
{ok, ClientRecvRepub} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
|
{ok, _} = emqtt:connect(ClientRecvRepub),
|
|
{ok, _, _} = emqtt:subscribe(ClientRecvRepub, RepubT, 0),
|
|
ct:sleep(200),
|
|
|
|
{ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client1),
|
|
unlink(Client1), %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
|
|
|
{ok, Client2} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}]),
|
|
{ok, _} = emqtt:connect(Client2),
|
|
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(RepubT, T),
|
|
?assertMatch(#{<<"reason">> := <<"takeovered">>}, emqx_json:decode(Payload, [return_maps]))
|
|
after 1000 ->
|
|
ct:fail(wait_for_repub_disconnected_discarded)
|
|
end,
|
|
|
|
emqtt:stop(ClientRecvRepub), emqtt:stop(Client2),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
%% FROM $events/session_subscribed
|
|
t_sqlparse_event_session_subscribed(_Config) ->
|
|
Sql = "select topic as tp "
|
|
"from \"$events/session_subscribed\" ",
|
|
?assertMatch({ok,#{<<"tp">> := <<"t/tt">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"topic">> => <<"t/tt">>}})).
|
|
|
|
%% FROM $events/session_unsubscribed
|
|
t_sqlparse_event_session_unsubscribed(_Config) ->
|
|
%% TODO
|
|
ok.
|
|
|
|
%% FROM $events/message_delivered
|
|
t_sqlparse_event_message_delivered(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
%% recursively republish to t2, if the msg delivered
|
|
TopicRule = create_simple_repub_rule(
|
|
<<"t2">>,
|
|
"SELECT * "
|
|
"FROM \"$events/message_delivered\" "),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 0),
|
|
Fun = fun() ->
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
received_t2
|
|
after 500 ->
|
|
received_nothing
|
|
end
|
|
end,
|
|
received_t2 = Fun(),
|
|
received_t2 = Fun(),
|
|
received_nothing = Fun(),
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
%% FROM $events/message_acked
|
|
t_sqlparse_event_message_acked(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
%% republish to `repub/if/acked`, if the msg acked
|
|
TopicRule = create_simple_repub_rule(
|
|
<<"repub/if/acked">>,
|
|
"SELECT * "
|
|
"FROM \"$events/message_acked\" "),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"repub/if/acked">>, ?QOS_1),
|
|
|
|
Fun = fun() ->
|
|
receive {publish, #{topic := <<"repub/if/acked">>, payload := _}} ->
|
|
received_acked
|
|
after 500 ->
|
|
received_nothing
|
|
end
|
|
end,
|
|
|
|
%% sub with max qos1 to generate ack packet
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"any/topic">>, ?QOS_1),
|
|
|
|
Payload = <<"{\"x\":1,\"y\":144}">>,
|
|
|
|
%% even sub with qos1, but publish with qos0, no ack
|
|
emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_0),
|
|
received_nothing = Fun(),
|
|
|
|
%% sub and pub both are qos1, acked
|
|
emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_1),
|
|
received_acked = Fun(),
|
|
received_nothing = Fun(),
|
|
|
|
%% pub with qos2 but subscribed with qos1, acked
|
|
emqtt:publish(Client, <<"any/topic">>, Payload, ?QOS_2),
|
|
received_acked = Fun(),
|
|
received_nothing = Fun(),
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
%% FROM $events/message_dropped
|
|
t_sqlparse_event_message_dropped(_Config) ->
|
|
ok = emqx_rule_engine:load_providers(),
|
|
%% republish to `repub/if/dropped`, if any msg dropped
|
|
TopicRule = create_simple_repub_rule(
|
|
<<"repub/if/dropped">>,
|
|
"SELECT * "
|
|
"FROM \"$events/message_dropped\" "),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
|
|
%% this message will be dropped and then repub to `repub/if/dropped`
|
|
emqtt:publish(Client, <<"any/topic/1">>, <<"{\"x\":1,\"y\":144}">>, 0),
|
|
|
|
Fun_t2 = fun() ->
|
|
receive {publish, #{topic := <<"repub/if/dropped">>, payload := _}} ->
|
|
received_repub
|
|
after 500 ->
|
|
received_nothing
|
|
end
|
|
end,
|
|
|
|
%% No client subscribed `any/topic`, triggered repub rule.
|
|
%% But havn't sub `repub/if/dropped`, so the repub message will also be dropped with recursively republish.
|
|
received_nothing = Fun_t2(),
|
|
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"repub/if/dropped">>, 0),
|
|
|
|
%% this message will be dropped and then repub to `repub/to/t`
|
|
emqtt:publish(Client, <<"any/topic/2">>, <<"{\"x\":1,\"y\":144}">>, 0),
|
|
|
|
%% received subscribed `repub/to/t`
|
|
received_repub = Fun_t2(),
|
|
|
|
emqtt:stop(Client),
|
|
emqx_rule_registry:remove_rule(TopicRule).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for `foreach`
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_sqlparse_foreach_1(_Config) ->
|
|
%% Verify foreach with and without 'AS'
|
|
Sql = "foreach payload.sensors as s "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"s">> := 1}, #{<<"s">> := 2}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"sensors\": [1, 2]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{item := 1}, #{item := 2}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"sensors\": [1, 2]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql3 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{item := #{<<"cmd">> := <<"1">>}, clientid := <<"c_a">>},
|
|
#{item := #{ <<"cmd">> := <<"2">>
|
|
, <<"name">> := <<"ct">>
|
|
}
|
|
, clientid := <<"c_a">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql3,
|
|
<<"ctx">> =>
|
|
#{ <<"payload">> => <<"{\"sensors\": [{\"cmd\":\"1\"}, "
|
|
"{\"cmd\":\"2\",\"name\":\"ct\"}]}">>
|
|
, <<"clientid">> => <<"c_a">>
|
|
, <<"topic">> => <<"t/a">>
|
|
}})),
|
|
Sql4 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
{ok,[#{metadata := #{rule_id := TRuleId}},
|
|
#{metadata := #{rule_id := TRuleId}}]} =
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql4,
|
|
<<"ctx">> => #{
|
|
<<"payload">> => <<"{\"sensors\": [1, 2]}">>,
|
|
<<"topic">> => <<"t/a">>}}),
|
|
?assert(is_binary(TRuleId)).
|
|
|
|
t_sqlparse_foreach_2(_Config) ->
|
|
%% Verify foreach-do with and without 'AS'
|
|
Sql = "foreach payload.sensors as s "
|
|
"do s.cmd as msg_type "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"msg_type">> := <<"1">>},#{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"do item.cmd as msg_type "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"msg_type">> := <<"1">>},#{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql3 = "foreach payload.sensors "
|
|
"do item as item "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"item">> := 1},#{<<"item">> := 2}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql3,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [1, 2]}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_3(_Config) ->
|
|
%% Verify foreach-incase with and without 'AS'
|
|
Sql = "foreach payload.sensors as s "
|
|
"incase s.cmd != 1 "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"s">> := #{<<"cmd">> := 2}},
|
|
#{<<"s">> := #{<<"cmd">> := 3}}
|
|
]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [{\"cmd\":1}, {\"cmd\":2}, {\"cmd\":3}]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"incase item.cmd != 1 "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{item := #{<<"cmd">> := 2}},
|
|
#{item := #{<<"cmd">> := 3}}
|
|
]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [{\"cmd\":1}, {\"cmd\":2}, {\"cmd\":3}]}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_4(_Config) ->
|
|
%% Verify foreach-do-incase
|
|
Sql = "foreach payload.sensors as s "
|
|
"do s.cmd as msg_type, s.name as name "
|
|
"incase is_not_null(s.cmd) "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"msg_type">> := <<"1">>},#{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ ok
|
|
, [ #{<<"msg_type">> := <<"1">>, <<"name">> := <<"n1">>}
|
|
, #{<<"msg_type">> := <<"2">>}
|
|
]
|
|
},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\", \"name\":\"n1\"}, "
|
|
"{\"cmd\":\"2\"}, {\"name\":\"n3\"}]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok,[]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"sensors\": [1, 2]}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_5(_Config) ->
|
|
%% Verify foreach on a empty-list or non-list variable
|
|
Sql = "foreach payload.sensors as s "
|
|
"do s.cmd as msg_type, s.name as name "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[]}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"sensors\": 1}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok,[]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"sensors\": []}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[]}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"sensors\": 1}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_6(_Config) ->
|
|
%% Verify foreach on a empty-list or non-list variable
|
|
Sql = "foreach json_decode(payload) "
|
|
"do item.id as zid, timestamp as t "
|
|
"from \"t/#\" ",
|
|
{ok, Res} = emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"[{\"id\": 5},{\"id\": 15}]">>,
|
|
<<"topic">> => <<"t/a">>}}),
|
|
[#{<<"t">> := Ts1, <<"zid">> := Zid1},
|
|
#{<<"t">> := Ts2, <<"zid">> := Zid2}] = Res,
|
|
?assertEqual(true, is_integer(Ts1)),
|
|
?assertEqual(true, is_integer(Ts2)),
|
|
?assert(Zid1 == 5 orelse Zid1 == 15),
|
|
?assert(Zid2 == 5 orelse Zid2 == 15).
|
|
|
|
t_sqlparse_foreach_7(_Config) ->
|
|
%% Verify foreach-do-incase and cascaded AS
|
|
Sql = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"incase is_not_null(info.cmd) "
|
|
"from \"t/#\" "
|
|
"where s.page = '2' ",
|
|
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": "
|
|
"{\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }">>,
|
|
?assertMatch({ ok
|
|
, [ #{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}
|
|
, #{<<"msg_type">> := <<"2">>}
|
|
]
|
|
},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => Payload,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql2 = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"incase is_not_null(info.cmd) "
|
|
"from \"t/#\" "
|
|
"where s.page = '3' ",
|
|
?assertMatch({error, nomatch},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => Payload,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_8(_Config) ->
|
|
%% Verify foreach-do-incase and cascaded AS
|
|
Sql = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"incase is_map(info) "
|
|
"from \"t/#\" "
|
|
"where s.page = '2' ",
|
|
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": "
|
|
"{\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }">>,
|
|
?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => Payload,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
Sql3 = "foreach json_decode(payload) as p, p.sensors as s,"
|
|
" s.collection as c, sublist(2,1,c.info) as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"from \"t/#\" "
|
|
"where s.page = '2' ",
|
|
[?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => SqlN,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => Payload,
|
|
<<"topic">> => <<"t/a">>}}))
|
|
|| SqlN <- [Sql3]].
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for `case..when..`
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_sqlparse_case_when_1(_Config) ->
|
|
%% case-when-else clause
|
|
Sql = "select "
|
|
" case when payload.x < 0 then 0 "
|
|
" when payload.x > 7 then 7 "
|
|
" else payload.x "
|
|
" end as y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"y">> := 1}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 1}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 0}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 0}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 0}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": -1}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 7}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 7}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 7}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 8}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
ok.
|
|
|
|
t_sqlparse_case_when_2(_Config) ->
|
|
% switch clause
|
|
Sql = "select "
|
|
" case payload.x when 1 then 2 "
|
|
" when 2 then 3 "
|
|
" else 4 "
|
|
" end as y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"y">> := 2}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 1}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 3}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 2}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 4}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 4}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 4}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 7}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 4}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 8}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_case_when_3(_Config) ->
|
|
%% case-when clause
|
|
Sql = "select "
|
|
" case when payload.x < 0 then 0 "
|
|
" when payload.x > 7 then 7 "
|
|
" end as y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 1}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 5}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 0}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 0}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": -1}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 7}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 7}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 8}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for array index
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_sqlparse_array_index_1(_Config) ->
|
|
%% index get
|
|
Sql = "select "
|
|
" json_decode(payload) as p, "
|
|
" p[1] as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"a">> := #{<<"x">> := 1}}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"[{\"x\": 1}]">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": 1}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% index get without 'as'
|
|
Sql2 = "select "
|
|
" payload.x[2] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [3]}}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> => #{<<"payload">> => #{<<"x">> => [1,3,4]},
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% index get without 'as' again
|
|
Sql3 = "select "
|
|
" payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch( {ok, #{<<"payload">> := #{<<"x">> := [#{<<"y">> := 3}]}}}
|
|
, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql3,
|
|
<<"ctx">> => #{<<"payload">> => #{<<"x">> => [1,#{y => 3},4]},
|
|
<<"topic">> => <<"t/a">>}})
|
|
),
|
|
|
|
%% index get with 'as'
|
|
Sql4 = "select "
|
|
" payload.x[2].y as b "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := 3}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql4,
|
|
<<"ctx">> => #{<<"payload">> => #{<<"x">> => [1,#{y => 3},4]},
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_index_2(_Config) ->
|
|
%% array get with negative index
|
|
Sql1 = "select "
|
|
" payload.x[-2].y as b "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := 3}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql1,
|
|
<<"ctx">> => #{<<"payload">> => #{<<"x">> => [1,#{y => 3},4]},
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% array append to head or tail of a list:
|
|
Sql2 = "select "
|
|
" payload.x as b, "
|
|
" 1 as c[-0], "
|
|
" 2 as c[-0], "
|
|
" b as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := 0, <<"c">> := [0,1,2]}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> => #{<<"payload">> => #{<<"x">> => 0},
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% construct an empty list:
|
|
Sql3 = "select "
|
|
" [] as c, "
|
|
" 1 as c[-0], "
|
|
" 2 as c[-0], "
|
|
" 0 as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,1,2]}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql3,
|
|
<<"ctx">> => #{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% construct a list:
|
|
Sql4 = "select "
|
|
" [payload.a, \"topic\", 'c'] as c, "
|
|
" 1 as c[-0], "
|
|
" 2 as c[-0], "
|
|
" 0 as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,11,<<"t/a">>,<<"c">>,1,2]}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql4,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"a\":11}">>,
|
|
<<"topic">> => <<"t/a">>
|
|
}})).
|
|
|
|
t_sqlparse_array_index_3(_Config) ->
|
|
%% array with json string payload:
|
|
Sql0 = "select "
|
|
"payload,"
|
|
"payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [1, #{<<"y">> := [1,2]}, 3]}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql0,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% same as above but don't select payload:
|
|
Sql1 = "select "
|
|
"payload.x[2].y as b "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := [1,2]}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql1,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% same as above but add 'as' clause:
|
|
Sql2 = "select "
|
|
"payload.x[2].y as b.c "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := #{<<"c">> := [1,2]}}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_index_4(_Config) ->
|
|
%% array with json string payload:
|
|
Sql0 = "select "
|
|
"0 as payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [#{<<"y">> := 0}]}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql0,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% array with json string payload, and also select payload.x:
|
|
Sql1 = "select "
|
|
"payload.x, "
|
|
"0 as payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [1, #{<<"y">> := 0}, 3]}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql1,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_index_5(_Config) ->
|
|
Sql00 = "select "
|
|
" [1,2,3,4] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql00,
|
|
<<"ctx">> => #{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}}),
|
|
?assert(lists:any(fun({_K, V}) ->
|
|
V =:= [1,2,3,4]
|
|
end, maps:to_list(Res00))).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for rule metadata
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_sqlparse_select_matadata_1(_Config) ->
|
|
%% array with json string payload:
|
|
Sql0 = "select "
|
|
"payload "
|
|
"from \"t/#\" ",
|
|
?assertNotMatch({ok, #{<<"payload">> := <<"abc">>, metadata := _}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql0,
|
|
<<"ctx">> => #{<<"payload">> => <<"abc">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
Sql1 = "select "
|
|
"payload, metadata "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := <<"abc">>, <<"metadata">> := _}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql1,
|
|
<<"ctx">> => #{<<"payload">> => <<"abc">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for array range
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_sqlparse_array_range_1(_Config) ->
|
|
%% get a range of list
|
|
Sql0 = "select "
|
|
" payload.a[1..4] as c "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,1,2,3]}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql0,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"a\":[0,1,2,3,4,5]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% get a range from non-list data
|
|
Sql02 = "select "
|
|
" payload.a[1..4] as c "
|
|
"from \"t/#\" ",
|
|
?assertMatch({error, {select_and_transform_error, {error,{range_get,non_list_data},_}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql02,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"x\":[0,1,2,3,4,5]}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
%% construct a range:
|
|
Sql1 = "select "
|
|
" [1..4] as c, "
|
|
" 5 as c[-0], "
|
|
" 6 as c[-0], "
|
|
" 0 as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,1,2,3,4,5,6]}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql1,
|
|
<<"ctx">> => #{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_range_2(_Config) ->
|
|
%% construct a range without 'as'
|
|
Sql00 = "select "
|
|
" [1..4] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql00,
|
|
<<"ctx">> => #{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}}),
|
|
?assert(lists:any(fun({_K, V}) ->
|
|
V =:= [1,2,3,4]
|
|
end, maps:to_list(Res00))),
|
|
%% construct a range without 'as'
|
|
Sql01 = "select "
|
|
" a[2..4] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"a">> := [2,3,4]}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql01,
|
|
<<"ctx">> => #{<<"a">> => [1,2,3,4,5],
|
|
<<"topic">> => <<"t/a">>}})),
|
|
%% get a range of list without 'as'
|
|
Sql02 = "select "
|
|
" payload.a[1..4] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"a">> := [0,1,2,3]}}}, emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql02,
|
|
<<"ctx">> => #{<<"payload">> => <<"{\"a\":[0,1,2,3,4,5]}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for boolean
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_sqlparse_true_false(_Config) ->
|
|
%% construct a range without 'as'
|
|
Sql00 = "select "
|
|
" true as a, false as b, "
|
|
" false as x.y, true as c[-0] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql00,
|
|
<<"ctx">> => #{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}}),
|
|
?assertMatch(#{<<"a">> := true, <<"b">> := false,
|
|
<<"x">> := #{<<"y">> := false},
|
|
<<"c">> := [true]
|
|
}, Res00).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for compare
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-define(TEST_SQL(SQL),
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => SQL,
|
|
<<"ctx">> => #{<<"payload">> => <<"{}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|
|
|
|
t_sqlparse_compare_undefined(_Config) ->
|
|
Sql00 = "select "
|
|
" * "
|
|
"from \"t/#\" "
|
|
"where dev != undefined ",
|
|
%% no match
|
|
?assertMatch({error, nomatch}, ?TEST_SQL(Sql00)),
|
|
|
|
Sql01 = "select "
|
|
" 'd' as dev "
|
|
"from \"t/#\" "
|
|
"where dev != undefined ",
|
|
{ok, Res01} = ?TEST_SQL(Sql01),
|
|
%% pass
|
|
?assertMatch(#{}, Res01),
|
|
|
|
Sql02 = "select "
|
|
" * "
|
|
"from \"t/#\" "
|
|
"where dev != 'undefined' ",
|
|
{ok, Res02} = ?TEST_SQL(Sql02),
|
|
%% pass
|
|
?assertMatch(#{}, Res02).
|
|
|
|
t_sqlparse_compare_null_null(_Config) ->
|
|
%% test undefined == undefined
|
|
Sql00 = "select "
|
|
" a = b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} = ?TEST_SQL(Sql00),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res00),
|
|
|
|
%% test undefined != undefined
|
|
Sql01 = "select "
|
|
" a != b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res01} = ?TEST_SQL(Sql01),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res01),
|
|
|
|
%% test undefined > undefined
|
|
Sql02 = "select "
|
|
" a > b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res02} = ?TEST_SQL(Sql02),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res02),
|
|
|
|
%% test undefined < undefined
|
|
Sql03 = "select "
|
|
" a < b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res03} = ?TEST_SQL(Sql03),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res03),
|
|
|
|
%% test undefined <= undefined
|
|
Sql04 = "select "
|
|
" a <= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res04} = ?TEST_SQL(Sql04),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res04),
|
|
|
|
%% test undefined >= undefined
|
|
Sql05 = "select "
|
|
" a >= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res05} = ?TEST_SQL(Sql05),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res05).
|
|
|
|
t_sqlparse_compare_null_notnull(_Config) ->
|
|
%% test undefined == b
|
|
Sql00 = "select "
|
|
" 'b' as b, a = b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} = ?TEST_SQL(Sql00),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res00),
|
|
|
|
%% test undefined != b
|
|
Sql01 = "select "
|
|
" 'b' as b, a != b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res01} = ?TEST_SQL(Sql01),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res01),
|
|
|
|
%% test undefined > b
|
|
Sql02 = "select "
|
|
" 'b' as b, a > b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res02} = ?TEST_SQL(Sql02),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res02),
|
|
|
|
%% test undefined < b
|
|
Sql03 = "select "
|
|
" 'b' as b, a < b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res03} = ?TEST_SQL(Sql03),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res03),
|
|
|
|
%% test undefined <= b
|
|
Sql04 = "select "
|
|
" 'b' as b, a <= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res04} = ?TEST_SQL(Sql04),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res04),
|
|
|
|
%% test undefined >= b
|
|
Sql05 = "select "
|
|
" 'b' as b, a >= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res05} = ?TEST_SQL(Sql05),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res05).
|
|
|
|
t_sqlparse_compare_notnull_null(_Config) ->
|
|
%% test 'a' == undefined
|
|
Sql00 = "select "
|
|
" 'a' as a, a = b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} = ?TEST_SQL(Sql00),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res00),
|
|
|
|
%% test 'a' != undefined
|
|
Sql01 = "select "
|
|
" 'a' as a, a != b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res01} = ?TEST_SQL(Sql01),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res01),
|
|
|
|
%% test 'a' > undefined
|
|
Sql02 = "select "
|
|
" 'a' as a, a > b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res02} = ?TEST_SQL(Sql02),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res02),
|
|
|
|
%% test 'a' < undefined
|
|
Sql03 = "select "
|
|
" 'a' as a, a < b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res03} = ?TEST_SQL(Sql03),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res03),
|
|
|
|
%% test 'a' <= undefined
|
|
Sql04 = "select "
|
|
" 'a' as a, a <= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res04} = ?TEST_SQL(Sql04),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res04),
|
|
|
|
%% test 'a' >= undefined
|
|
Sql05 = "select "
|
|
" 'a' as a, a >= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res05} = ?TEST_SQL(Sql05),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res05).
|
|
|
|
t_sqlparse_compare(_Config) ->
|
|
Sql00 = "select "
|
|
" 'a' as a, 'a' as b, a = b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} = ?TEST_SQL(Sql00),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res00),
|
|
|
|
Sql01 = "select "
|
|
" is_null(a) as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res01} = ?TEST_SQL(Sql01),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res01),
|
|
|
|
Sql02 = "select "
|
|
" 1 as a, 2 as b, a < b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res02} = ?TEST_SQL(Sql02),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res02),
|
|
|
|
Sql03 = "select "
|
|
" 1 as a, 2 as b, a > b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res03} = ?TEST_SQL(Sql03),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res03),
|
|
|
|
Sql04 = "select "
|
|
" 1 as a, 2 as b, a = b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res04} = ?TEST_SQL(Sql04),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res04),
|
|
|
|
%% test 'a' >= undefined
|
|
Sql05 = "select "
|
|
" 1 as a, 2 as b, a >= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res05} = ?TEST_SQL(Sql05),
|
|
?assertMatch(#{<<"c">> := false
|
|
}, Res05),
|
|
|
|
%% test 'a' >= undefined
|
|
Sql06 = "select "
|
|
" 1 as a, 2 as b, a <= b as c "
|
|
"from \"t/#\" ",
|
|
{ok, Res06} = ?TEST_SQL(Sql06),
|
|
?assertMatch(#{<<"c">> := true
|
|
}, Res06).
|
|
|
|
|
|
|
|
t_sqlparse_new_map(_Config) ->
|
|
%% construct a range without 'as'
|
|
Sql00 = "select "
|
|
" map_new() as a, map_new() as b, "
|
|
" map_new() as x.y, map_new() as c[-0] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql00,
|
|
<<"ctx">> => #{<<"payload">> => <<"">>,
|
|
<<"topic">> => <<"t/a">>}}),
|
|
?assertMatch(#{<<"a">> := #{}, <<"b">> := #{},
|
|
<<"x">> := #{<<"y">> := #{}},
|
|
<<"c">> := [#{}]
|
|
}, Res00).
|
|
|
|
|
|
t_sqlparse_invalid_json(_Config) ->
|
|
Sql02 = "select "
|
|
" payload.a[1..4] as c "
|
|
"from \"t/#\" ",
|
|
?assertMatch({error, {select_and_transform_error, {error,{decode_json_failed,_},_}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql02,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> => <<"{\"x\":[0,1,2,3,}">>,
|
|
<<"topic">> => <<"t/a">>}})),
|
|
|
|
|
|
Sql2 = "foreach payload.sensors "
|
|
"do item.cmd as msg_type "
|
|
"from \"t/#\" ",
|
|
?assertMatch({error, {select_and_collect_error, {error,{decode_json_failed,_},_}}},
|
|
emqx_rule_sqltester:test(
|
|
#{<<"rawsql">> => Sql2,
|
|
<<"ctx">> =>
|
|
#{<<"payload">> =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>,
|
|
<<"topic">> => <<"t/a">>}})).
|