Merge pull request #11260 from thalesmg/fix-rule-maps-nested-put-r51

fix(rule_maps): avoid losing data when using `emqx_rule_maps:nested_put`
This commit is contained in:
Thales Macedo Garitezi 2023-07-14 15:02:24 -03:00 committed by GitHub
commit ab5fd1e5c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 235 additions and 14 deletions

View File

@ -10,6 +10,8 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
%% ct setup helpers
init_per_suite(Config, Apps) ->
@ -211,19 +213,27 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
Res.
create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
BridgeName = ?config(bridge_name, Config),
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
Params = #{
enable => true,
sql => <<"SELECT * FROM \"", RuleTopic/binary, "\"">>,
sql => SQL,
actions => [BridgeId]
},
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
ct:pal("rule action params: ~p", [Params]),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
Error -> Error
{ok, Res0} ->
Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
{ok, Res};
Error ->
Error
end.
%%------------------------------------------------------------------------------

View File

@ -379,6 +379,41 @@ t_sync_device_id_missing(Config) ->
iotdb_bridge_on_query
).
t_extract_device_id_from_rule_engine_message(Config) ->
BridgeType = ?config(bridge_type, Config),
RuleTopic = <<"t/iotdb">>,
DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"),
Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
?check_trace(
begin
{ok, _} = emqx_bridge_testlib:create_bridge(Config),
SQL = <<
"SELECT\n"
" payload.measurement, payload.data_type, payload.value, payload.device_id\n"
"FROM\n"
" \"",
RuleTopic/binary,
"\""
>>,
Opts = #{sql => SQL},
{ok, _} = emqx_bridge_testlib:create_rule_and_action_http(
BridgeType, RuleTopic, Config, Opts
),
emqx:publish(Message),
?block_until(handle_async_reply, 5_000),
ok
end,
fun(Trace) ->
?assertMatch(
[#{action := ack, result := {ok, 200, _, _}}],
?of_kind(handle_async_reply, Trace)
),
ok
end
),
ok.
t_sync_invalid_data(Config) ->
emqx_bridge_testlib:t_sync_query(
Config,

View File

@ -23,7 +23,6 @@
-export([
apply_rule/3,
apply_rules/3,
clear_rule_payload/0,
inc_action_metrics/2
]).
@ -196,18 +195,18 @@ select_and_transform([], _Columns, Action) ->
select_and_transform(['*' | More], Columns, Action) ->
select_and_transform(More, Columns, maps:merge(Action, Columns));
select_and_transform([{as, Field, Alias} | More], Columns, Action) ->
Val = eval(Field, Columns),
Val = eval(Field, [Action, Columns]),
select_and_transform(
More,
nested_put(Alias, Val, Columns),
Columns,
nested_put(Alias, Val, Action)
);
select_and_transform([Field | More], Columns, Action) ->
Val = eval(Field, Columns),
Val = eval(Field, [Action, Columns]),
Key = alias(Field, Columns),
select_and_transform(
More,
nested_put(Key, Val, Columns),
Columns,
nested_put(Key, Val, Action)
).
@ -217,25 +216,25 @@ select_and_collect(Fields, Columns) ->
select_and_collect(Fields, Columns, {#{}, {'item', []}}).
select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) ->
Val = eval(Field, Columns),
Val = eval(Field, [Action, Columns]),
{nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) ->
Val = eval(Field, Columns),
Val = eval(Field, [Action, Columns]),
select_and_collect(
More,
nested_put(Alias, Val, Columns),
{nested_put(Alias, Val, Action), LastKV}
);
select_and_collect([Field], Columns, {Action, _}) ->
Val = eval(Field, Columns),
Val = eval(Field, [Action, Columns]),
Key = alias(Field, Columns),
{nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
select_and_collect([Field | More], Columns, {Action, LastKV}) ->
Val = eval(Field, Columns),
Val = eval(Field, [Action, Columns]),
Key = alias(Field, Columns),
select_and_collect(
More,
nested_put(Key, Val, Columns),
Columns,
{nested_put(Key, Val, Action), LastKV}
).
@ -368,6 +367,16 @@ do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, En
inc_action_metrics(RuleId, Result),
Result.
eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) ->
case Context of
[Columns] ->
eval(Exp, Columns);
[Columns | Rest] ->
case eval(Exp, Columns) of
undefined -> eval(Exp, Rest);
Val -> Val
end
end;
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
nested_get({path, Path}, may_decode_payload(Payload));
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->

View File

@ -21,6 +21,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl").
@ -583,6 +584,122 @@ t_ensure_action_removed(_) ->
%% Test cases for rule runtime
%%------------------------------------------------------------------------------
t_json_payload_decoding(_Config) ->
{ok, C} = emqtt:start_link(),
on_exit(fun() -> emqtt:stop(C) end),
{ok, _} = emqtt:connect(C),
Cases =
[
#{
select_fields =>
<<"payload.measurement, payload.data_type, payload.value, payload.device_id">>,
payload => emqx_utils_json:encode(#{
measurement => <<"temp">>,
data_type => <<"FLOAT">>,
value => <<"32.12">>,
device_id => <<"devid">>
}),
expected => #{
payload => #{
<<"measurement">> => <<"temp">>,
<<"data_type">> => <<"FLOAT">>,
<<"value">> => <<"32.12">>,
<<"device_id">> => <<"devid">>
}
}
},
%% "last write wins" examples
#{
select_fields => <<"payload as p, payload.f as p.answer">>,
payload => emqx_utils_json:encode(#{f => 42, keep => <<"that?">>}),
expected => #{
<<"p">> => #{
<<"answer">> => 42
}
}
},
#{
select_fields => <<"payload as p, payload.f as p.jsonlike.f">>,
payload => emqx_utils_json:encode(#{
jsonlike => emqx_utils_json:encode(#{a => 0}),
f => <<"huh">>
}),
%% behavior from 4.4: jsonlike gets wiped without preserving old "keys"
%% here we overwrite it since we don't explicitly decode it
expected => #{
<<"p">> => #{
<<"jsonlike">> => #{<<"f">> => <<"huh">>}
}
}
},
#{
select_fields =>
<<"payload as p, 42 as p, payload.measurement as p.measurement, 51 as p">>,
payload => emqx_utils_json:encode(#{
measurement => <<"temp">>,
data_type => <<"FLOAT">>,
value => <<"32.12">>,
device_id => <<"devid">>
}),
expected => #{
<<"p">> => 51
}
},
%% if selected field is already structured, new values are inserted into it
#{
select_fields =>
<<"json_decode(payload) as p, payload.a as p.z">>,
payload => emqx_utils_json:encode(#{
a => 1,
b => <<"2">>
}),
expected => #{
<<"p">> => #{
<<"a">> => 1,
<<"b">> => <<"2">>,
<<"z">> => 1
}
}
}
],
ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>,
Topic = <<"some/topic">>,
ok = snabbkaffe:start_trace(),
on_exit(fun() -> snabbkaffe:stop() end),
on_exit(fun() -> delete_rule(?TMP_RULEID) end),
lists:foreach(
fun(#{select_fields := Fs, payload := P, expected := E} = Case) ->
ct:pal("testing case ~p", [Case]),
SQL = <<"select ", Fs/binary, " from \"", Topic/binary, "\"">>,
delete_rule(?TMP_RULEID),
{ok, _Rule} = emqx_rule_engine:create_rule(
#{
sql => SQL,
id => ?TMP_RULEID,
actions => [#{function => ActionFn}]
}
),
{_, {ok, Event}} =
?wait_async_action(
emqtt:publish(C, Topic, P, 0),
#{?snk_kind := action_response},
5_000
),
?assertMatch(
#{selected := E},
Event,
#{payload => P, fields => Fs, expected => E}
),
ok
end,
Cases
),
snabbkaffe:stop(),
ok.
t_events(_Config) ->
{ok, Client} = emqtt:start_link(
[
@ -3065,6 +3182,14 @@ republish_action(Topic, Payload, UserProperties) ->
}
}.
action_response(Selected, Envs, Args) ->
?tp(action_response, #{
selected => Selected,
envs => Envs,
args => Args
}),
ok.
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
SQL = <<"select * from \"simple/topic\"">>,
make_simple_rule(RuleId, SQL, Ts).

