fix(republish): to_qos & to_retain, add new util funcs

This commit is contained in:
DDDHuang 2022-06-10 11:37:34 +08:00
parent ac700b8e6f
commit 7d848950c7
4 changed files with 126 additions and 75 deletions

View File

@ -35,23 +35,27 @@
},
target_qos => #{
order => 2,
type => string,
type => editable_select,
enum => [<<"0">>, <<"1">>, <<"2">>],
required => true,
default => <<"0">>,
title => #{en => <<"Target QoS">>,
zh => <<"目的 QoS"/utf8>>},
description => #{en =>
<<"The QoS Level to be uses when republishing the message."
" Set to -1 to use the original QoS."
" Support placeholder variables.">>,
" Support placeholder variables."
" Set to ${qos} to use the original QoS."
" Or other variable, value is 0 or 1 or 2">>,
zh =>
<<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS。"
"支持占位符变量"/utf8>>}
<<"重新发布消息时用的 QoS 级别。"
"支持占位符变量,可以使用 ${qos} 来使用原消息的 QoS"
"或其他值为 0 或 1 或 2 的变量。"/utf8>>}
},
target_retain => #{
order => 3,
type => string,
required => true,
type => editable_select,
enum => [<<"true">>, <<"false">>],
required => false,
default => <<"false">>,
title => #{en => <<"Target Retain">>,
zh => <<"目标保留消息标识"/utf8>>},
@ -226,9 +230,9 @@ on_action_republish(Selected, _Envs = #{
timestamp = erlang:system_time(millisecond)
},
increase_and_publish(ActId, Message);
Error ->
Errors ->
emqx_rule_metrics:inc_actions_error(ActId),
_ = log_error(Error),
_ = log_error(Errors),
{badact, bad_qos_retain}
end.
@ -257,54 +261,53 @@ format_msg([], Data) ->
format_msg(Tokens, Data) ->
emqx_rule_utils:proc_tmpl(Tokens, Data).
get_qos(-1, _Data) -> {ok, 0};
get_qos(0, _Data) -> {ok, 0};
get_qos(1, _Data) -> {ok, 1};
get_qos(2, _Data) -> {ok, 2};
get_qos({path, Path}, Data) ->
to_qos(emqx_rule_maps:nested_get({path, Path}, Data, 0)).
to_qos(0) -> {ok, 0};
to_qos(1) -> {ok, 1};
to_qos(2) -> {ok, 2};
to_qos(<<"-1">>) -> {ok, 0};
to_qos(<<"0">>) -> {ok, 0};
to_qos(<<"1">>) -> {ok, 1};
to_qos(<<"2">>) -> {ok, 2};
to_qos(TargetQoS) ->
case parse_value_or_placeholder(TargetQoS) of
{path, P} ->
{ok, {path, P}};
_ ->
{error, bad_qos}
case get_qos(TargetQoS) of
{ok, QoS} ->
{ok, QoS};
_Error ->
case emqx_rule_utils:preproc_tmpl(TargetQoS) of
Tmpl = [{var, _}] ->
{ok, Tmpl};
_ ->
{error, bad_qos}
end
end.
get_retain(false, _Data) -> {ok, false};
get_retain(true, _Data) -> {ok, true};
get_retain({path, Path}, Data) ->
to_retain(emqx_rule_maps:nested_get({path, Path}, Data, true)).
get_qos(Tmpl, Data) ->
get_qos(emqx_rule_utils:replace_simple_var(Tmpl, Data)).
get_qos(<<"-1">>) -> {ok, 0};
get_qos(<<"0">>) -> {ok, 0};
get_qos(<<"1">>) -> {ok, 1};
get_qos(<<"2">>) -> {ok, 2};
get_qos(0) -> {ok, 0};
get_qos(1) -> {ok, 1};
get_qos(2) -> {ok, 2};
get_qos(_) -> {error, bad_qos}.
to_retain(true) -> {ok, true};
to_retain(false) -> {ok, false};
to_retain(<<"true">>) -> {ok, true};
to_retain(<<"false">>) -> {ok, false};
to_retain(<<"1">>) -> {ok, true};
to_retain(<<"0">>) -> {ok, false};
to_retain(1) -> {ok, true};
to_retain(0) -> {ok, false};
to_retain(TargetRetain) ->
case parse_value_or_placeholder(TargetRetain) of
{path, P} ->
{ok, {path, P}};
_ ->
{error, bad_retain}
case get_retain(TargetRetain) of
{ok, Retain} ->
{ok, Retain};
_Error ->
case emqx_rule_utils:preproc_tmpl(TargetRetain) of
Tmpl = [{var, _}] ->
{ok, Tmpl};
_ ->
{error, bad_retain}
end
end.
parse_value_or_placeholder(ValueOrPlaceholder) ->
case re:run(ValueOrPlaceholder, "^\\$\{.+\}$") of
nomatch ->
ValueOrPlaceholder;
{match, [{0, Length}]} ->
Placeholder = binary:part(ValueOrPlaceholder, 2, Length - 3),
{path, [{key, Key} || Key <- string:lexemes(Placeholder, ". ")]}
end.
get_retain(Tmpl, Data) ->
get_retain(emqx_rule_utils:replace_simple_var(Tmpl, Data)).
get_retain(true) -> {ok, true};
get_retain(false) -> {ok, false};
get_retain(<<"true">>) -> {ok, true};
get_retain(<<"false">>) -> {ok, false};
get_retain(<<"1">>) -> {ok, true};
get_retain(<<"0">>) -> {ok, false};
get_retain(1) -> {ok, true};
get_retain(0) -> {ok, false};
get_retain(_) -> {error, bad_retain}.

