From e3ed7b59dd707872c39a9453db0b8f823bf613b8 Mon Sep 17 00:00:00 2001 From: zmstone Date: Sun, 26 May 2024 01:03:20 +0200 Subject: [PATCH 1/3] feat(redis): add a rule function to help formatting redis args The new function named 'map_to_redis_hset_args' can be used to format a map's key-value pairs into redis HSET (or HMSET) arg list. This new function is dedicated for redis to avoid abuse for other data integrations. --- .../src/emqx_bridge_redis_connector.erl | 37 ++++++++++++++----- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 33 +++++++++++++++++ .../test/emqx_rule_funcs_SUITE.erl | 21 +++++++++++ apps/emqx_utils/src/emqx_placeholder.erl | 3 +- apps/emqx_utils/src/emqx_utils.app.src | 2 +- 5 files changed, 85 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index e12155bb1..f117c4e7a 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -128,8 +128,8 @@ on_query( #{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result} ), Result; - Error -> - Error + {error, Reason} -> + {error, Reason} end. on_batch_query( @@ -165,8 +165,8 @@ on_batch_query( } ), Result; - Error -> - Error + {error, Reason} -> + {error, Reason} end. trace_format_commands(Commands0) -> @@ -204,11 +204,15 @@ query(InstId, Query, RedisConnSt) -> end. proc_command_template(CommandTemplate, Msg) -> - lists:map( - fun(ArgTks) -> - emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary}) - end, - CommandTemplate + lists:reverse( + lists:foldl( + fun(ArgTks, Acc) -> + New = proc_tmpl(ArgTks, Msg), + lists:reverse(New, Acc) + end, + [], + CommandTemplate + ) ). preproc_command_template(CommandTemplate) -> @@ -216,3 +220,18 @@ preproc_command_template(CommandTemplate) -> fun emqx_placeholder:preproc_tmpl/1, CommandTemplate ). + +%% This function mimics emqx_placeholder:proc_tmpl/3 but with an +%% injected special handling of map_to_redis_hset_args result +%% which is a list of redis command args (all in binary string format) +proc_tmpl([{var, Phld}], Data) -> + case emqx_placeholder:lookup_var(Phld, Data) of + [map_to_redis_hset_args | L] -> + L; + Other -> + [emqx_utils_conv:bin(Other)] + end; +proc_tmpl(Tokens, Data) -> + %% more than just a var ref, but a string, or a concatenation of string and a var + %% this is must be a single arg, format it into a binary + [emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary})]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 604f43d82..ed838e6d1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -160,6 +160,7 @@ find/3, join_to_string/1, join_to_string/2, + map_to_redis_hset_args/1, join_to_sql_values_string/1, jq/2, jq/3, @@ -814,6 +815,38 @@ join_to_string(Str) -> emqx_variform_bif:join_to_string(Str). join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List). +%% @doc Format map key-value pairs as redis HSET (or HMSET) command fields. +%% Notes: +%% - Non-string keys in the input map are dropped +%% - Keys are not quoted +%% - String values are always quoted +%% - No escape sequence for keys and values +%% - Float point values are formatted with fixed (6) decimal point compact-formatting +map_to_redis_hset_args(Map) when erlang:is_map(Map) -> + [map_to_redis_hset_args | maps:fold(fun redis_hset_acc/3, [], Map)]. + +redis_hset_acc(K, V, IoData) -> + try + [redis_field_name(K), redis_field_value(V) | IoData] + catch + _:_ -> + IoData + end. + +redis_field_name(K) when erlang:is_binary(K) -> + K; +redis_field_name(K) -> + throw({bad_redis_field_name, K}). + +redis_field_value(V) when erlang:is_binary(V) -> + iolist_to_binary([$", V, $"]); +redis_field_value(V) when erlang:is_integer(V) -> + integer_to_binary(V); +redis_field_value(V) when erlang:is_float(V) -> + float2str(V, 6); +redis_field_value(V) when erlang:is_boolean(V) -> + atom_to_binary(V). + join_to_sql_values_string(List) -> QuotedList = [ diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index e260b04e1..fab105f7b 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -1376,6 +1376,27 @@ t_parse_date_errors(_) -> ok. +t_map_to_redis_hset_args(_Config) -> + Do = fun(Map) -> tl(emqx_rule_funcs:map_to_redis_hset_args(Map)) end, + ?assertEqual([], Do(#{})), + ?assertEqual([], Do(#{1 => 2})), + ?assertEqual([<<"a">>, <<"1">>], Do(#{<<"a">> => 1, 3 => 4})), + ?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})), + ?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})), + ?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})), + ?assertEqual([<<"a">>, <<"\"\"">>], Do(#{<<"a">> => <<"">>})), + ?assertEqual([<<"a">>, <<"\"i j\"">>], Do(#{<<"a">> => <<"i j">>})), + %% no determined ordering + ?assert( + case Do(#{<<"a">> => 1, <<"b">> => 2}) of + [<<"a">>, <<"1">>, <<"b">>, <<"2">>] -> + true; + [<<"b">>, <<"2">>, <<"a">>, <<"1">>] -> + true + end + ), + ok. + %%------------------------------------------------------------------------------ %% Utility functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index 84000669d..ddc32cd0d 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -37,7 +37,8 @@ proc_tmpl_deep/3, bin/1, - sql_data/1 + sql_data/1, + lookup_var/2 ]). -export([ diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index 8aef21479..bac23cefb 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.2.0"}, + {vsn, "5.2.1"}, {modules, [ emqx_utils, emqx_utils_api, From aa7ce1f64121385ebe0f21d2db5969acb2da3d17 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 3 Jun 2024 23:03:15 +0200 Subject: [PATCH 2/3] fix(bridge/redis): add test case for map_to_redis_hset_args --- .../test/emqx_bridge_v2_redis_SUITE.erl | 48 +++++++++++++++---- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 2 +- .../test/emqx_rule_funcs_SUITE.erl | 4 +- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl index f0fb8872d..7d3003bfa 100644 --- a/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl +++ b/apps/emqx_bridge_redis/test/emqx_bridge_v2_redis_SUITE.erl @@ -46,7 +46,8 @@ matrix_testcases() -> t_start_stop, t_create_via_http, t_on_get_status, - t_sync_query + t_sync_query, + t_map_to_redis_hset_args ]. init_per_suite(Config) -> @@ -133,7 +134,7 @@ common_init_per_testcase(TestCase, Config) -> Path = group_path(Config), ct:comment(Path), ConnectorConfig = connector_config(Name, Path, NConfig), - BridgeConfig = action_config(Name, Path, Name), + BridgeConfig = action_config(Name, Path, Name, TestCase), ok = snabbkaffe:start_trace(), [ {connector_type, ?CONNECTOR_TYPE}, @@ -222,7 +223,14 @@ parse_and_check_connector_config(InnerConfigMap, Name) -> ct:pal("parsed config: ~p", [Config]), InnerConfigMap. -action_config(Name, Path, ConnectorId) -> +action_config(Name, Path, ConnectorId, TestCase) -> + Template = + try + ?MODULE:TestCase(command_template) + catch + _:_ -> + [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>] + end, [RedisType, _Transport | _] = Path, CommonCfg = #{ @@ -230,7 +238,7 @@ action_config(Name, Path, ConnectorId) -> <<"connector">> => ConnectorId, <<"parameters">> => #{ - <<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>], + <<"command_template">> => Template, <<"redis_type">> => atom_to_binary(RedisType) }, <<"local_topic">> => <<"t/redis">>, @@ -262,8 +270,11 @@ parse_and_check_bridge_config(InnerConfigMap, Name) -> emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap). make_message() -> - ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + make_message_with_payload(Payload). + +make_message_with_payload(Payload) -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), #{ clientid => ClientId, payload => Payload, @@ -290,7 +301,7 @@ t_start_stop(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_start_stop(Config) -> +t_start_stop(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped), ok. @@ -300,7 +311,7 @@ t_create_via_http(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_create_via_http(Config) -> +t_create_via_http(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_create_via_http(Config), ok. @@ -310,7 +321,7 @@ t_on_get_status(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_on_get_status(Config) -> +t_on_get_status(Config) when is_list(Config) -> emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}), ok. @@ -320,7 +331,7 @@ t_sync_query(matrix) -> [sentinel, tcp], [cluster, tcp] ]}; -t_sync_query(Config) -> +t_sync_query(Config) when is_list(Config) -> ok = emqx_bridge_v2_testlib:t_sync_query( Config, fun make_message/0, @@ -328,3 +339,22 @@ t_sync_query(Config) -> redis_bridge_connector_send_done ), ok. + +t_map_to_redis_hset_args(matrix) -> + {map_to_redis_hset_args, [ + [single, tcp], + [sentinel, tcp], + [cluster, tcp] + ]}; +t_map_to_redis_hset_args(command_template) -> + [<<"HMSET">>, <<"t_map_to_redis_hset_args">>, <<"${payload}">>]; +t_map_to_redis_hset_args(Config) when is_list(Config) -> + Payload = emqx_rule_funcs:map_to_redis_hset_args(#{<<"a">> => 1, <<"b">> => <<"2">>}), + MsgFn = fun() -> make_message_with_payload(Payload) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, + MsgFn, + fun(Res) -> ?assertMatch({ok, _}, Res) end, + redis_bridge_connector_send_done + ), + ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index ed838e6d1..9de7b0173 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -839,7 +839,7 @@ redis_field_name(K) -> throw({bad_redis_field_name, K}). redis_field_value(V) when erlang:is_binary(V) -> - iolist_to_binary([$", V, $"]); + V; redis_field_value(V) when erlang:is_integer(V) -> integer_to_binary(V); redis_field_value(V) when erlang:is_float(V) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl index fab105f7b..eb7b97b5f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl @@ -1384,8 +1384,8 @@ t_map_to_redis_hset_args(_Config) -> ?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})), ?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})), ?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})), - ?assertEqual([<<"a">>, <<"\"\"">>], Do(#{<<"a">> => <<"">>})), - ?assertEqual([<<"a">>, <<"\"i j\"">>], Do(#{<<"a">> => <<"i j">>})), + ?assertEqual([<<"a">>, <<"">>], Do(#{<<"a">> => <<"">>})), + ?assertEqual([<<"a">>, <<"i j">>], Do(#{<<"a">> => <<"i j">>})), %% no determined ordering ?assert( case Do(#{<<"a">> => 1, <<"b">> => 2}) of From e7fecd5e91bcb8a23fd391f443247d91f5297079 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 3 Jun 2024 23:13:13 +0200 Subject: [PATCH 3/3] docs: add changelog for PR 13172 --- changes/ee/feat-13172.en.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ee/feat-13172.en.md diff --git a/changes/ee/feat-13172.en.md b/changes/ee/feat-13172.en.md new file mode 100644 index 000000000..259dc075a --- /dev/null +++ b/changes/ee/feat-13172.en.md @@ -0,0 +1,5 @@ +Added a rule function `map_to_redis_hset_args` to help preparing redis HSET (or HMSET) multi-fields values. + +For example, if `payload.value` is a map of multiple data fields, +this rule `SELECT map_to_redis_hset_args(payload.value) as hset_fields FROM "t/#"` can prepare `hset_fields` +for redis action to render the command template like `HMSET name1 ${hset_fields}`.