View File

@ -71,7 +71,49 @@ t_nested_put_map(_) ->
?assertEqual(
#{k => #{<<"t">> => #{<<"a">> => v1}}},
nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}})
).
),
%% note: since we handle json-encoded binaries when evaluating the
%% rule rather than baking the decoding in `nested_put`, we test
%% this corner case that _would_ otherwise lose data to
%% demonstrate this behavior.
?assertEqual(
#{payload => #{<<"a">> => v1}},
nested_put(
?path([payload, <<"a">>]),
v1,
#{payload => emqx_utils_json:encode(#{b => <<"v2">>})}
)
),
%% We have an asymmetry in the behavior here because `nested_put'
%% currently, at each key, will use `general_find' to get the
%% current value of the eky, and that attempts JSON decoding the
%% such value... So, the cases below, `old' gets preserved
%% because it's in this direct path.
?assertEqual(
#{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"v2">>}}},
nested_put(
?path([payload, <<"a">>, <<"new">>]),
v1,
#{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})}
)
),
?assertEqual(
#{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"{}">>}}},
nested_put(
?path([payload, <<"a">>, <<"new">>]),
v1,
#{payload => emqx_utils_json:encode(#{a => #{old => <<"{}">>}, b => <<"{}">>})}
)
),
?assertEqual(
#{payload => #{<<"a">> => #{<<"new">> => v1}}},
nested_put(
?path([payload, <<"a">>, <<"new">>]),
v1,
#{payload => <<"{}">>}
)
),
ok.
t_nested_put_index(_) ->
?assertEqual([1, a, 3], nested_put(?path([{ic, 2}]), a, [1, 2, 3])),