View File

@ -2,10 +2,13 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.10",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -15,7 +18,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
@ -29,7 +33,8 @@
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
@ -43,7 +48,8 @@
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
@ -57,7 +63,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
@ -71,7 +78,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
@ -85,7 +93,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -99,7 +108,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -114,7 +124,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -129,7 +140,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -145,10 +157,13 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.10",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -158,7 +173,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.8",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
@ -172,7 +188,8 @@
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.7",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -186,7 +203,8 @@
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.6",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -200,7 +218,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.5",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -214,7 +233,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.4",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -228,7 +248,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.3",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -242,7 +263,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.2",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -257,7 +279,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.1",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -272,7 +295,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.0",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},

View File

@ -16,6 +16,10 @@
-module(emqx_rule_utils).
-export([ replace_vars_in_str/2
, replace_simple_var/2
]).
%% preprocess and process tempalte string with place holders
-export([ preproc_tmpl/1
, proc_tmpl/2
@ -87,6 +91,22 @@ preproc_tmpl([[Str, Phld]| Tokens], Acc) ->
preproc_tmpl([[Str]| Tokens], Acc) ->
preproc_tmpl(Tokens, put_head(str, Str, Acc)).
%% Replace a string contains vars to another string in which the placeholders are replace by the
%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
%% "a: 1".
replace_vars_in_str(Tokens, Data) when is_list(Tokens) ->
proc_tmpl(Tokens, Data, #{return => full_binary});
replace_vars_in_str(Val, _Data) ->
Val.
%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
%% value will be an integer 1.
replace_simple_var(Tokens, Data) when is_list(Tokens) ->
[Var] = proc_tmpl(Tokens, Data, #{return => rawlist}),
Var;
replace_simple_var(Val, _Data) ->
Val.
put_head(_Type, <<>>, List) -> List;
put_head(Type, Term, List) ->
[{Type, Term} | List].

View File

@ -47,6 +47,7 @@
, array
, file
, cfgselect %% TODO: [5.0] refactor this
, editable_select
]).
%%------------------------------------------------------------------------------
@ -84,6 +85,9 @@ validate_spec(ParamsSepc) ->
%% Internal Functions
%%------------------------------------------------------------------------------
%% Validate editable_select first, because editable_select has enum selection.
validate_value(Val, #{type := editable_select}) ->
Val;
validate_value(Val, #{enum := Enum}) ->
validate_enum(Val, Enum);
validate_value(Val, #{type := object} = Spec) ->