From bffef386c18f7697055d15ccf8d2146f5cb64ff9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 12 Jul 2023 14:25:15 -0300 Subject: [PATCH 1/3] fix(rule_maps): avoid losing data when using `emqx_rule_maps:nested_put` Fixes https://emqx.atlassian.net/browse/EMQX-10541 --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 16 +++++++-- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 35 +++++++++++++++++++ apps/emqx_rule_engine/src/emqx_rule_maps.erl | 9 +++++ .../test/emqx_rule_maps_SUITE.erl | 20 ++++++++++- 4 files changed, 76 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index bd3de3561..651fd24ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -10,6 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %% ct setup helpers init_per_suite(Config, Apps) -> @@ -211,19 +213,27 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> Res. create_rule_and_action_http(BridgeType, RuleTopic, Config) -> + create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). + +create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> BridgeName = ?config(bridge_name, Config), BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>), Params = #{ enable => true, - sql => <<"SELECT * FROM \"", RuleTopic/binary, "\"">>, + sql => SQL, actions => [BridgeId] }, Path = emqx_mgmt_api_test_util:api_path(["rules"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), ct:pal("rule action params: ~p", [Params]), case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; - Error -> Error + {ok, Res0} -> + Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + {ok, Res}; + Error -> + Error end. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index dfd5fd07c..d29e38833 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -379,6 +379,41 @@ t_sync_device_id_missing(Config) -> iotdb_bridge_on_query ). +t_extract_device_id_from_rule_engine_message(Config) -> + BridgeType = ?config(bridge_type, Config), + RuleTopic = <<"t/iotdb">>, + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"), + Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)), + ?check_trace( + begin + {ok, _} = emqx_bridge_testlib:create_bridge(Config), + SQL = << + "SELECT\n" + " payload.measurement, payload.data_type, payload.value, payload.device_id\n" + "FROM\n" + " \"", + RuleTopic/binary, + "\"" + >>, + Opts = #{sql => SQL}, + {ok, _} = emqx_bridge_testlib:create_rule_and_action_http( + BridgeType, RuleTopic, Config, Opts + ), + emqx:publish(Message), + ?block_until(handle_async_reply, 5_000), + ok + end, + fun(Trace) -> + ?assertMatch( + [#{action := ack, result := {ok, 200, _, _}}], + ?of_kind(handle_async_reply, Trace) + ), + ok + end + ), + ok. + t_sync_invalid_data(Config) -> emqx_bridge_testlib:t_sync_query( Config, diff --git a/apps/emqx_rule_engine/src/emqx_rule_maps.erl b/apps/emqx_rule_engine/src/emqx_rule_maps.erl index baf83ff6f..b4dd8fc82 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_maps.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_maps.erl @@ -129,6 +129,15 @@ general_find({index, _}, List, _OrgData, Handler) when not is_list(List) -> do_put({key, Key}, Val, Map, _OrgData) when is_map(Map) -> maps:put(Key, Val, Map); +do_put({key, Key}, Val, Data, _OrgData) when is_binary(Data) -> + case emqx_utils_json:safe_decode(Data, [return_maps]) of + {ok, Map = #{}} -> + %% Avoid losing other keys when the data is an encoded map... + Map#{Key => Val}; + _ -> + %% Fallback to the general case otherwise. + #{Key => Val} + end; do_put({key, Key}, Val, Data, _OrgData) when not is_map(Data) -> #{Key => Val}; do_put({index, {const, Index}}, Val, List, _OrgData) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl index 9fdd60c56..9636072ad 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl @@ -71,7 +71,25 @@ t_nested_put_map(_) -> ?assertEqual( #{k => #{<<"t">> => #{<<"a">> => v1}}}, nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}}) - ). + ), + %% since we currently support passing a binary-encoded json as input... + ?assertEqual( + #{payload => #{<<"a">> => v1, <<"b">> => <<"v2">>}}, + nested_put( + ?path([payload, <<"a">>]), + v1, + #{payload => emqx_utils_json:encode(#{b => <<"v2">>})} + ) + ), + ?assertEqual( + #{payload => #{<<"a">> => #{<<"old">> => <<"v2">>, <<"new">> => v1}}}, + nested_put( + ?path([payload, <<"a">>, <<"new">>]), + v1, + #{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})} + ) + ), + ok. t_nested_put_index(_) -> ?assertEqual([1, a, 3], nested_put(?path([{ic, 2}]), a, [1, 2, 3])), From 4034bcbb265e654571cecdbb119eddb7cda893bf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Jul 2023 13:42:31 -0300 Subject: [PATCH 2/3] fix(rule_runtime): avoid rewriting select clause sources (by @keynslug) Thanks to @keynslug for the insight and the patch. With this, we avoid rewriting the parsed select clause sources, and attempt to reuse the result context when reading values during evaluation. --- apps/emqx_rule_engine/src/emqx_rule_maps.erl | 9 ------ .../src/emqx_rule_runtime.erl | 29 ++++++++++++------- .../test/emqx_rule_maps_SUITE.erl | 27 ++++++++++++++++- 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_maps.erl b/apps/emqx_rule_engine/src/emqx_rule_maps.erl index b4dd8fc82..baf83ff6f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_maps.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_maps.erl @@ -129,15 +129,6 @@ general_find({index, _}, List, _OrgData, Handler) when not is_list(List) -> do_put({key, Key}, Val, Map, _OrgData) when is_map(Map) -> maps:put(Key, Val, Map); -do_put({key, Key}, Val, Data, _OrgData) when is_binary(Data) -> - case emqx_utils_json:safe_decode(Data, [return_maps]) of - {ok, Map = #{}} -> - %% Avoid losing other keys when the data is an encoded map... - Map#{Key => Val}; - _ -> - %% Fallback to the general case otherwise. - #{Key => Val} - end; do_put({key, Key}, Val, Data, _OrgData) when not is_map(Data) -> #{Key => Val}; do_put({index, {const, Index}}, Val, List, _OrgData) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f83aa2920..de1e92a3f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -23,7 +23,6 @@ -export([ apply_rule/3, apply_rules/3, - clear_rule_payload/0, inc_action_metrics/2 ]). @@ -196,18 +195,18 @@ select_and_transform([], _Columns, Action) -> select_and_transform(['*' | More], Columns, Action) -> select_and_transform(More, Columns, maps:merge(Action, Columns)); select_and_transform([{as, Field, Alias} | More], Columns, Action) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), select_and_transform( More, - nested_put(Alias, Val, Columns), + Columns, nested_put(Alias, Val, Action) ); select_and_transform([Field | More], Columns, Action) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), Key = alias(Field, Columns), select_and_transform( More, - nested_put(Key, Val, Columns), + Columns, nested_put(Key, Val, Action) ). @@ -217,25 +216,25 @@ select_and_collect(Fields, Columns) -> select_and_collect(Fields, Columns, {#{}, {'item', []}}). select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), {nested_put(Alias, Val, Action), {A, ensure_list(Val)}}; select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), select_and_collect( More, nested_put(Alias, Val, Columns), {nested_put(Alias, Val, Action), LastKV} ); select_and_collect([Field], Columns, {Action, _}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), Key = alias(Field, Columns), {nested_put(Key, Val, Action), {'item', ensure_list(Val)}}; select_and_collect([Field | More], Columns, {Action, LastKV}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), Key = alias(Field, Columns), select_and_collect( More, - nested_put(Key, Val, Columns), + Columns, {nested_put(Key, Val, Action), LastKV} ). @@ -368,6 +367,16 @@ do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, En inc_action_metrics(RuleId, Result), Result. +eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) -> + case Context of + [Columns] -> + eval(Exp, Columns); + [Columns | Rest] -> + case eval(Exp, Columns) of + undefined -> eval(Exp, Rest); + Val -> Val + end + end; eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> nested_get({path, Path}, may_decode_payload(Payload)); eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl index 9636072ad..ddb59bb41 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl @@ -73,8 +73,16 @@ t_nested_put_map(_) -> nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}}) ), %% since we currently support passing a binary-encoded json as input... + %% will always decode as json, even if non-intentional... + + %% note: since we handle json-encoded binaries when evaluating the + %% rule rather than baking the decoding in `nested_put`, we test + %% this corner case that _would_ otherwise lose data to + %% demonstrate this behavior. + + %% loses `b' ! ?assertEqual( - #{payload => #{<<"a">> => v1, <<"b">> => <<"v2">>}}, + #{payload => #{<<"a">> => v1}}, nested_put( ?path([payload, <<"a">>]), v1, @@ -89,6 +97,23 @@ t_nested_put_map(_) -> #{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})} ) ), + %% loses `b' ! + ?assertEqual( + #{payload => #{<<"a">> => #{<<"old">> => <<"{}">>, <<"new">> => v1}}}, + nested_put( + ?path([payload, <<"a">>, <<"new">>]), + v1, + #{payload => emqx_utils_json:encode(#{a => #{old => <<"{}">>}, b => <<"{}">>})} + ) + ), + ?assertEqual( + #{payload => #{<<"a">> => #{<<"new">> => v1}}}, + nested_put( + ?path([payload, <<"a">>, <<"new">>]), + v1, + #{payload => <<"{}">>} + ) + ), ok. t_nested_put_index(_) -> From f9452241bdb9667a0792449691cc31ff566d1f7c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Jul 2023 17:29:46 -0300 Subject: [PATCH 3/3] test(rules): add a few more tests to assert our rule evaluation behavior --- .../test/emqx_rule_engine_SUITE.erl | 125 ++++++++++++++++++ .../test/emqx_rule_maps_SUITE.erl | 15 +-- 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 822fac067..c8bebab99 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx.hrl"). @@ -583,6 +584,122 @@ t_ensure_action_removed(_) -> %% Test cases for rule runtime %%------------------------------------------------------------------------------ +t_json_payload_decoding(_Config) -> + {ok, C} = emqtt:start_link(), + on_exit(fun() -> emqtt:stop(C) end), + {ok, _} = emqtt:connect(C), + + Cases = + [ + #{ + select_fields => + <<"payload.measurement, payload.data_type, payload.value, payload.device_id">>, + payload => emqx_utils_json:encode(#{ + measurement => <<"temp">>, + data_type => <<"FLOAT">>, + value => <<"32.12">>, + device_id => <<"devid">> + }), + expected => #{ + payload => #{ + <<"measurement">> => <<"temp">>, + <<"data_type">> => <<"FLOAT">>, + <<"value">> => <<"32.12">>, + <<"device_id">> => <<"devid">> + } + } + }, + %% "last write wins" examples + #{ + select_fields => <<"payload as p, payload.f as p.answer">>, + payload => emqx_utils_json:encode(#{f => 42, keep => <<"that?">>}), + expected => #{ + <<"p">> => #{ + <<"answer">> => 42 + } + } + }, + #{ + select_fields => <<"payload as p, payload.f as p.jsonlike.f">>, + payload => emqx_utils_json:encode(#{ + jsonlike => emqx_utils_json:encode(#{a => 0}), + f => <<"huh">> + }), + %% behavior from 4.4: jsonlike gets wiped without preserving old "keys" + %% here we overwrite it since we don't explicitly decode it + expected => #{ + <<"p">> => #{ + <<"jsonlike">> => #{<<"f">> => <<"huh">>} + } + } + }, + #{ + select_fields => + <<"payload as p, 42 as p, payload.measurement as p.measurement, 51 as p">>, + payload => emqx_utils_json:encode(#{ + measurement => <<"temp">>, + data_type => <<"FLOAT">>, + value => <<"32.12">>, + device_id => <<"devid">> + }), + expected => #{ + <<"p">> => 51 + } + }, + %% if selected field is already structured, new values are inserted into it + #{ + select_fields => + <<"json_decode(payload) as p, payload.a as p.z">>, + payload => emqx_utils_json:encode(#{ + a => 1, + b => <<"2">> + }), + expected => #{ + <<"p">> => #{ + <<"a">> => 1, + <<"b">> => <<"2">>, + <<"z">> => 1 + } + } + } + ], + ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>, + Topic = <<"some/topic">>, + + ok = snabbkaffe:start_trace(), + on_exit(fun() -> snabbkaffe:stop() end), + on_exit(fun() -> delete_rule(?TMP_RULEID) end), + lists:foreach( + fun(#{select_fields := Fs, payload := P, expected := E} = Case) -> + ct:pal("testing case ~p", [Case]), + SQL = <<"select ", Fs/binary, " from \"", Topic/binary, "\"">>, + delete_rule(?TMP_RULEID), + {ok, _Rule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [#{function => ActionFn}] + } + ), + {_, {ok, Event}} = + ?wait_async_action( + emqtt:publish(C, Topic, P, 0), + #{?snk_kind := action_response}, + 5_000 + ), + ?assertMatch( + #{selected := E}, + Event, + #{payload => P, fields => Fs, expected => E} + ), + ok + end, + Cases + ), + snabbkaffe:stop(), + + ok. + t_events(_Config) -> {ok, Client} = emqtt:start_link( [ @@ -3065,6 +3182,14 @@ republish_action(Topic, Payload, UserProperties) -> } }. +action_response(Selected, Envs, Args) -> + ?tp(action_response, #{ + selected => Selected, + envs => Envs, + args => Args + }), + ok. + make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) -> SQL = <<"select * from \"simple/topic\"">>, make_simple_rule(RuleId, SQL, Ts). diff --git a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl index ddb59bb41..f206e7fb1 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl @@ -72,15 +72,10 @@ t_nested_put_map(_) -> #{k => #{<<"t">> => #{<<"a">> => v1}}}, nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}}) ), - %% since we currently support passing a binary-encoded json as input... - %% will always decode as json, even if non-intentional... - %% note: since we handle json-encoded binaries when evaluating the %% rule rather than baking the decoding in `nested_put`, we test %% this corner case that _would_ otherwise lose data to %% demonstrate this behavior. - - %% loses `b' ! ?assertEqual( #{payload => #{<<"a">> => v1}}, nested_put( @@ -89,17 +84,21 @@ t_nested_put_map(_) -> #{payload => emqx_utils_json:encode(#{b => <<"v2">>})} ) ), + %% We have an asymmetry in the behavior here because `nested_put' + %% currently, at each key, will use `general_find' to get the + %% current value of the eky, and that attempts JSON decoding the + %% such value... So, the cases below, `old' gets preserved + %% because it's in this direct path. ?assertEqual( - #{payload => #{<<"a">> => #{<<"old">> => <<"v2">>, <<"new">> => v1}}}, + #{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"v2">>}}}, nested_put( ?path([payload, <<"a">>, <<"new">>]), v1, #{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})} ) ), - %% loses `b' ! ?assertEqual( - #{payload => #{<<"a">> => #{<<"old">> => <<"{}">>, <<"new">> => v1}}}, + #{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"{}">>}}}, nested_put( ?path([payload, <<"a">>, <<"new">>]), v1,