1605 lines
63 KiB
Erlang
1605 lines
63 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_rule_engine_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
|
-define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)).
|
|
|
|
all() ->
|
|
[ {group, engine}
|
|
, {group, funcs}
|
|
, {group, registry}
|
|
, {group, runtime}
|
|
, {group, events}
|
|
, {group, bugs}
|
|
].
|
|
|
|
suite() ->
|
|
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
|
|
|
|
groups() ->
|
|
[{engine, [sequence],
|
|
[t_create_rule
|
|
]},
|
|
{funcs, [],
|
|
[t_kv_store
|
|
]},
|
|
{registry, [sequence],
|
|
[t_add_get_remove_rule,
|
|
t_add_get_remove_rules,
|
|
t_create_existing_rule,
|
|
t_get_rules_for_topic,
|
|
t_get_rules_for_topic_2,
|
|
t_get_rules_with_same_event
|
|
]},
|
|
{runtime, [],
|
|
[t_match_atom_and_binary,
|
|
t_sqlselect_0,
|
|
t_sqlselect_00,
|
|
t_sqlselect_01,
|
|
t_sqlselect_02,
|
|
t_sqlselect_1,
|
|
t_sqlselect_2,
|
|
t_sqlselect_3,
|
|
t_sqlparse_event_1,
|
|
t_sqlparse_event_2,
|
|
t_sqlparse_event_3,
|
|
t_sqlparse_foreach_1,
|
|
t_sqlparse_foreach_2,
|
|
t_sqlparse_foreach_3,
|
|
t_sqlparse_foreach_4,
|
|
t_sqlparse_foreach_5,
|
|
t_sqlparse_foreach_6,
|
|
t_sqlparse_foreach_7,
|
|
t_sqlparse_foreach_8,
|
|
t_sqlparse_case_when_1,
|
|
t_sqlparse_case_when_2,
|
|
t_sqlparse_case_when_3,
|
|
t_sqlparse_array_index_1,
|
|
t_sqlparse_array_index_2,
|
|
t_sqlparse_array_index_3,
|
|
t_sqlparse_array_index_4,
|
|
t_sqlparse_array_index_5,
|
|
t_sqlparse_select_matadata_1,
|
|
t_sqlparse_array_range_1,
|
|
t_sqlparse_array_range_2,
|
|
t_sqlparse_true_false,
|
|
t_sqlparse_new_map
|
|
]},
|
|
{events, [],
|
|
[t_events
|
|
]},
|
|
{bugs, [],
|
|
[t_sqlparse_payload_as,
|
|
t_sqlparse_nested_get
|
|
]}
|
|
].
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Overall setup/teardown
|
|
%%------------------------------------------------------------------------------
|
|
|
|
init_per_suite(Config) ->
|
|
application:load(emqx_conf),
|
|
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
|
|
ok.
|
|
|
|
on_resource_create(_id, _) -> #{}.
|
|
on_resource_destroy(_id, _) -> ok.
|
|
on_get_resource_status(_id, _) -> #{}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Group specific setup/teardown
|
|
%%------------------------------------------------------------------------------
|
|
|
|
group(_Groupname) ->
|
|
[].
|
|
|
|
init_per_group(registry, Config) ->
|
|
Config;
|
|
init_per_group(_Groupname, Config) ->
|
|
Config.
|
|
|
|
end_per_group(_Groupname, _Config) ->
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Testcase specific setup/teardown
|
|
%%------------------------------------------------------------------------------
|
|
|
|
init_per_testcase(t_events, Config) ->
|
|
init_events_counters(),
|
|
SQL = "SELECT * FROM \"$events/client_connected\", "
|
|
"\"$events/client_disconnected\", "
|
|
"\"$events/session_subscribed\", "
|
|
"\"$events/session_unsubscribed\", "
|
|
"\"$events/message_acked\", "
|
|
"\"$events/message_delivered\", "
|
|
"\"$events/message_dropped\", "
|
|
"\"t1\"",
|
|
{ok, Rule} = emqx_rule_engine:create_rule(
|
|
#{id => <<"rule:t_events">>,
|
|
sql => SQL,
|
|
outputs => [
|
|
#{function => console},
|
|
#{function => <<"emqx_rule_engine_SUITE:output_record_triggered_events">>,
|
|
args => #{}}
|
|
],
|
|
description => <<"to console and record triggered events">>}),
|
|
?assertMatch(#{id := <<"rule:t_events">>}, Rule),
|
|
[{hook_points_rules, Rule} | Config];
|
|
init_per_testcase(_TestCase, Config) ->
|
|
Config.
|
|
|
|
end_per_testcase(t_events, Config) ->
|
|
ets:delete(events_record_tab),
|
|
ok = delete_rule(?config(hook_points_rules, Config));
|
|
end_per_testcase(_TestCase, _Config) ->
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for rule engine
|
|
%%------------------------------------------------------------------------------
|
|
t_create_rule(_Config) ->
|
|
{ok, #{id := Id}} = emqx_rule_engine:create_rule(
|
|
#{sql => <<"select * from \"t/a\"">>,
|
|
id => <<"t_create_rule">>,
|
|
outputs => [#{function => console}],
|
|
description => <<"debug rule">>}),
|
|
ct:pal("======== emqx_rule_engine:get_rules :~p", [emqx_rule_engine:get_rules()]),
|
|
?assertMatch({ok, #{id := Id, from := [<<"t/a">>]}},
|
|
emqx_rule_engine:get_rule(Id)),
|
|
delete_rule(Id),
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for rule funcs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_kv_store(_) ->
|
|
undefined = emqx_rule_funcs:kv_store_get(<<"abc">>),
|
|
<<"not_found">> = emqx_rule_funcs:kv_store_get(<<"abc">>, <<"not_found">>),
|
|
emqx_rule_funcs:kv_store_put(<<"abc">>, 1),
|
|
1 = emqx_rule_funcs:kv_store_get(<<"abc">>),
|
|
emqx_rule_funcs:kv_store_del(<<"abc">>),
|
|
undefined = emqx_rule_funcs:kv_store_get(<<"abc">>).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for rule registry
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_add_get_remove_rule(_Config) ->
|
|
RuleId0 = <<"rule-debug-0">>,
|
|
ok = emqx_rule_engine:insert_rule(make_simple_rule(RuleId0)),
|
|
?assertMatch({ok, #{id := RuleId0}}, emqx_rule_engine:get_rule(RuleId0)),
|
|
ok = delete_rule(RuleId0),
|
|
?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId0)),
|
|
|
|
RuleId1 = <<"rule-debug-1">>,
|
|
Rule1 = make_simple_rule(RuleId1),
|
|
ok = emqx_rule_engine:insert_rule(Rule1),
|
|
?assertMatch({ok, #{id := RuleId1}}, emqx_rule_engine:get_rule(RuleId1)),
|
|
ok = delete_rule(Rule1),
|
|
?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId1)),
|
|
ok.
|
|
|
|
t_add_get_remove_rules(_Config) ->
|
|
delete_rules_by_ids(emqx_rule_engine:get_rules()),
|
|
ok = insert_rules(
|
|
[make_simple_rule(<<"rule-debug-1">>),
|
|
make_simple_rule(<<"rule-debug-2">>)]),
|
|
?assertEqual(2, length(emqx_rule_engine:get_rules())),
|
|
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]),
|
|
?assertEqual([], emqx_rule_engine:get_rules()),
|
|
ok.
|
|
|
|
t_create_existing_rule(_Config) ->
|
|
%% create a rule using given rule id
|
|
{ok, _} = emqx_rule_engine:create_rule(
|
|
#{id => <<"an_existing_rule">>,
|
|
sql => <<"select * from \"t/#\"">>,
|
|
outputs => [#{function => console}]
|
|
}),
|
|
{ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>),
|
|
?assertEqual(<<"select * from \"t/#\"">>, SQL),
|
|
|
|
ok = delete_rule(<<"an_existing_rule">>),
|
|
?assertEqual(not_found, emqx_rule_engine:get_rule(<<"an_existing_rule">>)),
|
|
ok.
|
|
|
|
t_get_rules_for_topic(_Config) ->
|
|
Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>)),
|
|
ok = insert_rules(
|
|
[make_simple_rule(<<"rule-debug-1">>),
|
|
make_simple_rule(<<"rule-debug-2">>)]),
|
|
?assertEqual(Len0+2, length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>))),
|
|
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]),
|
|
ok.
|
|
|
|
t_get_rules_ordered_by_ts(_Config) ->
|
|
Now = fun() -> erlang:system_time(nanosecond) end,
|
|
ok = insert_rules(
|
|
[make_simple_rule_with_ts(<<"rule-debug-0">>, Now()),
|
|
make_simple_rule_with_ts(<<"rule-debug-1">>, Now()),
|
|
make_simple_rule_with_ts(<<"rule-debug-2">>, Now())
|
|
]),
|
|
?assertMatch([
|
|
#{id := <<"rule-debug-0">>},
|
|
#{id := <<"rule-debug-1">>},
|
|
#{id := <<"rule-debug-2">>}
|
|
], emqx_rule_engine:get_rules_ordered_by_ts()).
|
|
|
|
t_get_rules_for_topic_2(_Config) ->
|
|
Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>)),
|
|
ok = insert_rules(
|
|
[make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
|
|
make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]),
|
|
make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [<<"simple/+/1">>]),
|
|
make_simple_rule(<<"rule-debug-4">>, <<"select * from \"simple/1\"">>, [<<"simple/1">>]),
|
|
make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]),
|
|
make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>])
|
|
]),
|
|
?assertEqual(Len0+4, length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>))),
|
|
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
|
|
ok.
|
|
|
|
t_get_rules_with_same_event(_Config) ->
|
|
PubT = <<"simple/1">>,
|
|
PubN = length(emqx_rule_engine:get_rules_with_same_event(PubT)),
|
|
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>)),
|
|
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>)),
|
|
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>)),
|
|
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>)),
|
|
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>)),
|
|
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>)),
|
|
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>)),
|
|
ok = insert_rules(
|
|
[make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
|
|
make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]),
|
|
make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]),
|
|
make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, [<<"$events/client_disconnected">>]),
|
|
make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, [<<"$events/session_subscribed">>]),
|
|
make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, [<<"$events/session_unsubscribed">>]),
|
|
make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, [<<"$events/message_delivered">>]),
|
|
make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, [<<"$events/message_acked">>]),
|
|
make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]),
|
|
make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>])
|
|
]),
|
|
?assertEqual(PubN + 3, length(emqx_rule_engine:get_rules_with_same_event(PubT))),
|
|
?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>))),
|
|
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>))),
|
|
?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>))),
|
|
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>))),
|
|
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>))),
|
|
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>))),
|
|
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>))),
|
|
ok = delete_rules_by_ids([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]),
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Test cases for rule runtime
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_events(_Config) ->
|
|
{ok, Client} = emqtt:start_link(
|
|
[ {username, <<"u_event">>}
|
|
, {clientid, <<"c_event">>}
|
|
, {proto_ver, v5}
|
|
, {properties, #{'Session-Expiry-Interval' => 60}}
|
|
]),
|
|
{ok, Client2} = emqtt:start_link(
|
|
[ {username, <<"u_event2">>}
|
|
, {clientid, <<"c_event2">>}
|
|
, {proto_ver, v5}
|
|
, {properties, #{'Session-Expiry-Interval' => 60}}
|
|
]),
|
|
ct:pal("====== verify $events/client_connected"),
|
|
client_connected(Client, Client2),
|
|
ct:pal("====== verify $events/session_subscribed"),
|
|
session_subscribed(Client2),
|
|
ct:pal("====== verify t1"),
|
|
message_publish(Client),
|
|
ct:pal("====== verify $events/message_delivered"),
|
|
message_delivered(Client),
|
|
ct:pal("====== verify $events/message_acked"),
|
|
message_acked(Client),
|
|
ct:pal("====== verify $events/session_unsubscribed"),
|
|
session_unsubscribed(Client2),
|
|
ct:pal("====== verify $events/message_dropped"),
|
|
message_dropped(Client),
|
|
ct:pal("====== verify $events/client_disconnected"),
|
|
client_disconnected(Client, Client2),
|
|
ok.
|
|
|
|
message_publish(Client) ->
|
|
emqtt:publish(Client, <<"t1">>, #{'Message-Expiry-Interval' => 60},
|
|
<<"{\"id\": 1, \"name\": \"ha\"}">>, [{qos, 1}]),
|
|
verify_event('message.publish'),
|
|
ok.
|
|
client_connected(Client, Client2) ->
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _} = emqtt:connect(Client2),
|
|
verify_event('client.connected'),
|
|
ok.
|
|
client_disconnected(Client, Client2) ->
|
|
ok = emqtt:disconnect(Client, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}),
|
|
ok = emqtt:disconnect(Client2, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}),
|
|
verify_event('client.disconnected'),
|
|
ok.
|
|
session_subscribed(Client2) ->
|
|
{ok, _, _} = emqtt:subscribe(Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>, 1),
|
|
verify_event('session.subscribed'),
|
|
ok.
|
|
session_unsubscribed(Client2) ->
|
|
{ok, _, _} = emqtt:unsubscribe(Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>),
|
|
verify_event('session.unsubscribed'),
|
|
ok.
|
|
|
|
message_delivered(_Client) ->
|
|
verify_event('message.delivered'),
|
|
ok.
|
|
message_dropped(Client) ->
|
|
message_publish(Client),
|
|
verify_event('message.dropped'),
|
|
ok.
|
|
message_acked(_Client) ->
|
|
verify_event('message.acked'),
|
|
ok.
|
|
|
|
t_match_atom_and_binary(_Config) ->
|
|
SQL = "SELECT connected_at as ts, * "
|
|
"FROM \"$events/client_connected\" "
|
|
"WHERE username = 'emqx2' ",
|
|
Repub = republish_output(<<"t2">>, <<"user:${ts}">>),
|
|
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
#{sql => SQL, id => ?TMP_RULEID,
|
|
outputs => [Repub]}),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
ct:sleep(100),
|
|
{ok, Client2} = emqtt:start_link([{username, <<"emqx2">>}]),
|
|
{ok, _} = emqtt:connect(Client2),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
<<"user:", ConnAt/binary>> = Payload,
|
|
_ = binary_to_integer(ConnAt)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
delete_rule(TopicRule).
|
|
|
|
t_sqlselect_0(_Config) ->
|
|
%% Verify SELECT with and without 'AS'
|
|
Sql = "select * "
|
|
"from \"t/#\" "
|
|
"where payload.cmd.info = 'tt'",
|
|
?assertMatch({ok,#{payload := <<"{\"cmd\": {\"info\":\"tt\"}}">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql2 = "select payload.cmd as cmd "
|
|
"from \"t/#\" "
|
|
"where cmd.info = 'tt'",
|
|
?assertMatch({ok,#{<<"cmd">> := #{<<"info">> := <<"tt">>}}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql3 = "select payload.cmd as cmd, cmd.info as info "
|
|
"from \"t/#\" "
|
|
"where cmd.info = 'tt' and info = 'tt'",
|
|
?assertMatch({ok,#{<<"cmd">> := #{<<"info">> := <<"tt">>},
|
|
<<"info">> := <<"tt">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql3,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
topic => <<"t/a">>}})),
|
|
%% cascaded as
|
|
Sql4 = "select payload.cmd as cmd, cmd.info as meta.info "
|
|
"from \"t/#\" "
|
|
"where cmd.info = 'tt' and meta.info = 'tt'",
|
|
?assertMatch({ok,#{<<"cmd">> := #{<<"info">> := <<"tt">>},
|
|
<<"meta">> := #{<<"info">> := <<"tt">>}}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql4,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"cmd\": {\"info\":\"tt\"}}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlselect_00(_Config) ->
|
|
%% Verify plus/subtract and unary_add_or_subtract
|
|
Sql = "select 1-1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 0}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload => <<"">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql1 = "select -1 + 1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 0}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql1,
|
|
context =>
|
|
#{payload => <<"">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql2 = "select 1 + 1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 2}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context =>
|
|
#{payload => <<"">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql3 = "select +1 as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"a">> := 1}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql3,
|
|
context =>
|
|
#{payload => <<"">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlselect_01(_Config) ->
|
|
SQL = "SELECT json_decode(payload) as p, payload "
|
|
"FROM \"t3/#\", \"t1\" "
|
|
"WHERE p.x = 1",
|
|
Repub = republish_output(<<"t2">>),
|
|
{ok, TopicRule1} = emqx_rule_engine:create_rule(
|
|
#{sql => SQL, id => ?TMP_RULEID,
|
|
outputs => [Repub]}),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
|
ct:sleep(100),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
ct:fail(unexpected_t2)
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
|
|
receive {publish, #{topic := T3, payload := Payload3}} ->
|
|
?assertEqual(<<"t2">>, T3),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload3)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
delete_rule(TopicRule1).
|
|
|
|
t_sqlselect_02(_Config) ->
|
|
SQL = "SELECT * "
|
|
"FROM \"t3/#\", \"t1\" "
|
|
"WHERE payload.x = 1",
|
|
Repub = republish_output(<<"t2">>),
|
|
{ok, TopicRule1} = emqx_rule_engine:create_rule(
|
|
#{sql => SQL, id => ?TMP_RULEID,
|
|
outputs => [Repub]}),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
|
|
ct:sleep(100),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := Payload0}} ->
|
|
ct:fail({unexpected_t2, Payload0})
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
|
|
receive {publish, #{topic := T3, payload := Payload3}} ->
|
|
?assertEqual(<<"t2">>, T3),
|
|
?assertEqual(<<"{\"x\":1}">>, Payload3)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
delete_rule(TopicRule1).
|
|
|
|
t_sqlselect_1(_Config) ->
|
|
SQL = "SELECT json_decode(payload) as p, payload "
|
|
"FROM \"t1\" "
|
|
"WHERE p.x = 1 and p.y = 2",
|
|
Repub = republish_output(<<"t2">>),
|
|
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
#{sql => SQL, id => ?TMP_RULEID,
|
|
outputs => [Repub]}),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
ct:sleep(200),
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
ct:fail(unexpected_t2)
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
delete_rule(TopicRule).
|
|
|
|
t_sqlselect_2(_Config) ->
|
|
%% recursively republish to t2
|
|
SQL = "SELECT * FROM \"t2\" ",
|
|
Repub = republish_output(<<"t2">>),
|
|
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
#{sql => SQL, id => ?TMP_RULEID,
|
|
outputs => [Repub]}),
|
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
|
|
emqtt:publish(Client, <<"t2">>, <<"{\"x\":1,\"y\":144}">>, 0),
|
|
Fun = fun() ->
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
received_t2
|
|
after 500 ->
|
|
received_nothing
|
|
end
|
|
end,
|
|
received_t2 = Fun(),
|
|
received_t2 = Fun(),
|
|
received_nothing = Fun(),
|
|
|
|
emqtt:stop(Client),
|
|
delete_rule(TopicRule).
|
|
|
|
t_sqlselect_3(_Config) ->
|
|
%% republish the client.connected msg
|
|
SQL = "SELECT * "
|
|
"FROM \"$events/client_connected\" "
|
|
"WHERE username = 'emqx1'",
|
|
Repub = republish_output(<<"t2">>, <<"clientid=${clientid}">>),
|
|
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
#{sql => SQL, id => ?TMP_RULEID,
|
|
outputs => [Repub]}),
|
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
|
|
{ok, _} = emqtt:connect(Client),
|
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
|
ct:sleep(200),
|
|
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
|
|
{ok, _} = emqtt:connect(Client1),
|
|
receive {publish, #{topic := T, payload := Payload}} ->
|
|
?assertEqual(<<"t2">>, T),
|
|
?assertEqual(<<"clientid=c_emqx1">>, Payload)
|
|
after 1000 ->
|
|
ct:fail(wait_for_t2)
|
|
end,
|
|
|
|
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":1}">>, 0),
|
|
receive {publish, #{topic := <<"t2">>, payload := _}} ->
|
|
ct:fail(unexpected_t2)
|
|
after 1000 ->
|
|
ok
|
|
end,
|
|
|
|
emqtt:stop(Client),
|
|
delete_rule(TopicRule).
|
|
|
|
t_sqlparse_event_1(_Config) ->
|
|
Sql = "select topic as tp "
|
|
"from \"$events/session_subscribed\" ",
|
|
?assertMatch({ok,#{<<"tp">> := <<"t/tt">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{topic => <<"t/tt">>}})).
|
|
|
|
t_sqlparse_event_2(_Config) ->
|
|
Sql = "select clientid "
|
|
"from \"$events/client_connected\" ",
|
|
?assertMatch({ok,#{<<"clientid">> := <<"abc">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{clientid => <<"abc">>}})).
|
|
|
|
t_sqlparse_event_3(_Config) ->
|
|
Sql = "select clientid, topic as tp "
|
|
"from \"t/tt\", \"$events/client_connected\" ",
|
|
?assertMatch({ok,#{<<"clientid">> := <<"abc">>, <<"tp">> := <<"t/tt">>}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{clientid => <<"abc">>, topic => <<"t/tt">>}})).
|
|
|
|
t_sqlparse_foreach_1(_Config) ->
|
|
%% Verify foreach with and without 'AS'
|
|
Sql = "foreach payload.sensors as s "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"s">> := 1}, #{<<"s">> := 2}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"sensors\": [1, 2]}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{item := 1}, #{item := 2}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context => #{payload => <<"{\"sensors\": [1, 2]}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql3 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{item := #{<<"cmd">> := <<"1">>}, clientid := <<"c_a">>},
|
|
#{item := #{<<"cmd">> := <<"2">>, <<"name">> := <<"ct">>}, clientid := <<"c_a">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql3,
|
|
context => #{
|
|
payload => <<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\",\"name\":\"ct\"}]}">>,
|
|
clientid => <<"c_a">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql4 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
{ok,[#{metadata := #{rule_id := TRuleId}},
|
|
#{metadata := #{rule_id := TRuleId}}]} =
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql4,
|
|
context => #{
|
|
payload => <<"{\"sensors\": [1, 2]}">>,
|
|
topic => <<"t/a">>}}),
|
|
?assert(is_binary(TRuleId)).
|
|
|
|
t_sqlparse_foreach_2(_Config) ->
|
|
%% Verify foreach-do with and without 'AS'
|
|
Sql = "foreach payload.sensors as s "
|
|
"do s.cmd as msg_type "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"msg_type">> := <<"1">>},#{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"do item.cmd as msg_type "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"msg_type">> := <<"1">>},#{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql3 = "foreach payload.sensors "
|
|
"do item as item "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"item">> := 1},#{<<"item">> := 2}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql3,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"sensors\": [1, 2]}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_3(_Config) ->
|
|
%% Verify foreach-incase with and without 'AS'
|
|
Sql = "foreach payload.sensors as s "
|
|
"incase s.cmd != 1 "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"s">> := #{<<"cmd">> := 2}},
|
|
#{<<"s">> := #{<<"cmd">> := 3}}
|
|
]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"sensors\": [{\"cmd\":1}, {\"cmd\":2}, {\"cmd\":3}]}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"incase item.cmd != 1 "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{item := #{<<"cmd">> := 2}},
|
|
#{item := #{<<"cmd">> := 3}}
|
|
]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"sensors\": [{\"cmd\":1}, {\"cmd\":2}, {\"cmd\":3}]}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_4(_Config) ->
|
|
%% Verify foreach-do-incase
|
|
Sql = "foreach payload.sensors as s "
|
|
"do s.cmd as msg_type, s.name as name "
|
|
"incase is_not_null(s.cmd) "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[#{<<"msg_type">> := <<"1">>},#{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\"}, {\"cmd\":\"2\"}]}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok,[#{<<"msg_type">> := <<"1">>, <<"name">> := <<"n1">>}, #{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload =>
|
|
<<"{\"sensors\": [{\"cmd\":\"1\", \"name\":\"n1\"}, {\"cmd\":\"2\"}, {\"name\":\"n3\"}]}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok,[]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload => <<"{\"sensors\": [1, 2]}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_5(_Config) ->
|
|
%% Verify foreach on a empty-list or non-list variable
|
|
Sql = "foreach payload.sensors as s "
|
|
"do s.cmd as msg_type, s.name as name "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[]}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload => <<"{\"sensors\": 1}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok,[]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload => <<"{\"sensors\": []}">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql2 = "foreach payload.sensors "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,[]}, emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context =>
|
|
#{payload => <<"{\"sensors\": 1}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_6(_Config) ->
|
|
%% Verify foreach on a empty-list or non-list variable
|
|
Sql = "foreach json_decode(payload) "
|
|
"do item.id as zid, timestamp as t "
|
|
"from \"t/#\" ",
|
|
{ok, Res} = emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload => <<"[{\"id\": 5},{\"id\": 15}]">>,
|
|
topic => <<"t/a">>}}),
|
|
[#{<<"t">> := Ts1, <<"zid">> := Zid1},
|
|
#{<<"t">> := Ts2, <<"zid">> := Zid2}] = Res,
|
|
?assertEqual(true, is_integer(Ts1)),
|
|
?assertEqual(true, is_integer(Ts2)),
|
|
?assert(Zid1 == 5 orelse Zid1 == 15),
|
|
?assert(Zid2 == 5 orelse Zid2 == 15).
|
|
|
|
t_sqlparse_foreach_7(_Config) ->
|
|
%% Verify foreach-do-incase and cascaded AS
|
|
Sql = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"incase is_not_null(info.cmd) "
|
|
"from \"t/#\" "
|
|
"where s.page = '2' ",
|
|
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": {\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }">>,
|
|
?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}, #{<<"msg_type">> := <<"2">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload => Payload,
|
|
topic => <<"t/a">>}})),
|
|
Sql2 = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"incase is_not_null(info.cmd) "
|
|
"from \"t/#\" "
|
|
"where s.page = '3' ",
|
|
?assertMatch({error, nomatch},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context =>
|
|
#{payload => Payload,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_foreach_8(_Config) ->
|
|
%% Verify foreach-do-incase and cascaded AS
|
|
Sql = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, c.info as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"incase is_map(info) "
|
|
"from \"t/#\" "
|
|
"where s.page = '2' ",
|
|
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": {\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }">>,
|
|
?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context =>
|
|
#{payload => Payload,
|
|
topic => <<"t/a">>}})),
|
|
|
|
Sql3 = "foreach json_decode(payload) as p, p.sensors as s, s.collection as c, sublist(2,1,c.info) as info "
|
|
"do info.cmd as msg_type, info.name as name "
|
|
"from \"t/#\" "
|
|
"where s.page = '2' ",
|
|
[?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => SqlN,
|
|
context =>
|
|
#{payload => Payload,
|
|
topic => <<"t/a">>}}))
|
|
|| SqlN <- [Sql3]].
|
|
|
|
t_sqlparse_case_when_1(_Config) ->
|
|
%% case-when-else clause
|
|
Sql = "select "
|
|
" case when payload.x < 0 then 0 "
|
|
" when payload.x > 7 then 7 "
|
|
" else payload.x "
|
|
" end as y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"y">> := 1}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 1}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 0}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 0}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 0}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": -1}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 7}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 7}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 7}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 8}">>,
|
|
topic => <<"t/a">>}})),
|
|
ok.
|
|
|
|
t_sqlparse_case_when_2(_Config) ->
|
|
% switch clause
|
|
Sql = "select "
|
|
" case payload.x when 1 then 2 "
|
|
" when 2 then 3 "
|
|
" else 4 "
|
|
" end as y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"y">> := 2}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 1}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 3}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 2}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 4}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 4}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 4}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 7}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 4}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 8}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_case_when_3(_Config) ->
|
|
%% case-when clause
|
|
Sql = "select "
|
|
" case when payload.x < 0 then 0 "
|
|
" when payload.x > 7 then 7 "
|
|
" end as y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 1}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 5}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 0}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 0}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": -1}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 7}">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{<<"y">> := 7}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 8}">>,
|
|
topic => <<"t/a">>}})),
|
|
ok.
|
|
|
|
t_sqlparse_array_index_1(_Config) ->
|
|
%% index get
|
|
Sql = "select "
|
|
" json_decode(payload) as p, "
|
|
" p[1] as a "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"a">> := #{<<"x">> := 1}}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"[{\"x\": 1}]">>,
|
|
topic => <<"t/a">>}})),
|
|
?assertMatch({ok, #{}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{payload => <<"{\"x\": 1}">>,
|
|
topic => <<"t/a">>}})),
|
|
%% index get without 'as'
|
|
Sql2 = "select "
|
|
" payload.x[2] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [3]}}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context => #{payload => #{<<"x">> => [1,3,4]},
|
|
topic => <<"t/a">>}})),
|
|
%% index get without 'as' again
|
|
Sql3 = "select "
|
|
" payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [#{<<"y">> := 3}]}}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql3,
|
|
context => #{payload => #{<<"x">> => [1,#{y => 3},4]},
|
|
topic => <<"t/a">>}})),
|
|
|
|
%% index get with 'as'
|
|
Sql4 = "select "
|
|
" payload.x[2].y as b "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := 3}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql4,
|
|
context => #{payload => #{<<"x">> => [1,#{y => 3},4]},
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_index_2(_Config) ->
|
|
%% array get with negative index
|
|
Sql1 = "select "
|
|
" payload.x[-2].y as b "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := 3}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql1,
|
|
context => #{payload => #{<<"x">> => [1,#{y => 3},4]},
|
|
topic => <<"t/a">>}})),
|
|
%% array append to head or tail of a list:
|
|
Sql2 = "select "
|
|
" payload.x as b, "
|
|
" 1 as c[-0], "
|
|
" 2 as c[-0], "
|
|
" b as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := 0, <<"c">> := [0,1,2]}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context => #{payload => #{<<"x">> => 0},
|
|
topic => <<"t/a">>}})),
|
|
%% construct an empty list:
|
|
Sql3 = "select "
|
|
" [] as c, "
|
|
" 1 as c[-0], "
|
|
" 2 as c[-0], "
|
|
" 0 as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,1,2]}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql3,
|
|
context => #{payload => <<"">>,
|
|
topic => <<"t/a">>}})),
|
|
%% construct a list:
|
|
Sql4 = "select "
|
|
" [payload.a, \"topic\", 'c'] as c, "
|
|
" 1 as c[-0], "
|
|
" 2 as c[-0], "
|
|
" 0 as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,11,<<"t/a">>,<<"c">>,1,2]}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql4,
|
|
context => #{payload => <<"{\"a\":11}">>,
|
|
topic => <<"t/a">>
|
|
}})).
|
|
|
|
t_sqlparse_array_index_3(_Config) ->
|
|
%% array with json string payload:
|
|
Sql0 = "select "
|
|
"payload,"
|
|
"payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [1, #{<<"y">> := [1,2]}, 3]}}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql0,
|
|
context => #{payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
topic => <<"t/a">>}})),
|
|
%% same as above but don't select payload:
|
|
Sql1 = "select "
|
|
"payload.x[2].y as b "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := [1,2]}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql1,
|
|
context => #{payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
topic => <<"t/a">>}})),
|
|
%% same as above but add 'as' clause:
|
|
Sql2 = "select "
|
|
"payload.x[2].y as b.c "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"b">> := #{<<"c">> := [1,2]}}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql2,
|
|
context => #{payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_index_4(_Config) ->
|
|
%% array with json string payload:
|
|
Sql0 = "select "
|
|
"0 as payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [#{<<"y">> := 0}]}}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql0,
|
|
context => #{payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
topic => <<"t/a">>}})),
|
|
%% array with json string payload, and also select payload.x:
|
|
Sql1 = "select "
|
|
"payload.x, "
|
|
"0 as payload.x[2].y "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"x">> := [1, #{<<"y">> := 0}, 3]}}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql1,
|
|
context => #{payload => <<"{\"x\": [1,{\"y\": [1,2]},3]}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_index_5(_Config) ->
|
|
Sql00 = "select "
|
|
" [1,2,3,4] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql00,
|
|
context => #{payload => <<"">>,
|
|
topic => <<"t/a">>}}),
|
|
?assert(lists:any(fun({_K, V}) ->
|
|
V =:= [1,2,3,4]
|
|
end, maps:to_list(Res00))).
|
|
|
|
t_sqlparse_select_matadata_1(_Config) ->
|
|
%% array with json string payload:
|
|
Sql0 = "select "
|
|
"payload "
|
|
"from \"t/#\" ",
|
|
?assertNotMatch({ok, #{<<"payload">> := <<"abc">>, metadata := _}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql0,
|
|
context => #{payload => <<"abc">>,
|
|
topic => <<"t/a">>}})),
|
|
Sql1 = "select "
|
|
"payload, metadata "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := <<"abc">>, <<"metadata">> := _}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql1,
|
|
context => #{payload => <<"abc">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_range_1(_Config) ->
|
|
%% get a range of list
|
|
Sql0 = "select "
|
|
" payload.a[1..4] as c "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,1,2,3]}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql0,
|
|
context => #{payload => <<"{\"a\":[0,1,2,3,4,5]}">>,
|
|
topic => <<"t/a">>}})),
|
|
%% get a range from non-list data
|
|
Sql02 = "select "
|
|
" payload.a[1..4] as c "
|
|
"from \"t/#\" ",
|
|
?assertThrow({select_and_transform_error, {error,{range_get,non_list_data},_}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql02,
|
|
context =>
|
|
#{payload => <<"{\"x\":[0,1,2,3,4,5]}">>,
|
|
topic => <<"t/a">>}})),
|
|
%% construct a range:
|
|
Sql1 = "select "
|
|
" [1..4] as c, "
|
|
" 5 as c[-0], "
|
|
" 6 as c[-0], "
|
|
" 0 as c[0] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"c">> := [0,1,2,3,4,5,6]}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql1,
|
|
context => #{payload => <<"">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_array_range_2(_Config) ->
|
|
%% construct a range without 'as'
|
|
Sql00 = "select "
|
|
" [1..4] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql00,
|
|
context => #{payload => <<"">>,
|
|
topic => <<"t/a">>}}),
|
|
?assert(lists:any(fun({_K, V}) ->
|
|
V =:= [1,2,3,4]
|
|
end, maps:to_list(Res00))),
|
|
%% construct a range without 'as'
|
|
Sql01 = "select "
|
|
" a[2..4] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"a">> := [2,3,4]}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql01,
|
|
context => #{<<"a">> => [1,2,3,4,5],
|
|
topic => <<"t/a">>}})),
|
|
%% get a range of list without 'as'
|
|
Sql02 = "select "
|
|
" payload.a[1..4] "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok, #{<<"payload">> := #{<<"a">> := [0,1,2,3]}}}, emqx_rule_sqltester:test(
|
|
#{sql => Sql02,
|
|
context => #{payload => <<"{\"a\":[0,1,2,3,4,5]}">>,
|
|
topic => <<"t/a">>}})).
|
|
|
|
t_sqlparse_true_false(_Config) ->
|
|
%% construct a range without 'as'
|
|
Sql00 = "select "
|
|
" true as a, false as b, "
|
|
" false as x.y, true as c[-0] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql00,
|
|
context => #{payload => <<"">>,
|
|
topic => <<"t/a">>}}),
|
|
?assertMatch(#{<<"a">> := true, <<"b">> := false,
|
|
<<"x">> := #{<<"y">> := false},
|
|
<<"c">> := [true]
|
|
}, Res00).
|
|
|
|
t_sqlparse_new_map(_Config) ->
|
|
%% construct a range without 'as'
|
|
Sql00 = "select "
|
|
" map_new() as a, map_new() as b, "
|
|
" map_new() as x.y, map_new() as c[-0] "
|
|
"from \"t/#\" ",
|
|
{ok, Res00} =
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql00,
|
|
context => #{payload => <<"">>,
|
|
topic => <<"t/a">>}}),
|
|
?assertMatch(#{<<"a">> := #{}, <<"b">> := #{},
|
|
<<"x">> := #{<<"y">> := #{}},
|
|
<<"c">> := [#{}]
|
|
}, Res00).
|
|
|
|
t_sqlparse_payload_as(_Config) ->
|
|
%% https://github.com/emqx/emqx/issues/3866
|
|
Sql00 = "SELECT "
|
|
" payload, map_get('engineWorkTime', payload.params, -1) as payload.params.engineWorkTime, "
|
|
" map_get('hydOilTem', payload.params, -1) as payload.params.hydOilTem "
|
|
"FROM \"t/#\" ",
|
|
Payload1 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42, \"hydOilTem\": 30 } }">>,
|
|
{ok, Res01} = emqx_rule_sqltester:test(
|
|
#{sql => Sql00,
|
|
context => #{payload => Payload1,
|
|
topic => <<"t/a">>}}),
|
|
?assertMatch(#{
|
|
<<"payload">> := #{
|
|
<<"params">> := #{
|
|
<<"convertTemp">> := 20,
|
|
<<"engineSpeed">> := 42,
|
|
<<"engineWorkTime">> := -1,
|
|
<<"hydOilTem">> := 30
|
|
}
|
|
}
|
|
}, Res01),
|
|
|
|
Payload2 = <<"{ \"msgId\": 1002, \"params\": { \"convertTemp\": 20, \"engineSpeed\": 42 } }">>,
|
|
{ok, Res02} = emqx_rule_sqltester:test(
|
|
#{sql => Sql00,
|
|
context => #{payload => Payload2,
|
|
topic => <<"t/a">>}}),
|
|
?assertMatch(#{
|
|
<<"payload">> := #{
|
|
<<"params">> := #{
|
|
<<"convertTemp">> := 20,
|
|
<<"engineSpeed">> := 42,
|
|
<<"engineWorkTime">> := -1,
|
|
<<"hydOilTem">> := -1
|
|
}
|
|
}
|
|
}, Res02).
|
|
|
|
t_sqlparse_nested_get(_Config) ->
|
|
Sql = "select payload as p, p.a.b as c "
|
|
"from \"t/#\" ",
|
|
?assertMatch({ok,#{<<"c">> := 0}},
|
|
emqx_rule_sqltester:test(
|
|
#{sql => Sql,
|
|
context => #{
|
|
topic => <<"t/1">>,
|
|
payload => <<"{\"a\": {\"b\": 0}}">>
|
|
}})).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal helpers
|
|
%%------------------------------------------------------------------------------
|
|
|
|
republish_output(Topic) ->
|
|
republish_output(Topic, <<"${payload}">>).
|
|
republish_output(Topic, Payload) ->
|
|
#{function => republish,
|
|
args => #{payload => Payload, topic => Topic, qos => 0, retain => false}}.
|
|
|
|
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
|
|
SQL = <<"select * from \"simple/topic\"">>,
|
|
Topics = [<<"simple/topic">>],
|
|
make_simple_rule(RuleId, SQL, Topics, Ts).
|
|
|
|
make_simple_rule(RuleId) when is_binary(RuleId) ->
|
|
SQL = <<"select * from \"simple/topic\"">>,
|
|
Topics = [<<"simple/topic">>],
|
|
make_simple_rule(RuleId, SQL, Topics).
|
|
|
|
make_simple_rule(RuleId, SQL, Topics) when is_binary(RuleId) ->
|
|
make_simple_rule(RuleId, SQL, Topics, erlang:system_time(millisecond)).
|
|
|
|
make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
|
|
#{
|
|
id => RuleId,
|
|
sql => SQL,
|
|
from => Topics,
|
|
fields => [<<"*">>],
|
|
is_foreach => false,
|
|
conditions => {},
|
|
outputs => [#{mod => emqx_rule_outputs, func => console, args => #{}}],
|
|
description => <<"simple rule">>,
|
|
created_at => Ts
|
|
}.
|
|
|
|
output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) ->
|
|
ct:pal("applying output_record_triggered_events: ~p", [Data]),
|
|
ets:insert(events_record_tab, {EventName, Data}).
|
|
|
|
verify_event(EventName) ->
|
|
ct:sleep(50),
|
|
case ets:lookup(events_record_tab, EventName) of
|
|
[] ->
|
|
ct:fail({no_such_event, EventName, ets:tab2list(events_record_tab)});
|
|
Records ->
|
|
[begin
|
|
%% verify fields can be formatted to JSON string
|
|
_ = emqx_json:encode(Fields),
|
|
%% verify metadata fields
|
|
verify_metadata_fields(EventName, Fields),
|
|
%% verify available fields for each event name
|
|
verify_event_fields(EventName, Fields)
|
|
end || {_Name, Fields} <- Records]
|
|
end.
|
|
|
|
verify_metadata_fields(_EventName, #{metadata := Metadata}) ->
|
|
?assertMatch(
|
|
#{rule_id := <<"rule:t_events">>},
|
|
Metadata).
|
|
|
|
verify_event_fields('message.publish', Fields) ->
|
|
#{id := ID,
|
|
clientid := ClientId,
|
|
username := Username,
|
|
payload := Payload,
|
|
peerhost := PeerHost,
|
|
topic := Topic,
|
|
qos := QoS,
|
|
flags := Flags,
|
|
headers := Headers,
|
|
pub_props := Properties,
|
|
timestamp := Timestamp,
|
|
publish_received_at := EventAt
|
|
} = Fields,
|
|
Now = erlang:system_time(millisecond),
|
|
TimestampElapse = Now - Timestamp,
|
|
RcvdAtElapse = Now - EventAt,
|
|
?assert(is_binary(ID)),
|
|
?assertEqual(<<"c_event">>, ClientId),
|
|
?assertEqual(<<"u_event">>, Username),
|
|
?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
|
|
verify_ipaddr(PeerHost),
|
|
?assertEqual(<<"t1">>, Topic),
|
|
?assertEqual(1, QoS),
|
|
?assert(is_map(Flags)),
|
|
?assert(is_map(Headers)),
|
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
|
?assert(EventAt =< Timestamp);
|
|
|
|
verify_event_fields('client.connected', Fields) ->
|
|
#{clientid := ClientId,
|
|
username := Username,
|
|
mountpoint := MountPoint,
|
|
peername := PeerName,
|
|
sockname := SockName,
|
|
proto_name := ProtoName,
|
|
proto_ver := ProtoVer,
|
|
keepalive := Keepalive,
|
|
clean_start := CleanStart,
|
|
expiry_interval := ExpiryInterval,
|
|
is_bridge := IsBridge,
|
|
conn_props := Properties,
|
|
timestamp := Timestamp,
|
|
connected_at := EventAt
|
|
} = Fields,
|
|
Now = erlang:system_time(millisecond),
|
|
TimestampElapse = Now - Timestamp,
|
|
RcvdAtElapse = Now - EventAt,
|
|
?assert(is_binary(MountPoint) orelse MountPoint == undefined),
|
|
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
|
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
|
verify_peername(PeerName),
|
|
verify_peername(SockName),
|
|
?assertEqual(<<"MQTT">>, ProtoName),
|
|
?assertEqual(5, ProtoVer),
|
|
?assert(is_integer(Keepalive)),
|
|
?assert(is_boolean(CleanStart)),
|
|
?assertEqual(60, ExpiryInterval),
|
|
?assertEqual(false, IsBridge),
|
|
?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
|
?assert(EventAt =< Timestamp);
|
|
|
|
verify_event_fields('client.disconnected', Fields) ->
|
|
#{reason := Reason,
|
|
clientid := ClientId,
|
|
username := Username,
|
|
peername := PeerName,
|
|
sockname := SockName,
|
|
disconn_props := Properties,
|
|
timestamp := Timestamp,
|
|
disconnected_at := EventAt
|
|
} = Fields,
|
|
Now = erlang:system_time(millisecond),
|
|
TimestampElapse = Now - Timestamp,
|
|
RcvdAtElapse = Now - EventAt,
|
|
?assert(is_atom(Reason)),
|
|
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
|
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
|
verify_peername(PeerName),
|
|
verify_peername(SockName),
|
|
?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties),
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
|
?assert(EventAt =< Timestamp);
|
|
|
|
verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed'
|
|
; SubUnsub == 'session.unsubscribed' ->
|
|
#{clientid := ClientId,
|
|
username := Username,
|
|
peerhost := PeerHost,
|
|
topic := Topic,
|
|
qos := QoS,
|
|
timestamp := Timestamp
|
|
} = Fields,
|
|
Now = erlang:system_time(millisecond),
|
|
TimestampElapse = Now - Timestamp,
|
|
?assert(is_atom(reason)),
|
|
?assertEqual(<<"c_event2">>, ClientId),
|
|
?assertEqual(<<"u_event2">>, Username),
|
|
verify_ipaddr(PeerHost),
|
|
?assertEqual(<<"t1">>, Topic),
|
|
?assertEqual(1, QoS),
|
|
PropKey =
|
|
case SubUnsub of
|
|
'session.subscribed' -> sub_props;
|
|
'session.unsubscribed' -> unsub_props
|
|
end,
|
|
?assertMatch(#{'User-Property' := #{<<"topic_name">> := <<"t1">>}},
|
|
maps:get(PropKey, Fields)),
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000);
|
|
|
|
verify_event_fields('message.dropped', Fields) ->
|
|
#{id := ID,
|
|
reason := Reason,
|
|
clientid := ClientId,
|
|
username := Username,
|
|
payload := Payload,
|
|
peerhost := PeerHost,
|
|
topic := Topic,
|
|
qos := QoS,
|
|
flags := Flags,
|
|
pub_props := Properties,
|
|
timestamp := Timestamp,
|
|
publish_received_at := EventAt
|
|
} = Fields,
|
|
Now = erlang:system_time(millisecond),
|
|
TimestampElapse = Now - Timestamp,
|
|
RcvdAtElapse = Now - EventAt,
|
|
?assert(is_binary(ID)),
|
|
?assert(is_atom(Reason)),
|
|
?assertEqual(<<"c_event">>, ClientId),
|
|
?assertEqual(<<"u_event">>, Username),
|
|
?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
|
|
verify_ipaddr(PeerHost),
|
|
?assertEqual(<<"t1">>, Topic),
|
|
?assertEqual(1, QoS),
|
|
?assert(is_map(Flags)),
|
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
|
?assert(EventAt =< Timestamp);
|
|
|
|
verify_event_fields('message.delivered', Fields) ->
|
|
#{id := ID,
|
|
clientid := ClientId,
|
|
username := Username,
|
|
from_clientid := FromClientId,
|
|
from_username := FromUsername,
|
|
payload := Payload,
|
|
peerhost := PeerHost,
|
|
topic := Topic,
|
|
qos := QoS,
|
|
flags := Flags,
|
|
pub_props := Properties,
|
|
timestamp := Timestamp,
|
|
publish_received_at := EventAt
|
|
} = Fields,
|
|
Now = erlang:system_time(millisecond),
|
|
TimestampElapse = Now - Timestamp,
|
|
RcvdAtElapse = Now - EventAt,
|
|
?assert(is_binary(ID)),
|
|
?assertEqual(<<"c_event2">>, ClientId),
|
|
?assertEqual(<<"u_event2">>, Username),
|
|
?assertEqual(<<"c_event">>, FromClientId),
|
|
?assertEqual(<<"u_event">>, FromUsername),
|
|
?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
|
|
verify_ipaddr(PeerHost),
|
|
?assertEqual(<<"t1">>, Topic),
|
|
?assertEqual(1, QoS),
|
|
?assert(is_map(Flags)),
|
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties),
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
|
?assert(EventAt =< Timestamp);
|
|
|
|
verify_event_fields('message.acked', Fields) ->
|
|
#{id := ID,
|
|
clientid := ClientId,
|
|
username := Username,
|
|
from_clientid := FromClientId,
|
|
from_username := FromUsername,
|
|
payload := Payload,
|
|
peerhost := PeerHost,
|
|
topic := Topic,
|
|
qos := QoS,
|
|
flags := Flags,
|
|
pub_props := PubProps,
|
|
puback_props := PubAckProps,
|
|
timestamp := Timestamp,
|
|
publish_received_at := EventAt
|
|
} = Fields,
|
|
Now = erlang:system_time(millisecond),
|
|
TimestampElapse = Now - Timestamp,
|
|
RcvdAtElapse = Now - EventAt,
|
|
?assert(is_binary(ID)),
|
|
?assertEqual(<<"c_event2">>, ClientId),
|
|
?assertEqual(<<"u_event2">>, Username),
|
|
?assertEqual(<<"c_event">>, FromClientId),
|
|
?assertEqual(<<"u_event">>, FromUsername),
|
|
?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload),
|
|
verify_ipaddr(PeerHost),
|
|
?assertEqual(<<"t1">>, Topic),
|
|
?assertEqual(1, QoS),
|
|
?assert(is_map(Flags)),
|
|
?assertMatch(#{'Message-Expiry-Interval' := 60}, PubProps),
|
|
?assert(is_map(PubAckProps)),
|
|
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
|
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
|
?assert(EventAt =< Timestamp).
|
|
|
|
verify_peername(PeerName) ->
|
|
case string:split(PeerName, ":") of
|
|
[IPAddrS, PortS] ->
|
|
verify_ipaddr(IPAddrS),
|
|
_ = binary_to_integer(PortS);
|
|
_ -> ct:fail({invalid_peername, PeerName})
|
|
end.
|
|
|
|
verify_ipaddr(IPAddrS) ->
|
|
?assertMatch({ok, _}, inet:parse_address(binary_to_list(IPAddrS))).
|
|
|
|
init_events_counters() ->
|
|
ets:new(events_record_tab, [named_table, bag, public]).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Start Apps
|
|
%%------------------------------------------------------------------------------
|
|
deps_path(App, RelativePath) ->
|
|
Path0 = code:lib_dir(App),
|
|
Path = case file:read_link(Path0) of
|
|
{ok, Resolved} -> Resolved;
|
|
{error, _} -> Path0
|
|
end,
|
|
filename:join([Path, RelativePath]).
|
|
|
|
local_path(RelativePath) ->
|
|
deps_path(emqx_rule_engine, RelativePath).
|
|
|
|
insert_rules(Rules) ->
|
|
lists:foreach(fun(Rule) ->
|
|
ok = emqx_rule_engine:insert_rule(Rule)
|
|
end, Rules).
|
|
|
|
delete_rules_by_ids(Ids) ->
|
|
lists:foreach(fun(Id) ->
|
|
ok = emqx_rule_engine:delete_rule(Id)
|
|
end, Ids).
|
|
|
|
delete_rule(#{id := Id}) ->
|
|
ok = emqx_rule_engine:delete_rule(Id);
|
|
delete_rule(Id) when is_binary(Id) ->
|
|
ok = emqx_rule_engine:delete_rule(Id).
|