diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 9a4a0007a..2ca0f410d 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -32,6 +32,8 @@ -define(WORKER_POOL_SIZE, 4). +-import(emqx_common_test_helpers, [on_exit/1]). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -106,7 +108,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok = emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_conf]), ok. init_per_testcase(_Testcase, Config) -> @@ -123,6 +125,7 @@ end_per_testcase(_Testcase, Config) -> connect_and_clear_table(Config), ok = snabbkaffe:stop(), delete_bridge(Config), + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ @@ -142,7 +145,7 @@ common_init(Config0) -> % Ensure EE bridge module is loaded _ = application:load(emqx_ee_bridge), _ = emqx_ee_bridge:module_info(), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, emqx_rule_engine]), emqx_mgmt_api_test_util:init_suite(), % Connect to mysql directly and create the table connect_and_create_table(Config0), @@ -212,9 +215,13 @@ parse_and_check(ConfigString, BridgeType, Name) -> Config. create_bridge(Config) -> + create_bridge(Config, _Overrides = #{}). + +create_bridge(Config, Overrides) -> BridgeType = ?config(mysql_bridge_type, Config), Name = ?config(mysql_name, Config), - MysqlConfig = ?config(mysql_config, Config), + MysqlConfig0 = ?config(mysql_config, Config), + MysqlConfig = emqx_utils_maps:deep_merge(MysqlConfig0, Overrides), emqx_bridge:create(BridgeType, Name, MysqlConfig). delete_bridge(Config) -> @@ -323,6 +330,26 @@ connect_and_clear_table(Config) -> connect_and_get_payload(Config) -> query_direct_mysql(Config, ?SQL_SELECT). +create_rule_and_action_http(Config) -> + Name = ?config(mysql_name, Config), + Type = ?config(mysql_bridge_type, Config), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + Params = #{ + enable => true, + sql => <<"SELECT * FROM \"t/topic\"">>, + actions => [BridgeId] + }, + Path = emqx_mgmt_api_test_util:api_path(["rules"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res0} -> + Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + {ok, Res}; + Error -> + Error + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -776,3 +803,44 @@ t_table_removed(Config) -> [] ), ok. + +t_nested_payload_template(Config) -> + Name = ?config(mysql_name, Config), + BridgeType = ?config(mysql_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + Value = integer_to_binary(erlang:unique_integer()), + ?check_trace( + begin + connect_and_create_table(Config), + {ok, _} = create_bridge( + Config, + #{ + <<"sql">> => + "INSERT INTO mqtt_test(payload, arrived) " + "VALUES (${payload.value}, FROM_UNIXTIME(${timestamp}/1000))" + } + ), + {ok, #{<<"from">> := [Topic]}} = create_rule_and_action_http(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)) + ), + %% send message via rule action + Payload = emqx_utils_json:encode(#{value => Value}), + Message = emqx_message:make(Topic, Payload), + {_, {ok, _}} = + ?wait_async_action( + emqx:publish(Message), + #{?snk_kind := mysql_connector_query_return}, + 10_000 + ), + ?assertEqual( + {ok, [<<"payload">>], [[Value]]}, + connect_and_get_payload(Config) + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index fd40b906d..edf4123e4 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -257,7 +257,14 @@ quote_mysql(Str) -> lookup_var(Var, Value) when Var == ?PH_VAR_THIS orelse Var == [] -> Value; -lookup_var([Prop | Rest], Data) -> +lookup_var([Prop | Rest], Data0) -> + Data = + case emqx_utils_json:safe_decode(Data0, [return_maps]) of + {ok, Data1} -> + Data1; + {error, _} -> + Data0 + end, case lookup(Prop, Data) of {ok, Value} -> lookup_var(Rest, Value); diff --git a/apps/emqx_utils/test/emqx_placeholder_SUITE.erl b/apps/emqx_utils/test/emqx_placeholder_SUITE.erl index fc431e80c..81bf0853a 100644 --- a/apps/emqx_utils/test/emqx_placeholder_SUITE.erl +++ b/apps/emqx_utils/test/emqx_placeholder_SUITE.erl @@ -39,6 +39,15 @@ t_proc_tmpl_path(_) -> emqx_placeholder:proc_tmpl(Tks, Selected) ). +t_proc_tmpl_path_encoded_json(_) -> + %% when we receive a message from the rule engine, it is a map with an encoded payload + Selected = #{payload => emqx_utils_json:encode(#{d1 => #{d2 => <<"hi">>}})}, + Tks = emqx_placeholder:preproc_tmpl(<<"payload.d1.d2:${payload.d1.d2}">>), + ?assertEqual( + <<"payload.d1.d2:hi">>, + emqx_placeholder:proc_tmpl(Tks, Selected) + ). + t_proc_tmpl_custom_ph(_) -> Selected = #{a => <<"a">>, b => <<"b">>}, Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b}">>, #{placeholders => [<<"${a}">>]}), diff --git a/changes/ce/fix-11164.en.md b/changes/ce/fix-11164.en.md new file mode 100644 index 000000000..2b7e6436a --- /dev/null +++ b/changes/ce/fix-11164.en.md @@ -0,0 +1 @@ +Reintroduced support for nested (i.e.: `${payload.a.b.c}`) placeholders for extracting data from rule action messages without the need for calling `json_decode(payload)` first.