test(rules): add a few more tests to assert our rule evaluation behavior
This commit is contained in:
parent
4034bcbb26
commit
f9452241bd
|
@ -21,6 +21,7 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
|
@ -583,6 +584,122 @@ t_ensure_action_removed(_) ->
|
||||||
%% Test cases for rule runtime
|
%% Test cases for rule runtime
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_json_payload_decoding(_Config) ->
|
||||||
|
{ok, C} = emqtt:start_link(),
|
||||||
|
on_exit(fun() -> emqtt:stop(C) end),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
|
||||||
|
Cases =
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
select_fields =>
|
||||||
|
<<"payload.measurement, payload.data_type, payload.value, payload.device_id">>,
|
||||||
|
payload => emqx_utils_json:encode(#{
|
||||||
|
measurement => <<"temp">>,
|
||||||
|
data_type => <<"FLOAT">>,
|
||||||
|
value => <<"32.12">>,
|
||||||
|
device_id => <<"devid">>
|
||||||
|
}),
|
||||||
|
expected => #{
|
||||||
|
payload => #{
|
||||||
|
<<"measurement">> => <<"temp">>,
|
||||||
|
<<"data_type">> => <<"FLOAT">>,
|
||||||
|
<<"value">> => <<"32.12">>,
|
||||||
|
<<"device_id">> => <<"devid">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
%% "last write wins" examples
|
||||||
|
#{
|
||||||
|
select_fields => <<"payload as p, payload.f as p.answer">>,
|
||||||
|
payload => emqx_utils_json:encode(#{f => 42, keep => <<"that?">>}),
|
||||||
|
expected => #{
|
||||||
|
<<"p">> => #{
|
||||||
|
<<"answer">> => 42
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
select_fields => <<"payload as p, payload.f as p.jsonlike.f">>,
|
||||||
|
payload => emqx_utils_json:encode(#{
|
||||||
|
jsonlike => emqx_utils_json:encode(#{a => 0}),
|
||||||
|
f => <<"huh">>
|
||||||
|
}),
|
||||||
|
%% behavior from 4.4: jsonlike gets wiped without preserving old "keys"
|
||||||
|
%% here we overwrite it since we don't explicitly decode it
|
||||||
|
expected => #{
|
||||||
|
<<"p">> => #{
|
||||||
|
<<"jsonlike">> => #{<<"f">> => <<"huh">>}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
select_fields =>
|
||||||
|
<<"payload as p, 42 as p, payload.measurement as p.measurement, 51 as p">>,
|
||||||
|
payload => emqx_utils_json:encode(#{
|
||||||
|
measurement => <<"temp">>,
|
||||||
|
data_type => <<"FLOAT">>,
|
||||||
|
value => <<"32.12">>,
|
||||||
|
device_id => <<"devid">>
|
||||||
|
}),
|
||||||
|
expected => #{
|
||||||
|
<<"p">> => 51
|
||||||
|
}
|
||||||
|
},
|
||||||
|
%% if selected field is already structured, new values are inserted into it
|
||||||
|
#{
|
||||||
|
select_fields =>
|
||||||
|
<<"json_decode(payload) as p, payload.a as p.z">>,
|
||||||
|
payload => emqx_utils_json:encode(#{
|
||||||
|
a => 1,
|
||||||
|
b => <<"2">>
|
||||||
|
}),
|
||||||
|
expected => #{
|
||||||
|
<<"p">> => #{
|
||||||
|
<<"a">> => 1,
|
||||||
|
<<"b">> => <<"2">>,
|
||||||
|
<<"z">> => 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>,
|
||||||
|
Topic = <<"some/topic">>,
|
||||||
|
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
|
on_exit(fun() -> snabbkaffe:stop() end),
|
||||||
|
on_exit(fun() -> delete_rule(?TMP_RULEID) end),
|
||||||
|
lists:foreach(
|
||||||
|
fun(#{select_fields := Fs, payload := P, expected := E} = Case) ->
|
||||||
|
ct:pal("testing case ~p", [Case]),
|
||||||
|
SQL = <<"select ", Fs/binary, " from \"", Topic/binary, "\"">>,
|
||||||
|
delete_rule(?TMP_RULEID),
|
||||||
|
{ok, _Rule} = emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
sql => SQL,
|
||||||
|
id => ?TMP_RULEID,
|
||||||
|
actions => [#{function => ActionFn}]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
{_, {ok, Event}} =
|
||||||
|
?wait_async_action(
|
||||||
|
emqtt:publish(C, Topic, P, 0),
|
||||||
|
#{?snk_kind := action_response},
|
||||||
|
5_000
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{selected := E},
|
||||||
|
Event,
|
||||||
|
#{payload => P, fields => Fs, expected => E}
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
Cases
|
||||||
|
),
|
||||||
|
snabbkaffe:stop(),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
t_events(_Config) ->
|
t_events(_Config) ->
|
||||||
{ok, Client} = emqtt:start_link(
|
{ok, Client} = emqtt:start_link(
|
||||||
[
|
[
|
||||||
|
@ -3065,6 +3182,14 @@ republish_action(Topic, Payload, UserProperties) ->
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
action_response(Selected, Envs, Args) ->
|
||||||
|
?tp(action_response, #{
|
||||||
|
selected => Selected,
|
||||||
|
envs => Envs,
|
||||||
|
args => Args
|
||||||
|
}),
|
||||||
|
ok.
|
||||||
|
|
||||||
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
|
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
|
||||||
SQL = <<"select * from \"simple/topic\"">>,
|
SQL = <<"select * from \"simple/topic\"">>,
|
||||||
make_simple_rule(RuleId, SQL, Ts).
|
make_simple_rule(RuleId, SQL, Ts).
|
||||||
|
|
|
@ -72,15 +72,10 @@ t_nested_put_map(_) ->
|
||||||
#{k => #{<<"t">> => #{<<"a">> => v1}}},
|
#{k => #{<<"t">> => #{<<"a">> => v1}}},
|
||||||
nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}})
|
nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}})
|
||||||
),
|
),
|
||||||
%% since we currently support passing a binary-encoded json as input...
|
|
||||||
%% will always decode as json, even if non-intentional...
|
|
||||||
|
|
||||||
%% note: since we handle json-encoded binaries when evaluating the
|
%% note: since we handle json-encoded binaries when evaluating the
|
||||||
%% rule rather than baking the decoding in `nested_put`, we test
|
%% rule rather than baking the decoding in `nested_put`, we test
|
||||||
%% this corner case that _would_ otherwise lose data to
|
%% this corner case that _would_ otherwise lose data to
|
||||||
%% demonstrate this behavior.
|
%% demonstrate this behavior.
|
||||||
|
|
||||||
%% loses `b' !
|
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
#{payload => #{<<"a">> => v1}},
|
#{payload => #{<<"a">> => v1}},
|
||||||
nested_put(
|
nested_put(
|
||||||
|
@ -89,17 +84,21 @@ t_nested_put_map(_) ->
|
||||||
#{payload => emqx_utils_json:encode(#{b => <<"v2">>})}
|
#{payload => emqx_utils_json:encode(#{b => <<"v2">>})}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
%% We have an asymmetry in the behavior here because `nested_put'
|
||||||
|
%% currently, at each key, will use `general_find' to get the
|
||||||
|
%% current value of the eky, and that attempts JSON decoding the
|
||||||
|
%% such value... So, the cases below, `old' gets preserved
|
||||||
|
%% because it's in this direct path.
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
#{payload => #{<<"a">> => #{<<"old">> => <<"v2">>, <<"new">> => v1}}},
|
#{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"v2">>}}},
|
||||||
nested_put(
|
nested_put(
|
||||||
?path([payload, <<"a">>, <<"new">>]),
|
?path([payload, <<"a">>, <<"new">>]),
|
||||||
v1,
|
v1,
|
||||||
#{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})}
|
#{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})}
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
%% loses `b' !
|
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
#{payload => #{<<"a">> => #{<<"old">> => <<"{}">>, <<"new">> => v1}}},
|
#{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"{}">>}}},
|
||||||
nested_put(
|
nested_put(
|
||||||
?path([payload, <<"a">>, <<"new">>]),
|
?path([payload, <<"a">>, <<"new">>]),
|
||||||
v1,
|
v1,
|
||||||
|
|
Loading…
Reference in New Issue