Merge pull request #13172 from zmstone/0603-add-redis-arg-formatter-to-rule-engine-funcs
feat(redis): add a rule function to help formatting redis args
This commit is contained in:
commit
cda6d5f636
|
@ -128,8 +128,8 @@ on_query(
|
||||||
#{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
|
#{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
|
||||||
),
|
),
|
||||||
Result;
|
Result;
|
||||||
Error ->
|
{error, Reason} ->
|
||||||
Error
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_batch_query(
|
on_batch_query(
|
||||||
|
@ -165,8 +165,8 @@ on_batch_query(
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
Result;
|
Result;
|
||||||
Error ->
|
{error, Reason} ->
|
||||||
Error
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
trace_format_commands(Commands0) ->
|
trace_format_commands(Commands0) ->
|
||||||
|
@ -204,11 +204,15 @@ query(InstId, Query, RedisConnSt) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
proc_command_template(CommandTemplate, Msg) ->
|
proc_command_template(CommandTemplate, Msg) ->
|
||||||
lists:map(
|
lists:reverse(
|
||||||
fun(ArgTks) ->
|
lists:foldl(
|
||||||
emqx_placeholder:proc_tmpl(ArgTks, Msg, #{return => full_binary})
|
fun(ArgTks, Acc) ->
|
||||||
end,
|
New = proc_tmpl(ArgTks, Msg),
|
||||||
CommandTemplate
|
lists:reverse(New, Acc)
|
||||||
|
end,
|
||||||
|
[],
|
||||||
|
CommandTemplate
|
||||||
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
preproc_command_template(CommandTemplate) ->
|
preproc_command_template(CommandTemplate) ->
|
||||||
|
@ -216,3 +220,18 @@ preproc_command_template(CommandTemplate) ->
|
||||||
fun emqx_placeholder:preproc_tmpl/1,
|
fun emqx_placeholder:preproc_tmpl/1,
|
||||||
CommandTemplate
|
CommandTemplate
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% This function mimics emqx_placeholder:proc_tmpl/3 but with an
|
||||||
|
%% injected special handling of map_to_redis_hset_args result
|
||||||
|
%% which is a list of redis command args (all in binary string format)
|
||||||
|
proc_tmpl([{var, Phld}], Data) ->
|
||||||
|
case emqx_placeholder:lookup_var(Phld, Data) of
|
||||||
|
[map_to_redis_hset_args | L] ->
|
||||||
|
L;
|
||||||
|
Other ->
|
||||||
|
[emqx_utils_conv:bin(Other)]
|
||||||
|
end;
|
||||||
|
proc_tmpl(Tokens, Data) ->
|
||||||
|
%% more than just a var ref, but a string, or a concatenation of string and a var
|
||||||
|
%% this is must be a single arg, format it into a binary
|
||||||
|
[emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary})].
|
||||||
|
|
|
@ -46,7 +46,8 @@ matrix_testcases() ->
|
||||||
t_start_stop,
|
t_start_stop,
|
||||||
t_create_via_http,
|
t_create_via_http,
|
||||||
t_on_get_status,
|
t_on_get_status,
|
||||||
t_sync_query
|
t_sync_query,
|
||||||
|
t_map_to_redis_hset_args
|
||||||
].
|
].
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -133,7 +134,7 @@ common_init_per_testcase(TestCase, Config) ->
|
||||||
Path = group_path(Config),
|
Path = group_path(Config),
|
||||||
ct:comment(Path),
|
ct:comment(Path),
|
||||||
ConnectorConfig = connector_config(Name, Path, NConfig),
|
ConnectorConfig = connector_config(Name, Path, NConfig),
|
||||||
BridgeConfig = action_config(Name, Path, Name),
|
BridgeConfig = action_config(Name, Path, Name, TestCase),
|
||||||
ok = snabbkaffe:start_trace(),
|
ok = snabbkaffe:start_trace(),
|
||||||
[
|
[
|
||||||
{connector_type, ?CONNECTOR_TYPE},
|
{connector_type, ?CONNECTOR_TYPE},
|
||||||
|
@ -222,7 +223,14 @@ parse_and_check_connector_config(InnerConfigMap, Name) ->
|
||||||
ct:pal("parsed config: ~p", [Config]),
|
ct:pal("parsed config: ~p", [Config]),
|
||||||
InnerConfigMap.
|
InnerConfigMap.
|
||||||
|
|
||||||
action_config(Name, Path, ConnectorId) ->
|
action_config(Name, Path, ConnectorId, TestCase) ->
|
||||||
|
Template =
|
||||||
|
try
|
||||||
|
?MODULE:TestCase(command_template)
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
[<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>]
|
||||||
|
end,
|
||||||
[RedisType, _Transport | _] = Path,
|
[RedisType, _Transport | _] = Path,
|
||||||
CommonCfg =
|
CommonCfg =
|
||||||
#{
|
#{
|
||||||
|
@ -230,7 +238,7 @@ action_config(Name, Path, ConnectorId) ->
|
||||||
<<"connector">> => ConnectorId,
|
<<"connector">> => ConnectorId,
|
||||||
<<"parameters">> =>
|
<<"parameters">> =>
|
||||||
#{
|
#{
|
||||||
<<"command_template">> => [<<"RPUSH">>, <<"MSGS/${topic}">>, <<"${payload}">>],
|
<<"command_template">> => Template,
|
||||||
<<"redis_type">> => atom_to_binary(RedisType)
|
<<"redis_type">> => atom_to_binary(RedisType)
|
||||||
},
|
},
|
||||||
<<"local_topic">> => <<"t/redis">>,
|
<<"local_topic">> => <<"t/redis">>,
|
||||||
|
@ -262,8 +270,11 @@ parse_and_check_bridge_config(InnerConfigMap, Name) ->
|
||||||
emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
|
emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
|
||||||
|
|
||||||
make_message() ->
|
make_message() ->
|
||||||
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
||||||
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
make_message_with_payload(Payload).
|
||||||
|
|
||||||
|
make_message_with_payload(Payload) ->
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
#{
|
#{
|
||||||
clientid => ClientId,
|
clientid => ClientId,
|
||||||
payload => Payload,
|
payload => Payload,
|
||||||
|
@ -290,7 +301,7 @@ t_start_stop(matrix) ->
|
||||||
[sentinel, tcp],
|
[sentinel, tcp],
|
||||||
[cluster, tcp]
|
[cluster, tcp]
|
||||||
]};
|
]};
|
||||||
t_start_stop(Config) ->
|
t_start_stop(Config) when is_list(Config) ->
|
||||||
emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped),
|
emqx_bridge_v2_testlib:t_start_stop(Config, redis_bridge_stopped),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -300,7 +311,7 @@ t_create_via_http(matrix) ->
|
||||||
[sentinel, tcp],
|
[sentinel, tcp],
|
||||||
[cluster, tcp]
|
[cluster, tcp]
|
||||||
]};
|
]};
|
||||||
t_create_via_http(Config) ->
|
t_create_via_http(Config) when is_list(Config) ->
|
||||||
emqx_bridge_v2_testlib:t_create_via_http(Config),
|
emqx_bridge_v2_testlib:t_create_via_http(Config),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -310,7 +321,7 @@ t_on_get_status(matrix) ->
|
||||||
[sentinel, tcp],
|
[sentinel, tcp],
|
||||||
[cluster, tcp]
|
[cluster, tcp]
|
||||||
]};
|
]};
|
||||||
t_on_get_status(Config) ->
|
t_on_get_status(Config) when is_list(Config) ->
|
||||||
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
|
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -320,7 +331,7 @@ t_sync_query(matrix) ->
|
||||||
[sentinel, tcp],
|
[sentinel, tcp],
|
||||||
[cluster, tcp]
|
[cluster, tcp]
|
||||||
]};
|
]};
|
||||||
t_sync_query(Config) ->
|
t_sync_query(Config) when is_list(Config) ->
|
||||||
ok = emqx_bridge_v2_testlib:t_sync_query(
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||||
Config,
|
Config,
|
||||||
fun make_message/0,
|
fun make_message/0,
|
||||||
|
@ -328,3 +339,22 @@ t_sync_query(Config) ->
|
||||||
redis_bridge_connector_send_done
|
redis_bridge_connector_send_done
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_map_to_redis_hset_args(matrix) ->
|
||||||
|
{map_to_redis_hset_args, [
|
||||||
|
[single, tcp],
|
||||||
|
[sentinel, tcp],
|
||||||
|
[cluster, tcp]
|
||||||
|
]};
|
||||||
|
t_map_to_redis_hset_args(command_template) ->
|
||||||
|
[<<"HMSET">>, <<"t_map_to_redis_hset_args">>, <<"${payload}">>];
|
||||||
|
t_map_to_redis_hset_args(Config) when is_list(Config) ->
|
||||||
|
Payload = emqx_rule_funcs:map_to_redis_hset_args(#{<<"a">> => 1, <<"b">> => <<"2">>}),
|
||||||
|
MsgFn = fun() -> make_message_with_payload(Payload) end,
|
||||||
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||||
|
Config,
|
||||||
|
MsgFn,
|
||||||
|
fun(Res) -> ?assertMatch({ok, _}, Res) end,
|
||||||
|
redis_bridge_connector_send_done
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -160,6 +160,7 @@
|
||||||
find/3,
|
find/3,
|
||||||
join_to_string/1,
|
join_to_string/1,
|
||||||
join_to_string/2,
|
join_to_string/2,
|
||||||
|
map_to_redis_hset_args/1,
|
||||||
join_to_sql_values_string/1,
|
join_to_sql_values_string/1,
|
||||||
jq/2,
|
jq/2,
|
||||||
jq/3,
|
jq/3,
|
||||||
|
@ -814,6 +815,38 @@ join_to_string(Str) -> emqx_variform_bif:join_to_string(Str).
|
||||||
|
|
||||||
join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List).
|
join_to_string(Sep, List) -> emqx_variform_bif:join_to_string(Sep, List).
|
||||||
|
|
||||||
|
%% @doc Format map key-value pairs as redis HSET (or HMSET) command fields.
|
||||||
|
%% Notes:
|
||||||
|
%% - Non-string keys in the input map are dropped
|
||||||
|
%% - Keys are not quoted
|
||||||
|
%% - String values are always quoted
|
||||||
|
%% - No escape sequence for keys and values
|
||||||
|
%% - Float point values are formatted with fixed (6) decimal point compact-formatting
|
||||||
|
map_to_redis_hset_args(Map) when erlang:is_map(Map) ->
|
||||||
|
[map_to_redis_hset_args | maps:fold(fun redis_hset_acc/3, [], Map)].
|
||||||
|
|
||||||
|
redis_hset_acc(K, V, IoData) ->
|
||||||
|
try
|
||||||
|
[redis_field_name(K), redis_field_value(V) | IoData]
|
||||||
|
catch
|
||||||
|
_:_ ->
|
||||||
|
IoData
|
||||||
|
end.
|
||||||
|
|
||||||
|
redis_field_name(K) when erlang:is_binary(K) ->
|
||||||
|
K;
|
||||||
|
redis_field_name(K) ->
|
||||||
|
throw({bad_redis_field_name, K}).
|
||||||
|
|
||||||
|
redis_field_value(V) when erlang:is_binary(V) ->
|
||||||
|
V;
|
||||||
|
redis_field_value(V) when erlang:is_integer(V) ->
|
||||||
|
integer_to_binary(V);
|
||||||
|
redis_field_value(V) when erlang:is_float(V) ->
|
||||||
|
float2str(V, 6);
|
||||||
|
redis_field_value(V) when erlang:is_boolean(V) ->
|
||||||
|
atom_to_binary(V).
|
||||||
|
|
||||||
join_to_sql_values_string(List) ->
|
join_to_sql_values_string(List) ->
|
||||||
QuotedList =
|
QuotedList =
|
||||||
[
|
[
|
||||||
|
|
|
@ -1376,6 +1376,27 @@ t_parse_date_errors(_) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_map_to_redis_hset_args(_Config) ->
|
||||||
|
Do = fun(Map) -> tl(emqx_rule_funcs:map_to_redis_hset_args(Map)) end,
|
||||||
|
?assertEqual([], Do(#{})),
|
||||||
|
?assertEqual([], Do(#{1 => 2})),
|
||||||
|
?assertEqual([<<"a">>, <<"1">>], Do(#{<<"a">> => 1, 3 => 4})),
|
||||||
|
?assertEqual([<<"a">>, <<"1.1">>], Do(#{<<"a">> => 1.1})),
|
||||||
|
?assertEqual([<<"a">>, <<"true">>], Do(#{<<"a">> => true})),
|
||||||
|
?assertEqual([<<"a">>, <<"false">>], Do(#{<<"a">> => false})),
|
||||||
|
?assertEqual([<<"a">>, <<"">>], Do(#{<<"a">> => <<"">>})),
|
||||||
|
?assertEqual([<<"a">>, <<"i j">>], Do(#{<<"a">> => <<"i j">>})),
|
||||||
|
%% no determined ordering
|
||||||
|
?assert(
|
||||||
|
case Do(#{<<"a">> => 1, <<"b">> => 2}) of
|
||||||
|
[<<"a">>, <<"1">>, <<"b">>, <<"2">>] ->
|
||||||
|
true;
|
||||||
|
[<<"b">>, <<"2">>, <<"a">>, <<"1">>] ->
|
||||||
|
true
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Utility functions
|
%% Utility functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -37,7 +37,8 @@
|
||||||
proc_tmpl_deep/3,
|
proc_tmpl_deep/3,
|
||||||
|
|
||||||
bin/1,
|
bin/1,
|
||||||
sql_data/1
|
sql_data/1,
|
||||||
|
lookup_var/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_utils, [
|
{application, emqx_utils, [
|
||||||
{description, "Miscellaneous utilities for EMQX apps"},
|
{description, "Miscellaneous utilities for EMQX apps"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.2.0"},
|
{vsn, "5.2.1"},
|
||||||
{modules, [
|
{modules, [
|
||||||
emqx_utils,
|
emqx_utils,
|
||||||
emqx_utils_api,
|
emqx_utils_api,
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
Added a rule function `map_to_redis_hset_args` to help preparing redis HSET (or HMSET) multi-fields values.
|
||||||
|
|
||||||
|
For example, if `payload.value` is a map of multiple data fields,
|
||||||
|
this rule `SELECT map_to_redis_hset_args(payload.value) as hset_fields FROM "t/#"` can prepare `hset_fields`
|
||||||
|
for redis action to render the command template like `HMSET name1 ${hset_fields}`.
|
Loading…
Reference in New Issue