diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index bd3de3561..651fd24ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -10,6 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %% ct setup helpers init_per_suite(Config, Apps) -> @@ -211,19 +213,27 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> Res. create_rule_and_action_http(BridgeType, RuleTopic, Config) -> + create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). + +create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> BridgeName = ?config(bridge_name, Config), BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>), Params = #{ enable => true, - sql => <<"SELECT * FROM \"", RuleTopic/binary, "\"">>, + sql => SQL, actions => [BridgeId] }, Path = emqx_mgmt_api_test_util:api_path(["rules"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), ct:pal("rule action params: ~p", [Params]), case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; - Error -> Error + {ok, Res0} -> + Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + {ok, Res}; + Error -> + Error end. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index dfd5fd07c..d29e38833 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -379,6 +379,41 @@ t_sync_device_id_missing(Config) -> iotdb_bridge_on_query ). +t_extract_device_id_from_rule_engine_message(Config) -> + BridgeType = ?config(bridge_type, Config), + RuleTopic = <<"t/iotdb">>, + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"), + Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)), + ?check_trace( + begin + {ok, _} = emqx_bridge_testlib:create_bridge(Config), + SQL = << + "SELECT\n" + " payload.measurement, payload.data_type, payload.value, payload.device_id\n" + "FROM\n" + " \"", + RuleTopic/binary, + "\"" + >>, + Opts = #{sql => SQL}, + {ok, _} = emqx_bridge_testlib:create_rule_and_action_http( + BridgeType, RuleTopic, Config, Opts + ), + emqx:publish(Message), + ?block_until(handle_async_reply, 5_000), + ok + end, + fun(Trace) -> + ?assertMatch( + [#{action := ack, result := {ok, 200, _, _}}], + ?of_kind(handle_async_reply, Trace) + ), + ok + end + ), + ok. + t_sync_invalid_data(Config) -> emqx_bridge_testlib:t_sync_query( Config, diff --git a/apps/emqx_rule_engine/src/emqx_rule_maps.erl b/apps/emqx_rule_engine/src/emqx_rule_maps.erl index baf83ff6f..b4dd8fc82 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_maps.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_maps.erl @@ -129,6 +129,15 @@ general_find({index, _}, List, _OrgData, Handler) when not is_list(List) -> do_put({key, Key}, Val, Map, _OrgData) when is_map(Map) -> maps:put(Key, Val, Map); +do_put({key, Key}, Val, Data, _OrgData) when is_binary(Data) -> + case emqx_utils_json:safe_decode(Data, [return_maps]) of + {ok, Map = #{}} -> + %% Avoid losing other keys when the data is an encoded map... + Map#{Key => Val}; + _ -> + %% Fallback to the general case otherwise. + #{Key => Val} + end; do_put({key, Key}, Val, Data, _OrgData) when not is_map(Data) -> #{Key => Val}; do_put({index, {const, Index}}, Val, List, _OrgData) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl index 9fdd60c56..9636072ad 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl @@ -71,7 +71,25 @@ t_nested_put_map(_) -> ?assertEqual( #{k => #{<<"t">> => #{<<"a">> => v1}}}, nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}}) - ). + ), + %% since we currently support passing a binary-encoded json as input... + ?assertEqual( + #{payload => #{<<"a">> => v1, <<"b">> => <<"v2">>}}, + nested_put( + ?path([payload, <<"a">>]), + v1, + #{payload => emqx_utils_json:encode(#{b => <<"v2">>})} + ) + ), + ?assertEqual( + #{payload => #{<<"a">> => #{<<"old">> => <<"v2">>, <<"new">> => v1}}}, + nested_put( + ?path([payload, <<"a">>, <<"new">>]), + v1, + #{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})} + ) + ), + ok. t_nested_put_index(_) -> ?assertEqual([1, a, 3], nested_put(?path([{ic, 2}]), a, [1, 2, 3])),