From 6b1da3bcc832e8e5021d404c05191f431471f510 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 9 Jun 2022 14:37:27 +0800 Subject: [PATCH 01/11] feat: republish support qos & retain placeholder --- .../src/emqx_rule_actions.erl | 177 ++++++++++++++---- .../src/emqx_rule_engine.appup.src | 44 +++-- 2 files changed, 167 insertions(+), 54 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index edeffbde4..ea94bff22 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -35,24 +35,40 @@ }, target_qos => #{ order => 2, - type => number, - enum => [-1, 0, 1, 2], + type => string, required => true, - default => 0, + default => <<"0">>, title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, - description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>, - zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>} + description => #{en => + <<"The QoS Level to be uses when republishing the message." + " Set to -1 to use the original QoS." + " Support placeholder variables.">>, + zh => + <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS。" + "支持占位符变量"/utf8>>} + }, + target_retain => #{ + order => 3, + type => string, + required => true, + default => <<"false">>, + title => #{en => <<"Target Retain">>, + zh => <<"目标保留消息标识"/utf8>>}, + description => #{en => <<"The Retain flag to be uses when republishing the message." + " Support placeholder variables. Default is false">>, + zh => <<"重新发布消息时用的保留消息标识,支持占位符变量。默认 false"/utf8>>} }, payload_tmpl => #{ - order => 3, + order => 4, type => string, input => textarea, required => false, default => <<"${payload}">>, title => #{en => <<"Payload Template">>, zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported">>, + description => #{en => <<"The payload template, " + "variable interpolation is supported">>, zh => <<"消息内容模板,支持变量"/utf8>>} } }). @@ -89,7 +105,8 @@ params => #{}, title => #{en => <<"Do Nothing (debug)">>, zh => <<"空动作 (调试)"/utf8>>}, - description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>, + description => #{en => <<"This action does nothing and never fails. " + "It's for debug purpose">>, zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>} }). @@ -113,7 +130,8 @@ on_resource_create(_Name, Conf) -> %%------------------------------------------------------------------------------ %% Action 'inspect' %%------------------------------------------------------------------------------ --spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_inspect(Id, Params) -> Params. @@ -129,12 +147,15 @@ on_action_inspect(Selected, Envs) -> %%------------------------------------------------------------------------------ %% Action 'republish' %%------------------------------------------------------------------------------ --spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_republish(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_republish(Id, Params = #{ <<"target_topic">> := TargetTopic, - <<"target_qos">> := TargetQoS, + <<"target_qos">> := TargetQoS0, <<"payload_tmpl">> := PayloadTmpl }) -> + {ok, TargetRetain} = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), + {ok, TargetQoS} = to_qos(TargetQoS0), TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -157,20 +178,28 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end, - from = ActId, - flags = Flags, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = Timestamp - }); + }} = Bindings) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of + {{ok, RQoS}, {ok, Retain}} when is_integer(RQoS) andalso is_boolean(Retain) -> + Message = + #message{ + id = emqx_guid:gen(), + qos = if TargetQoS =:= -1 -> QoS; true -> RQoS end, + from = ActId, + flags = Flags#{retain => Retain}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = Timestamp + }, + increase_and_publish(ActId, Message); + Error -> + emqx_rule_metrics:inc_actions_error(ActId), + _ = log_error(Error), + {badact, bad_qos_retain} + end; %% in case this is not a "message.publish" request on_action_republish(Selected, _Envs = #{ @@ -180,27 +209,43 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end, - from = ActId, - flags = #{dup => false, retain => false}, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = erlang:system_time(millisecond) - }). + } = Bindings}) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of + {{ok, QoS}, {ok, Retain}} when is_integer(QoS) andalso is_boolean(Retain) -> + Message = + #message{ + id = emqx_guid:gen(), + qos = QoS, + from = ActId, + flags = #{dup => false, retain => Retain}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = erlang:system_time(millisecond) + }, + increase_and_publish(ActId, Message); + Error -> + emqx_rule_metrics:inc_actions_error(ActId), + _ = log_error(Error), + {badact, bad_qos_retain} + end. + +log_error({{ok, _}, RetainError}) -> + ?LOG(error, "[republish] invalid retain: ~p", [RetainError]); +log_error({QosError, {ok, _}}) -> + ?LOG(error, "[republish] invalid qos: ~p", [QosError]); +log_error({QosError, RetainError}) -> + ?LOG(error, "[republish] invalid qos: ~p invalid retain: ~p", [QosError, RetainError]). increase_and_publish(ActId, Msg) -> _ = emqx_broker:safe_publish(Msg), emqx_rule_metrics:inc_actions_success(ActId), emqx_metrics:inc_msg(Msg). --spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_do_nothing(ActId, Params) when is_binary(ActId) -> Params. @@ -211,3 +256,55 @@ format_msg([], Data) -> emqx_json:encode(Data); format_msg(Tokens, Data) -> emqx_rule_utils:proc_tmpl(Tokens, Data). + +get_qos(-1, _Data) -> {ok, 0}; +get_qos(0, _Data) -> {ok, 0}; +get_qos(1, _Data) -> {ok, 1}; +get_qos(2, _Data) -> {ok, 2}; +get_qos({path, Path}, Data) -> + to_qos(emqx_rule_maps:nested_get({path, Path}, Data, 0)). + +to_qos(0) -> {ok, 0}; +to_qos(1) -> {ok, 1}; +to_qos(2) -> {ok, 2}; +to_qos(<<"-1">>) -> {ok, 0}; +to_qos(<<"0">>) -> {ok, 0}; +to_qos(<<"1">>) -> {ok, 1}; +to_qos(<<"2">>) -> {ok, 2}; +to_qos(TargetQoS) -> + case parse_value_or_placeholder(TargetQoS) of + {path, P} -> + {ok, {path, P}}; + _ -> + {error, bad_qos} + end. + +get_retain(false, _Data) -> {ok, false}; +get_retain(true, _Data) -> {ok, true}; +get_retain({path, Path}, Data) -> + to_retain(emqx_rule_maps:nested_get({path, Path}, Data, true)). + +to_retain(true) -> {ok, true}; +to_retain(false) -> {ok, false}; +to_retain(<<"true">>) -> {ok, true}; +to_retain(<<"false">>) -> {ok, false}; +to_retain(<<"1">>) -> {ok, true}; +to_retain(<<"0">>) -> {ok, false}; +to_retain(1) -> {ok, true}; +to_retain(0) -> {ok, false}; +to_retain(TargetRetain) -> + case parse_value_or_placeholder(TargetRetain) of + {path, P} -> + {ok, {path, P}}; + _ -> + {error, bad_retain} + end. + +parse_value_or_placeholder(ValueOrPlaceholder) -> + case re:run(ValueOrPlaceholder, "^\\$\{.+\}$") of + nomatch -> + ValueOrPlaceholder; + {match, [{0, Length}]} -> + Placeholder = binary:part(ValueOrPlaceholder, 2, Length - 3), + {path, [{key, Key} || Key <- string:lexemes(Placeholder, ". ")]} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 7b4049254..5202b88b3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,9 +1,12 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -12,7 +15,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -25,7 +29,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -38,7 +43,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -51,7 +57,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -64,7 +71,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -136,9 +144,12 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -147,7 +158,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -160,7 +172,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, @@ -173,7 +186,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -186,7 +200,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -199,7 +214,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, From ac700b8e6f1d05058e55e411d0da5b8addd6b433 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Thu, 9 Jun 2022 17:35:55 +0800 Subject: [PATCH 02/11] fix(rule): replubish SUIT --- .../test/emqx_rule_engine_SUITE.erl | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 6263c8440..f5c379159 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -413,7 +413,8 @@ t_republish_action(_Config) -> #{rawsql => <<"select topic, payload, qos from \"t1\"">>, actions => [#{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => <<"${payload}">>}}], description => <<"builtin-republish-rule">>}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -479,7 +480,8 @@ t_crud_rule_api(_Config) -> {<<"params">>,[ {<<"arg1">>,1}, {<<"target_topic">>, <<"t2">>}, - {<<"target_qos">>, -1}, + {<<"target_qos">>, <<"0">>}, + {<<"target_retain">>, <<"false">>}, {<<"payload_tmpl">>, <<"${payload}">>} ]} ]] @@ -1618,7 +1620,8 @@ t_sqlselect_multi_actoins_1(Config) -> #{name => 'crash_action', args => #{}, fallbacks => []}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1644,7 +1647,8 @@ t_sqlselect_multi_actoins_1_1(Config) -> #{name => 'crash_action', args => #{}, fallbacks => []}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1672,7 +1676,8 @@ t_sqlselect_multi_actoins_2(Config) -> #{name => 'crash_action', args => #{}, fallbacks => []}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1703,7 +1708,8 @@ t_sqlselect_multi_actoins_3(Config) -> ]}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1741,7 +1747,8 @@ t_sqlselect_multi_actoins_3_1(Config) -> ]}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1780,7 +1787,8 @@ t_sqlselect_multi_actoins_4(Config) -> ]}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -2534,7 +2542,8 @@ create_simple_repub_rule(TargetTopic, SQL, Template) -> #{rawsql => SQL, actions => [#{name => 'republish', args => #{<<"target_topic">> => TargetTopic, - <<"target_qos">> => -1, + <<"target_qos">> => <<"-1">>, + <<"target_retain">> => <<"false">>, <<"payload_tmpl">> => Template} }], description => <<"simple repub rule">>}), From 7d848950c77eb598d46990849644b8c559c2c46b Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 10 Jun 2022 11:37:34 +0800 Subject: [PATCH 03/11] fix(republish): to_qos & to_retain, add new util funcs --- .../src/emqx_rule_actions.erl | 109 +++++++++--------- .../src/emqx_rule_engine.appup.src | 68 +++++++---- apps/emqx_rule_engine/src/emqx_rule_utils.erl | 20 ++++ .../src/emqx_rule_validator.erl | 4 + 4 files changed, 126 insertions(+), 75 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index ea94bff22..1ecf134cb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -35,23 +35,27 @@ }, target_qos => #{ order => 2, - type => string, + type => editable_select, + enum => [<<"0">>, <<"1">>, <<"2">>], required => true, default => <<"0">>, title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, description => #{en => <<"The QoS Level to be uses when republishing the message." - " Set to -1 to use the original QoS." - " Support placeholder variables.">>, + " Support placeholder variables." + " Set to ${qos} to use the original QoS." + " Or other variable, value is 0 or 1 or 2">>, zh => - <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS。" - "支持占位符变量"/utf8>>} + <<"重新发布消息时用的 QoS 级别。" + "支持占位符变量,可以使用 ${qos} 来使用原消息的 QoS," + "或其他值为 0 或 1 或 2 的变量。"/utf8>>} }, target_retain => #{ order => 3, - type => string, - required => true, + type => editable_select, + enum => [<<"true">>, <<"false">>], + required => false, default => <<"false">>, title => #{en => <<"Target Retain">>, zh => <<"目标保留消息标识"/utf8>>}, @@ -226,9 +230,9 @@ on_action_republish(Selected, _Envs = #{ timestamp = erlang:system_time(millisecond) }, increase_and_publish(ActId, Message); - Error -> + Errors -> emqx_rule_metrics:inc_actions_error(ActId), - _ = log_error(Error), + _ = log_error(Errors), {badact, bad_qos_retain} end. @@ -257,54 +261,53 @@ format_msg([], Data) -> format_msg(Tokens, Data) -> emqx_rule_utils:proc_tmpl(Tokens, Data). -get_qos(-1, _Data) -> {ok, 0}; -get_qos(0, _Data) -> {ok, 0}; -get_qos(1, _Data) -> {ok, 1}; -get_qos(2, _Data) -> {ok, 2}; -get_qos({path, Path}, Data) -> - to_qos(emqx_rule_maps:nested_get({path, Path}, Data, 0)). - -to_qos(0) -> {ok, 0}; -to_qos(1) -> {ok, 1}; -to_qos(2) -> {ok, 2}; -to_qos(<<"-1">>) -> {ok, 0}; -to_qos(<<"0">>) -> {ok, 0}; -to_qos(<<"1">>) -> {ok, 1}; -to_qos(<<"2">>) -> {ok, 2}; to_qos(TargetQoS) -> - case parse_value_or_placeholder(TargetQoS) of - {path, P} -> - {ok, {path, P}}; - _ -> - {error, bad_qos} + case get_qos(TargetQoS) of + {ok, QoS} -> + {ok, QoS}; + _Error -> + case emqx_rule_utils:preproc_tmpl(TargetQoS) of + Tmpl = [{var, _}] -> + {ok, Tmpl}; + _ -> + {error, bad_qos} + end end. -get_retain(false, _Data) -> {ok, false}; -get_retain(true, _Data) -> {ok, true}; -get_retain({path, Path}, Data) -> - to_retain(emqx_rule_maps:nested_get({path, Path}, Data, true)). +get_qos(Tmpl, Data) -> + get_qos(emqx_rule_utils:replace_simple_var(Tmpl, Data)). + +get_qos(<<"-1">>) -> {ok, 0}; +get_qos(<<"0">>) -> {ok, 0}; +get_qos(<<"1">>) -> {ok, 1}; +get_qos(<<"2">>) -> {ok, 2}; +get_qos(0) -> {ok, 0}; +get_qos(1) -> {ok, 1}; +get_qos(2) -> {ok, 2}; +get_qos(_) -> {error, bad_qos}. -to_retain(true) -> {ok, true}; -to_retain(false) -> {ok, false}; -to_retain(<<"true">>) -> {ok, true}; -to_retain(<<"false">>) -> {ok, false}; -to_retain(<<"1">>) -> {ok, true}; -to_retain(<<"0">>) -> {ok, false}; -to_retain(1) -> {ok, true}; -to_retain(0) -> {ok, false}; to_retain(TargetRetain) -> - case parse_value_or_placeholder(TargetRetain) of - {path, P} -> - {ok, {path, P}}; - _ -> - {error, bad_retain} + case get_retain(TargetRetain) of + {ok, Retain} -> + {ok, Retain}; + _Error -> + case emqx_rule_utils:preproc_tmpl(TargetRetain) of + Tmpl = [{var, _}] -> + {ok, Tmpl}; + _ -> + {error, bad_retain} + end end. -parse_value_or_placeholder(ValueOrPlaceholder) -> - case re:run(ValueOrPlaceholder, "^\\$\{.+\}$") of - nomatch -> - ValueOrPlaceholder; - {match, [{0, Length}]} -> - Placeholder = binary:part(ValueOrPlaceholder, 2, Length - 3), - {path, [{key, Key} || Key <- string:lexemes(Placeholder, ". ")]} - end. +get_retain(Tmpl, Data) -> + get_retain(emqx_rule_utils:replace_simple_var(Tmpl, Data)). + +get_retain(true) -> {ok, true}; +get_retain(false) -> {ok, false}; +get_retain(<<"true">>) -> {ok, true}; +get_retain(<<"false">>) -> {ok, false}; +get_retain(<<"1">>) -> {ok, true}; +get_retain(<<"0">>) -> {ok, false}; +get_retain(1) -> {ok, true}; +get_retain(0) -> {ok, false}; +get_retain(_) -> {error, bad_retain}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 5202b88b3..b92d67e4f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,10 +2,13 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.10", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -15,7 +18,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -29,7 +33,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -43,7 +48,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -57,7 +63,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -71,7 +78,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -85,7 +93,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -99,7 +108,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -114,7 +124,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -129,7 +140,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -145,10 +157,13 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.10", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -158,7 +173,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -172,7 +188,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -186,7 +203,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -200,7 +218,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -214,7 +233,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", - [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -228,7 +248,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.3", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -242,7 +263,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.2", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -257,7 +279,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.1", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -272,7 +295,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.0", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index d287f1ad0..047b9d1af 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -16,6 +16,10 @@ -module(emqx_rule_utils). +-export([ replace_vars_in_str/2 + , replace_simple_var/2 + ]). + %% preprocess and process tempalte string with place holders -export([ preproc_tmpl/1 , proc_tmpl/2 @@ -87,6 +91,22 @@ preproc_tmpl([[Str, Phld]| Tokens], Acc) -> preproc_tmpl([[Str]| Tokens], Acc) -> preproc_tmpl(Tokens, put_head(str, Str, Acc)). +%% Replace a string contains vars to another string in which the placeholders are replace by the +%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: +%% "a: 1". +replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> + proc_tmpl(Tokens, Data, #{return => full_binary}); +replace_vars_in_str(Val, _Data) -> + Val. + +%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result +%% value will be an integer 1. +replace_simple_var(Tokens, Data) when is_list(Tokens) -> + [Var] = proc_tmpl(Tokens, Data, #{return => rawlist}), + Var; +replace_simple_var(Val, _Data) -> + Val. + put_head(_Type, <<>>, List) -> List; put_head(Type, Term, List) -> [{Type, Term} | List]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl index e32ec66ab..57d5eb465 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_validator.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_validator.erl @@ -47,6 +47,7 @@ , array , file , cfgselect %% TODO: [5.0] refactor this + , editable_select ]). %%------------------------------------------------------------------------------ @@ -84,6 +85,9 @@ validate_spec(ParamsSepc) -> %% Internal Functions %%------------------------------------------------------------------------------ +%% Validate editable_select first, because editable_select has enum selection. +validate_value(Val, #{type := editable_select}) -> + Val; validate_value(Val, #{enum := Enum}) -> validate_enum(Val, Enum); validate_value(Val, #{type := object} = Spec) -> From e1e2fd50fd4f142c9820e50083c315b9b9682881 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 10 Jun 2022 16:08:33 +0800 Subject: [PATCH 04/11] fix(republish): action input editable_select --- .../src/emqx_rule_actions.erl | 181 ++++++++---------- .../src/emqx_rule_validator.erl | 18 +- 2 files changed, 98 insertions(+), 101 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 1ecf134cb..aa5739da2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -35,10 +35,11 @@ }, target_qos => #{ order => 2, - type => editable_select, - enum => [<<"0">>, <<"1">>, <<"2">>], + input => editable_select, + type => [number, string], + enum => [0, 1, 2, <<"${qos}">>], required => true, - default => <<"0">>, + default => 0, title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, description => #{en => @@ -48,20 +49,24 @@ " Or other variable, value is 0 or 1 or 2">>, zh => <<"重新发布消息时用的 QoS 级别。" - "支持占位符变量,可以使用 ${qos} 来使用原消息的 QoS," + "支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS," "或其他值为 0 或 1 或 2 的变量。"/utf8>>} }, target_retain => #{ order => 3, - type => editable_select, - enum => [<<"true">>, <<"false">>], + input => editable_select, + type => [boolean, string], + enum => [true, false, <<"${flags.retain}">>], required => false, - default => <<"false">>, + default => false, title => #{en => <<"Target Retain">>, zh => <<"目标保留消息标识"/utf8>>}, description => #{en => <<"The Retain flag to be uses when republishing the message." + " Set to ${flags.retain} to use the original Retain." " Support placeholder variables. Default is false">>, - zh => <<"重新发布消息时用的保留消息标识,支持占位符变量。默认 false"/utf8>>} + zh => <<"重新发布消息时用的保留消息标识。" + "支持占位符变量,可以填写 ${flags.retain} 来使用原消息的 Retain。" + "默认 false"/utf8>>} }, payload_tmpl => #{ order => 4, @@ -158,8 +163,8 @@ on_action_create_republish(Id, Params = #{ <<"target_qos">> := TargetQoS0, <<"payload_tmpl">> := PayloadTmpl }) -> - {ok, TargetRetain} = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), - {ok, TargetQoS} = to_qos(TargetQoS0), + TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), + TargetQoS = to_qos(TargetQoS0), TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -182,28 +187,21 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }} = Bindings) -> + } = Bindings}) -> ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), TargetRetain = maps:get('TargetRetain', Bindings, false), - case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of - {{ok, RQoS}, {ok, Retain}} when is_integer(RQoS) andalso is_boolean(Retain) -> - Message = - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> QoS; true -> RQoS end, - from = ActId, - flags = Flags#{retain => Retain}, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = Timestamp - }, - increase_and_publish(ActId, Message); - Error -> - emqx_rule_metrics:inc_actions_error(ActId), - _ = log_error(Error), - {badact, bad_qos_retain} - end; + Message = + #message{ + id = emqx_guid:gen(), + qos = get_qos(TargetQoS, Selected, QoS), + from = ActId, + flags = Flags#{retain => get_retain(TargetRetain, Selected)}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = Timestamp + }, + increase_and_publish(ActId, Message); %% in case this is not a "message.publish" request on_action_republish(Selected, _Envs = #{ @@ -216,32 +214,18 @@ on_action_republish(Selected, _Envs = #{ } = Bindings}) -> ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), TargetRetain = maps:get('TargetRetain', Bindings, false), - case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of - {{ok, QoS}, {ok, Retain}} when is_integer(QoS) andalso is_boolean(Retain) -> - Message = - #message{ - id = emqx_guid:gen(), - qos = QoS, - from = ActId, - flags = #{dup => false, retain => Retain}, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = erlang:system_time(millisecond) - }, - increase_and_publish(ActId, Message); - Errors -> - emqx_rule_metrics:inc_actions_error(ActId), - _ = log_error(Errors), - {badact, bad_qos_retain} - end. - -log_error({{ok, _}, RetainError}) -> - ?LOG(error, "[republish] invalid retain: ~p", [RetainError]); -log_error({QosError, {ok, _}}) -> - ?LOG(error, "[republish] invalid qos: ~p", [QosError]); -log_error({QosError, RetainError}) -> - ?LOG(error, "[republish] invalid qos: ~p invalid retain: ~p", [QosError, RetainError]). + Message = + #message{ + id = emqx_guid:gen(), + qos = get_qos(TargetQoS, Selected, 0), + from = ActId, + flags = #{dup => false, retain => get_retain(TargetRetain, Selected)}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = erlang:system_time(millisecond) + }, + increase_and_publish(ActId, Message). increase_and_publish(ActId, Msg) -> _ = emqx_broker:safe_publish(Msg), @@ -261,53 +245,56 @@ format_msg([], Data) -> format_msg(Tokens, Data) -> emqx_rule_utils:proc_tmpl(Tokens, Data). +%% -1 for old version. +to_qos(<<"-1">>) -> -1; +to_qos(-1) -> -1; to_qos(TargetQoS) -> - case get_qos(TargetQoS) of - {ok, QoS} -> - {ok, QoS}; - _Error -> - case emqx_rule_utils:preproc_tmpl(TargetQoS) of - Tmpl = [{var, _}] -> - {ok, Tmpl}; - _ -> - {error, bad_qos} - end + try + qos(TargetQoS) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetQoS) of + Tmpl = [{var, _}] -> + Tmpl; + _BadQoS -> + error({bad_qos, TargetQoS}) + end end. -get_qos(Tmpl, Data) -> - get_qos(emqx_rule_utils:replace_simple_var(Tmpl, Data)). +get_qos(-1, _Data, Default) -> Default; +get_qos(TargetQoS, Data, _Default) -> + qos(emqx_rule_utils:replace_simple_var(TargetQoS, Data)). -get_qos(<<"-1">>) -> {ok, 0}; -get_qos(<<"0">>) -> {ok, 0}; -get_qos(<<"1">>) -> {ok, 1}; -get_qos(<<"2">>) -> {ok, 2}; -get_qos(0) -> {ok, 0}; -get_qos(1) -> {ok, 1}; -get_qos(2) -> {ok, 2}; -get_qos(_) -> {error, bad_qos}. +qos(<<"0">>) -> 0; +qos(<<"1">>) -> 1; +qos(<<"2">>) -> 2; +qos(0) -> 0; +qos(1) -> 1; +qos(2) -> 2; +qos(BadQoS) -> error({bad_qos, BadQoS}). to_retain(TargetRetain) -> - case get_retain(TargetRetain) of - {ok, Retain} -> - {ok, Retain}; - _Error -> - case emqx_rule_utils:preproc_tmpl(TargetRetain) of - Tmpl = [{var, _}] -> - {ok, Tmpl}; - _ -> - {error, bad_retain} - end + try + retain(TargetRetain) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetRetain) of + Tmpl = [{var, _}] -> + Tmpl; + _BadRetain -> + error({bad_retain, TargetRetain}) + end end. -get_retain(Tmpl, Data) -> - get_retain(emqx_rule_utils:replace_simple_var(Tmpl, Data)). +get_retain(TargetRetain, Data) -> + retain(emqx_rule_utils:replace_simple_var(TargetRetain, Data)). -get_retain(true) -> {ok, true}; -get_retain(false) -> {ok, false}; -get_retain(<<"true">>) -> {ok, true}; -get_retain(<<"false">>) -> {ok, false}; -get_retain(<<"1">>) -> {ok, true}; -get_retain(<<"0">>) -> {ok, false}; -get_retain(1) -> {ok, true}; -get_retain(0) -> {ok, false}; -get_retain(_) -> {error, bad_retain}. +retain(true) -> true; +retain(false) -> false; +retain(<<"true">>) -> true; +retain(<<"false">>) -> false; +retain(<<"1">>) -> true; +retain(<<"0">>) -> false; +retain(1) -> true; +retain(0) -> false; +retain(BadRetain) -> error({bad_retain, BadRetain}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl index 57d5eb465..237e5dfba 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_validator.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_validator.erl @@ -47,7 +47,6 @@ , array , file , cfgselect %% TODO: [5.0] refactor this - , editable_select ]). %%------------------------------------------------------------------------------ @@ -85,9 +84,8 @@ validate_spec(ParamsSepc) -> %% Internal Functions %%------------------------------------------------------------------------------ -%% Validate editable_select first, because editable_select has enum selection. -validate_value(Val, #{type := editable_select}) -> - Val; +validate_value(Val, #{type := Types} = Spec) when is_list(Types) -> + validate_types(Val, Types, Spec); validate_value(Val, #{enum := Enum}) -> validate_enum(Val, Enum); validate_value(Val, #{type := object} = Spec) -> @@ -95,6 +93,15 @@ validate_value(Val, #{type := object} = Spec) -> validate_value(Val, #{type := Type} = Spec) -> validate_type(Val, Type, Spec). +validate_types(Val, [], _Spec) -> + throw({invalid_data_type, Val}); +validate_types(Val, [Type | Types], Spec) -> + try + validate_type(Val, Type, Spec) + catch _:_ -> + validate_types(Val, Types, Spec) + end. + validate_type(Val, file, _Spec) -> validate_file(Val); validate_type(Val, String, Spec) when String =:= string; @@ -161,6 +168,9 @@ do_validate_spec(Name, #{type := array} = Spec) -> fun (not_found) -> throw({required_field_missing, {items, {in, Name}}}); (Items) -> do_validate_spec(Name, Items) end); +do_validate_spec(_Name, #{type := Types}) when is_list(Types) -> + _ = [ok = supported_data_type(Type, ?DATA_TYPES) || Type <- Types], + ok; do_validate_spec(_Name, #{type := Type}) -> _ = supported_data_type(Type, ?DATA_TYPES); From c5f754c3b88c47fe7e53fd2a841b05cb9c53cace Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 10 Jun 2022 17:16:14 +0800 Subject: [PATCH 05/11] fix(suite): add republish qos & retain test case --- .../src/emqx_rule_actions.erl | 10 ++-- .../test/emqx_rule_engine_SUITE.erl | 51 +++++++++++-------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index aa5739da2..83c13a3af 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -43,14 +43,12 @@ title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, description => #{en => - <<"The QoS Level to be uses when republishing the message." + <<"The QoS Level to be used when republishing the message." " Support placeholder variables." - " Set to ${qos} to use the original QoS." - " Or other variable, value is 0 or 1 or 2">>, + " Set to ${qos} to use the original QoS. Default is 0">>, zh => <<"重新发布消息时用的 QoS 级别。" - "支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS," - "或其他值为 0 或 1 或 2 的变量。"/utf8>>} + "支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS。默认 0"/utf8>>} }, target_retain => #{ order => 3, @@ -61,7 +59,7 @@ default => false, title => #{en => <<"Target Retain">>, zh => <<"目标保留消息标识"/utf8>>}, - description => #{en => <<"The Retain flag to be uses when republishing the message." + description => #{en => <<"The Retain flag to be used when republishing the message." " Set to ${flags.retain} to use the original Retain." " Support placeholder variables. Default is false">>, zh => <<"重新发布消息时用的保留消息标识。" diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index f5c379159..663eddd5a 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -405,16 +405,31 @@ t_reset_metrics(_Config) -> ok. t_republish_action(_Config) -> - Qos0Received = emqx_metrics:val('messages.qos0.received'), + TargetQoSList = [-1, 0, 1, 2, <<"${qos}">>], + TargetRetainList = [true, false, <<"${flags.retain}">>], + [[republish_action_test(TargetQoS, TargetRetain) || TargetRetain <- TargetRetainList] + || TargetQoS <- TargetQoSList], + ok. + +republish_action_test(TargetQoS, TargetRetain) -> + {QoSReceivedMetricsName, PubQoS} = + case TargetQoS of + <<"${qos}">> -> {'messages.qos0.received', 0}; + -1 -> {'messages.qos0.received', 0}; + 0 -> {'messages.qos0.received', 0}; + 1 -> {'messages.qos1.received', 1}; + 2 -> {'messages.qos2.received', 2} + end, + QosReceived = emqx_metrics:val(QoSReceivedMetricsName), Received = emqx_metrics:val('messages.received'), ok = emqx_rule_engine:load_providers(), {ok, #rule{id = Id, for = [<<"t1">>]}} = emqx_rule_engine:create_rule( - #{rawsql => <<"select topic, payload, qos from \"t1\"">>, + #{rawsql => <<"select * from \"t1\"">>, actions => [#{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => TargetQoS, + <<"target_retain">> => TargetRetain, <<"payload_tmpl">> => <<"${payload}">>}}], description => <<"builtin-republish-rule">>}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -422,7 +437,7 @@ t_republish_action(_Config) -> {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), Msg = <<"{\"id\": 1, \"name\": \"ha\"}">>, - emqtt:publish(Client, <<"t1">>, Msg, 0), + emqtt:publish(Client, <<"t1">>, Msg, PubQoS), receive {publish, #{topic := <<"t2">>, payload := Payload}} -> ?assertEqual(Msg, Payload) after 1000 -> @@ -430,7 +445,7 @@ t_republish_action(_Config) -> end, emqtt:stop(Client), emqx_rule_registry:remove_rule(Id), - ?assertEqual(2, emqx_metrics:val('messages.qos0.received') - Qos0Received), + ?assertEqual(2, emqx_metrics:val(QoSReceivedMetricsName) - QosReceived), ?assertEqual(2, emqx_metrics:val('messages.received') - Received), ok. @@ -480,8 +495,7 @@ t_crud_rule_api(_Config) -> {<<"params">>,[ {<<"arg1">>,1}, {<<"target_topic">>, <<"t2">>}, - {<<"target_qos">>, <<"0">>}, - {<<"target_retain">>, <<"false">>}, + {<<"target_qos">>, 0}, {<<"payload_tmpl">>, <<"${payload}">>} ]} ]] @@ -1620,8 +1634,7 @@ t_sqlselect_multi_actoins_1(Config) -> #{name => 'crash_action', args => #{}, fallbacks => []}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => -1, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1647,8 +1660,7 @@ t_sqlselect_multi_actoins_1_1(Config) -> #{name => 'crash_action', args => #{}, fallbacks => []}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => -1, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1676,8 +1688,7 @@ t_sqlselect_multi_actoins_2(Config) -> #{name => 'crash_action', args => #{}, fallbacks => []}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => -1, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1708,8 +1719,7 @@ t_sqlselect_multi_actoins_3(Config) -> ]}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => -1, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1747,8 +1757,7 @@ t_sqlselect_multi_actoins_3_1(Config) -> ]}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => -1, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -1787,8 +1796,7 @@ t_sqlselect_multi_actoins_4(Config) -> ]}, #{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => -1, <<"payload_tmpl">> => <<"clientid=${clientid}">> }, fallbacks => []} @@ -2542,8 +2550,7 @@ create_simple_repub_rule(TargetTopic, SQL, Template) -> #{rawsql => SQL, actions => [#{name => 'republish', args => #{<<"target_topic">> => TargetTopic, - <<"target_qos">> => <<"-1">>, - <<"target_retain">> => <<"false">>, + <<"target_qos">> => -1, <<"payload_tmpl">> => Template} }], description => <<"simple repub rule">>}), From 1ba8ad4c25d4f66f00a72e190a3bd56f3aa37271 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 10 Jun 2022 15:00:57 -0300 Subject: [PATCH 06/11] fix(metrics): inc `connack.auth_error` when using MQTT 3.1 Since MQTT 3.1 uses a different reason code for auth failures, it was failing to increase the corresponding metric that works for MQTT 5.0. --- src/emqx_metrics.erl | 8 ++++++-- test/emqx_broker_SUITE.erl | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 7bec83465..c919e25eb 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -429,8 +429,12 @@ inc_sent(Packet) -> do_inc_sent(?CONNACK_PACKET(ReasonCode)) -> (ReasonCode == ?RC_SUCCESS) orelse inc('packets.connack.error'), - (ReasonCode == ?RC_NOT_AUTHORIZED) andalso inc('packets.connack.auth_error'), - (ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) andalso inc('packets.connack.auth_error'), + ((ReasonCode == ?RC_NOT_AUTHORIZED) + orelse (ReasonCode == ?CONNACK_AUTH)) + andalso inc('packets.connack.auth_error'), + ((ReasonCode == ?RC_BAD_USER_NAME_OR_PASSWORD) + orelse (ReasonCode == ?CONNACK_CREDENTIALS)) + andalso inc('packets.connack.auth_error'), inc('packets.connack.sent'); do_inc_sent(?PUBLISH_PACKET(QoS)) -> diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index e8d19c5c0..ec46ff8e1 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -277,6 +277,38 @@ t_stats_fun({'end', _Config}) -> ok = emqx_broker:unsubscribe(<<"topic">>), ok = emqx_broker:unsubscribe(<<"topic2">>). +t_connack_auth_error({init, Config}) -> + process_flag(trap_exit, true), + emqx_ct_helpers:stop_apps([]), + emqx_ct_helpers:boot_modules(all), + Handler = + fun(emqx) -> + application:set_env(emqx, acl_nomatch, deny), + application:set_env(emqx, allow_anonymous, false), + application:set_env(emqx, enable_acl_cache, false), + ok; + (_) -> + ok + end, + emqx_ct_helpers:start_apps([], Handler), + Config; +t_connack_auth_error({'end', _Config}) -> + emqx_ct_helpers:stop_apps([]), + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + ok; +t_connack_auth_error(Config) when is_list(Config) -> + %% MQTT 3.1 + ?assertEqual(0, emqx_metrics:val('packets.connack.auth_error')), + {ok, C0} = emqtt:start_link([{proto_ver, v4}]), + ?assertEqual({error, {unauthorized_client, undefined}}, emqtt:connect(C0)), + ?assertEqual(1, emqx_metrics:val('packets.connack.auth_error')), + %% MQTT 5.0 + {ok, C1} = emqtt:start_link([{proto_ver, v5}]), + ?assertEqual({error, {not_authorized, #{}}}, emqtt:connect(C1)), + ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')), + ok. + recv_msgs(Count) -> recv_msgs(Count, []). From 1733f19608ebcdfd7f59c409f945f3ef987bc9c5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 10 Jun 2022 16:15:17 -0300 Subject: [PATCH 07/11] chore: bump version and update changelog --- CHANGES-4.3.md | 5 +++++ src/emqx.app.src | 2 +- src/emqx.appup.src | 6 ++++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 00cb8cf1e..3178d817e 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -18,6 +18,11 @@ File format: password-protected private key files used for dashboard and management HTTPS listeners. [#8129] +### Bug-fixes + +- Correctly tally `connack.auth_error` metrics when a client uses MQTT + 3.1. [#8177] + ## v4.3.15 ### Enhancements diff --git a/src/emqx.app.src b/src/emqx.app.src index c57bd635d..5b5017b1a 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -6,7 +6,7 @@ %% the emqx `release' version, which in turn is comprised of several %% apps, one of which is this. See `emqx_release.hrl' for more %% info. - {vsn, "4.3.16"}, % strict semver, bump manually! + {vsn, "4.3.17"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [ kernel diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 844c7fb7a..49fc3abb3 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,7 +1,8 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.15", + [{"4.3.16",[{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, + {"4.3.15", [{add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -555,7 +556,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.15", + [{"4.3.16",[{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, + {"4.3.15", [{delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, From 215286760ac497698c937251c9e7623e8c9ba9d0 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sun, 12 Jun 2022 21:05:36 +0200 Subject: [PATCH 08/11] build: ignore _build sub-dir in release dir --- scripts/update_appup.escript | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index dd8dd9ca2..0ac5084ab 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -55,7 +55,10 @@ app_specific_actions(_) -> []. ignored_apps() -> - [gpb, emqx_dashboard, emqx_management] ++ otp_standard_apps(). + [gpb, %% only a build tool + emqx_dashboard, %% generic appup file for all versions + emqx_management %% generic appup file for all versions + ] ++ otp_standard_apps(). main(Args) -> #{prev_tag := Baseline} = Options = parse_args(Args, default_options()), @@ -530,8 +533,10 @@ contains_contents(File, Upgrade, Downgrade) -> index_apps(ReleaseDir) -> log("INFO: indexing apps in ~s~n", [ReleaseDir]), - Apps0 = maps:from_list([index_app(filename:join(ReleaseDir, AppFile)) || - AppFile <- filelib:wildcard("**/ebin/*.app", ReleaseDir)]), + AppFiles0 = filelib:wildcard("**/ebin/*.app", ReleaseDir), + %% everything in _build sub-dir e.g. cuttlefish/_build should be ignored + AppFiles = lists:filter(fun(File) -> re:run(File, "_build") =:= nomatch end, AppFiles0), + Apps0 = maps:from_list([index_app(filename:join(ReleaseDir, AppFile)) || AppFile <- AppFiles]), maps:without(ignored_apps(), Apps0). index_app(AppFile) -> From 1e02656035f4dd3d0642981c2be6b62bc17298e1 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Mon, 13 Jun 2022 10:53:36 +0800 Subject: [PATCH 09/11] fix(rule): better func name for emqx_rule_utils --- .../src/emqx_rule_actions.erl | 4 ++-- apps/emqx_rule_engine/src/emqx_rule_utils.erl | 19 +++++-------------- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 83c13a3af..ae82d56c9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -261,7 +261,7 @@ to_qos(TargetQoS) -> get_qos(-1, _Data, Default) -> Default; get_qos(TargetQoS, Data, _Default) -> - qos(emqx_rule_utils:replace_simple_var(TargetQoS, Data)). + qos(emqx_rule_utils:replace_var(TargetQoS, Data)). qos(<<"0">>) -> 0; qos(<<"1">>) -> 1; @@ -285,7 +285,7 @@ to_retain(TargetRetain) -> end. get_retain(TargetRetain, Data) -> - retain(emqx_rule_utils:replace_simple_var(TargetRetain, Data)). + retain(emqx_rule_utils:replace_var(TargetRetain, Data)). retain(true) -> true; retain(false) -> false; diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 047b9d1af..137e22128 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -16,8 +16,7 @@ -module(emqx_rule_utils). --export([ replace_vars_in_str/2 - , replace_simple_var/2 +-export([ replace_var/2 ]). %% preprocess and process tempalte string with place holders @@ -91,20 +90,12 @@ preproc_tmpl([[Str, Phld]| Tokens], Acc) -> preproc_tmpl([[Str]| Tokens], Acc) -> preproc_tmpl(Tokens, put_head(str, Str, Acc)). -%% Replace a string contains vars to another string in which the placeholders are replace by the -%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: -%% "a: 1". -replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> - proc_tmpl(Tokens, Data, #{return => full_binary}); -replace_vars_in_str(Val, _Data) -> - Val. - %% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result %% value will be an integer 1. -replace_simple_var(Tokens, Data) when is_list(Tokens) -> - [Var] = proc_tmpl(Tokens, Data, #{return => rawlist}), - Var; -replace_simple_var(Val, _Data) -> +replace_var(Tokens, Data) when is_list(Tokens) -> + [Val] = proc_tmpl(Tokens, Data, #{return => rawlist}), + Val; +replace_var(Val, _Data) -> Val. put_head(_Type, <<>>, List) -> List; From 38b7e5fa11d7f79e079ce2285894dfaab2280790 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 13 Jun 2022 15:00:03 +0200 Subject: [PATCH 10/11] chore: bump app vsn for emqx_dashboard --- apps/emqx_management/src/emqx_management.app.src | 2 +- apps/emqx_management/src/emqx_management.appup.src | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_management/src/emqx_management.app.src b/apps/emqx_management/src/emqx_management.app.src index e203c7a56..3d9e6843b 100644 --- a/apps/emqx_management/src/emqx_management.app.src +++ b/apps/emqx_management/src/emqx_management.app.src @@ -1,6 +1,6 @@ {application, emqx_management, [{description, "EMQ X Management API and CLI"}, - {vsn, "4.3.13"}, % strict semver, bump manually! + {vsn, "4.3.14"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_management_sup]}, {applications, [kernel,stdlib,minirest]}, diff --git a/apps/emqx_management/src/emqx_management.appup.src b/apps/emqx_management/src/emqx_management.appup.src index 15aef0463..1463334b4 100644 --- a/apps/emqx_management/src/emqx_management.appup.src +++ b/apps/emqx_management/src/emqx_management.appup.src @@ -1,13 +1,13 @@ %% -*- mode: erlang -*- {VSN, - [ {<<"4\\.3\\.([0-9]|1[0-4])">>, + [ {<<"4\\.3\\.[0-9]+">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} ]}, {<<".*">>, []} ], - [ {<<"4\\.3\\.([0-9]|1[0-4])">>, + [ {<<"4\\.3\\.[0-9]+">>, [ {apply,{minirest,stop_http,['http:management']}}, {apply,{minirest,stop_http,['https:management']}}, {restart_application, emqx_management} From 100aeda83bf5621647723bd57e99cd82e2d56761 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 13 Jun 2022 17:40:41 +0200 Subject: [PATCH 11/11] build: add strict-semver version bump check for apps --- scripts/apps-version-check.sh | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/scripts/apps-version-check.sh b/scripts/apps-version-check.sh index 48cb3218f..a16d6a5ca 100755 --- a/scripts/apps-version-check.sh +++ b/scripts/apps-version-check.sh @@ -8,6 +8,10 @@ bad_app_count=0 no_comment_re='(^[^\s?%])' ## TODO: c source code comments re (in $app_path/c_src dirs) +parse_semver() { + echo "$1" | tr '.|-' ' ' +} + while read -r app; do if [ "$app" != "emqx" ]; then app_path="$app" @@ -15,7 +19,7 @@ while read -r app; do app_path="." fi src_file="$app_path/src/$(basename "$app").app.src" - old_app_version="$(git show "$latest_release":"$src_file" | grep vsn | grep -oE '"[0-9]+.[0-9]+.[0-9]+"' | tr -d '"')" + old_app_version="$(git show "$latest_release":"$src_file" | grep vsn | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')" now_app_version=$(grep -E 'vsn' "$src_file" | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') if [ "$old_app_version" = "$now_app_version" ]; then changed_lines="$(git diff "$latest_release"...HEAD --ignore-blank-lines -G "$no_comment_re" \ @@ -36,6 +40,19 @@ while read -r app; do echo "$src_file needs a vsn bump to ensure plugins loaded after upgrade" bad_app_count=$(( bad_app_count + 1)) fi + else + # shellcheck disable=SC2207 + old_app_version_semver=($(parse_semver "$old_app_version")) + # shellcheck disable=SC2207 + now_app_version_semver=($(parse_semver "$now_app_version")) + if [ "${old_app_version_semver[0]}" = "${now_app_version_semver[0]}" ] && \ + [ "${old_app_version_semver[1]}" = "${now_app_version_semver[1]}" ] && \ + [ "$(( "${old_app_version_semver[2]}" + 1 ))" = "${now_app_version_semver[2]}" ]; then + true + else + echo "$src_file: non-strict semver version bump from $old_app_version to $now_app_version" + bad_app_count=$(( bad_app_count + 1)) + fi fi done < <(./scripts/find-apps.sh)