diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index edeffbde4..ea94bff22 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -35,24 +35,40 @@ }, target_qos => #{ order => 2, - type => number, - enum => [-1, 0, 1, 2], + type => string, required => true, - default => 0, + default => <<"0">>, title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, - description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>, - zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>} + description => #{en => + <<"The QoS Level to be uses when republishing the message." + " Set to -1 to use the original QoS." + " Support placeholder variables.">>, + zh => + <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS。" + "支持占位符变量"/utf8>>} + }, + target_retain => #{ + order => 3, + type => string, + required => true, + default => <<"false">>, + title => #{en => <<"Target Retain">>, + zh => <<"目标保留消息标识"/utf8>>}, + description => #{en => <<"The Retain flag to be uses when republishing the message." + " Support placeholder variables. Default is false">>, + zh => <<"重新发布消息时用的保留消息标识,支持占位符变量。默认 false"/utf8>>} }, payload_tmpl => #{ - order => 3, + order => 4, type => string, input => textarea, required => false, default => <<"${payload}">>, title => #{en => <<"Payload Template">>, zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported">>, + description => #{en => <<"The payload template, " + "variable interpolation is supported">>, zh => <<"消息内容模板,支持变量"/utf8>>} } }). @@ -89,7 +105,8 @@ params => #{}, title => #{en => <<"Do Nothing (debug)">>, zh => <<"空动作 (调试)"/utf8>>}, - description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>, + description => #{en => <<"This action does nothing and never fails. " + "It's for debug purpose">>, zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>} }). @@ -113,7 +130,8 @@ on_resource_create(_Name, Conf) -> %%------------------------------------------------------------------------------ %% Action 'inspect' %%------------------------------------------------------------------------------ --spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_inspect(Id, Params) -> Params. @@ -129,12 +147,15 @@ on_action_inspect(Selected, Envs) -> %%------------------------------------------------------------------------------ %% Action 'republish' %%------------------------------------------------------------------------------ --spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_republish(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_republish(Id, Params = #{ <<"target_topic">> := TargetTopic, - <<"target_qos">> := TargetQoS, + <<"target_qos">> := TargetQoS0, <<"payload_tmpl">> := PayloadTmpl }) -> + {ok, TargetRetain} = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), + {ok, TargetQoS} = to_qos(TargetQoS0), TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -157,20 +178,28 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end, - from = ActId, - flags = Flags, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = Timestamp - }); + }} = Bindings) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of + {{ok, RQoS}, {ok, Retain}} when is_integer(RQoS) andalso is_boolean(Retain) -> + Message = + #message{ + id = emqx_guid:gen(), + qos = if TargetQoS =:= -1 -> QoS; true -> RQoS end, + from = ActId, + flags = Flags#{retain => Retain}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = Timestamp + }, + increase_and_publish(ActId, Message); + Error -> + emqx_rule_metrics:inc_actions_error(ActId), + _ = log_error(Error), + {badact, bad_qos_retain} + end; %% in case this is not a "message.publish" request on_action_republish(Selected, _Envs = #{ @@ -180,27 +209,43 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end, - from = ActId, - flags = #{dup => false, retain => false}, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = erlang:system_time(millisecond) - }). + } = Bindings}) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of + {{ok, QoS}, {ok, Retain}} when is_integer(QoS) andalso is_boolean(Retain) -> + Message = + #message{ + id = emqx_guid:gen(), + qos = QoS, + from = ActId, + flags = #{dup => false, retain => Retain}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = erlang:system_time(millisecond) + }, + increase_and_publish(ActId, Message); + Error -> + emqx_rule_metrics:inc_actions_error(ActId), + _ = log_error(Error), + {badact, bad_qos_retain} + end. + +log_error({{ok, _}, RetainError}) -> + ?LOG(error, "[republish] invalid retain: ~p", [RetainError]); +log_error({QosError, {ok, _}}) -> + ?LOG(error, "[republish] invalid qos: ~p", [QosError]); +log_error({QosError, RetainError}) -> + ?LOG(error, "[republish] invalid qos: ~p invalid retain: ~p", [QosError, RetainError]). increase_and_publish(ActId, Msg) -> _ = emqx_broker:safe_publish(Msg), emqx_rule_metrics:inc_actions_success(ActId), emqx_metrics:inc_msg(Msg). --spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_do_nothing(ActId, Params) when is_binary(ActId) -> Params. @@ -211,3 +256,55 @@ format_msg([], Data) -> emqx_json:encode(Data); format_msg(Tokens, Data) -> emqx_rule_utils:proc_tmpl(Tokens, Data). + +get_qos(-1, _Data) -> {ok, 0}; +get_qos(0, _Data) -> {ok, 0}; +get_qos(1, _Data) -> {ok, 1}; +get_qos(2, _Data) -> {ok, 2}; +get_qos({path, Path}, Data) -> + to_qos(emqx_rule_maps:nested_get({path, Path}, Data, 0)). + +to_qos(0) -> {ok, 0}; +to_qos(1) -> {ok, 1}; +to_qos(2) -> {ok, 2}; +to_qos(<<"-1">>) -> {ok, 0}; +to_qos(<<"0">>) -> {ok, 0}; +to_qos(<<"1">>) -> {ok, 1}; +to_qos(<<"2">>) -> {ok, 2}; +to_qos(TargetQoS) -> + case parse_value_or_placeholder(TargetQoS) of + {path, P} -> + {ok, {path, P}}; + _ -> + {error, bad_qos} + end. + +get_retain(false, _Data) -> {ok, false}; +get_retain(true, _Data) -> {ok, true}; +get_retain({path, Path}, Data) -> + to_retain(emqx_rule_maps:nested_get({path, Path}, Data, true)). + +to_retain(true) -> {ok, true}; +to_retain(false) -> {ok, false}; +to_retain(<<"true">>) -> {ok, true}; +to_retain(<<"false">>) -> {ok, false}; +to_retain(<<"1">>) -> {ok, true}; +to_retain(<<"0">>) -> {ok, false}; +to_retain(1) -> {ok, true}; +to_retain(0) -> {ok, false}; +to_retain(TargetRetain) -> + case parse_value_or_placeholder(TargetRetain) of + {path, P} -> + {ok, {path, P}}; + _ -> + {error, bad_retain} + end. + +parse_value_or_placeholder(ValueOrPlaceholder) -> + case re:run(ValueOrPlaceholder, "^\\$\{.+\}$") of + nomatch -> + ValueOrPlaceholder; + {match, [{0, Length}]} -> + Placeholder = binary:part(ValueOrPlaceholder, 2, Length - 3), + {path, [{key, Key} || Key <- string:lexemes(Placeholder, ". ")]} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 7b4049254..5202b88b3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,9 +1,12 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -12,7 +15,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -25,7 +29,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -38,7 +43,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -51,7 +57,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -64,7 +71,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -136,9 +144,12 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -147,7 +158,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -160,7 +172,8 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, @@ -173,7 +186,8 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -186,7 +200,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -199,7 +214,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},