diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index edeffbde4..ae82d56c9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -35,24 +35,47 @@ }, target_qos => #{ order => 2, - type => number, - enum => [-1, 0, 1, 2], + input => editable_select, + type => [number, string], + enum => [0, 1, 2, <<"${qos}">>], required => true, default => 0, title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, - description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>, - zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>} + description => #{en => + <<"The QoS Level to be used when republishing the message." + " Support placeholder variables." + " Set to ${qos} to use the original QoS. Default is 0">>, + zh => + <<"重新发布消息时用的 QoS 级别。" + "支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS。默认 0"/utf8>>} + }, + target_retain => #{ + order => 3, + input => editable_select, + type => [boolean, string], + enum => [true, false, <<"${flags.retain}">>], + required => false, + default => false, + title => #{en => <<"Target Retain">>, + zh => <<"目标保留消息标识"/utf8>>}, + description => #{en => <<"The Retain flag to be used when republishing the message." + " Set to ${flags.retain} to use the original Retain." + " Support placeholder variables. Default is false">>, + zh => <<"重新发布消息时用的保留消息标识。" + "支持占位符变量,可以填写 ${flags.retain} 来使用原消息的 Retain。" + "默认 false"/utf8>>} }, payload_tmpl => #{ - order => 3, + order => 4, type => string, input => textarea, required => false, default => <<"${payload}">>, title => #{en => <<"Payload Template">>, zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported">>, + description => #{en => <<"The payload template, " + "variable interpolation is supported">>, zh => <<"消息内容模板,支持变量"/utf8>>} } }). @@ -89,7 +112,8 @@ params => #{}, title => #{en => <<"Do Nothing (debug)">>, zh => <<"空动作 (调试)"/utf8>>}, - description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>, + description => #{en => <<"This action does nothing and never fails. " + "It's for debug purpose">>, zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>} }). @@ -113,7 +137,8 @@ on_resource_create(_Name, Conf) -> %%------------------------------------------------------------------------------ %% Action 'inspect' %%------------------------------------------------------------------------------ --spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_inspect(Id, Params) -> Params. @@ -129,12 +154,15 @@ on_action_inspect(Selected, Envs) -> %%------------------------------------------------------------------------------ %% Action 'republish' %%------------------------------------------------------------------------------ --spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_republish(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_republish(Id, Params = #{ <<"target_topic">> := TargetTopic, - <<"target_qos">> := TargetQoS, + <<"target_qos">> := TargetQoS0, <<"payload_tmpl">> := PayloadTmpl }) -> + TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), + TargetQoS = to_qos(TargetQoS0), TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -157,20 +185,21 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, + } = Bindings}) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + Message = #message{ id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end, + qos = get_qos(TargetQoS, Selected, QoS), from = ActId, - flags = Flags, + flags = Flags#{retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), payload = format_msg(PayloadTks, Selected), timestamp = Timestamp - }); + }, + increase_and_publish(ActId, Message); %% in case this is not a "message.publish" request on_action_republish(Selected, _Envs = #{ @@ -180,27 +209,29 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, + } = Bindings}) -> + ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + TargetRetain = maps:get('TargetRetain', Bindings, false), + Message = #message{ id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end, + qos = get_qos(TargetQoS, Selected, 0), from = ActId, - flags = #{dup => false, retain => false}, + flags = #{dup => false, retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), payload = format_msg(PayloadTks, Selected), timestamp = erlang:system_time(millisecond) - }). + }, + increase_and_publish(ActId, Message). increase_and_publish(ActId, Msg) -> _ = emqx_broker:safe_publish(Msg), emqx_rule_metrics:inc_actions_success(ActId), emqx_metrics:inc_msg(Msg). --spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. +-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> + {bindings(), NewParams :: map()}. on_action_create_do_nothing(ActId, Params) when is_binary(ActId) -> Params. @@ -211,3 +242,57 @@ format_msg([], Data) -> emqx_json:encode(Data); format_msg(Tokens, Data) -> emqx_rule_utils:proc_tmpl(Tokens, Data). + +%% -1 for old version. +to_qos(<<"-1">>) -> -1; +to_qos(-1) -> -1; +to_qos(TargetQoS) -> + try + qos(TargetQoS) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetQoS) of + Tmpl = [{var, _}] -> + Tmpl; + _BadQoS -> + error({bad_qos, TargetQoS}) + end + end. + +get_qos(-1, _Data, Default) -> Default; +get_qos(TargetQoS, Data, _Default) -> + qos(emqx_rule_utils:replace_var(TargetQoS, Data)). + +qos(<<"0">>) -> 0; +qos(<<"1">>) -> 1; +qos(<<"2">>) -> 2; +qos(0) -> 0; +qos(1) -> 1; +qos(2) -> 2; +qos(BadQoS) -> error({bad_qos, BadQoS}). + +to_retain(TargetRetain) -> + try + retain(TargetRetain) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetRetain) of + Tmpl = [{var, _}] -> + Tmpl; + _BadRetain -> + error({bad_retain, TargetRetain}) + end + end. + +get_retain(TargetRetain, Data) -> + retain(emqx_rule_utils:replace_var(TargetRetain, Data)). + +retain(true) -> true; +retain(false) -> false; +retain(<<"true">>) -> true; +retain(<<"false">>) -> false; +retain(<<"1">>) -> true; +retain(<<"0">>) -> false; +retain(1) -> true; +retain(0) -> false; +retain(BadRetain) -> error({bad_retain, BadRetain}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 7b4049254..b92d67e4f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,9 +1,15 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -12,7 +18,9 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, @@ -25,7 +33,9 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -38,7 +48,9 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -51,7 +63,9 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -64,7 +78,9 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -77,7 +93,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -91,7 +108,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -106,7 +124,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -121,7 +140,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{add_module,emqx_rule_date}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -136,9 +156,15 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.10", + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -147,7 +173,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, @@ -160,7 +188,9 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.7", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, @@ -173,7 +203,9 @@ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.6", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -186,7 +218,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.5", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -199,7 +233,9 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.4", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -212,7 +248,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.3", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -226,7 +263,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.2", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -241,7 +279,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.1", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -256,7 +295,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.0", - [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index d287f1ad0..137e22128 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -16,6 +16,9 @@ -module(emqx_rule_utils). +-export([ replace_var/2 + ]). + %% preprocess and process tempalte string with place holders -export([ preproc_tmpl/1 , proc_tmpl/2 @@ -87,6 +90,14 @@ preproc_tmpl([[Str, Phld]| Tokens], Acc) -> preproc_tmpl([[Str]| Tokens], Acc) -> preproc_tmpl(Tokens, put_head(str, Str, Acc)). +%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result +%% value will be an integer 1. +replace_var(Tokens, Data) when is_list(Tokens) -> + [Val] = proc_tmpl(Tokens, Data, #{return => rawlist}), + Val; +replace_var(Val, _Data) -> + Val. + put_head(_Type, <<>>, List) -> List; put_head(Type, Term, List) -> [{Type, Term} | List]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl index e32ec66ab..237e5dfba 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_validator.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_validator.erl @@ -84,6 +84,8 @@ validate_spec(ParamsSepc) -> %% Internal Functions %%------------------------------------------------------------------------------ +validate_value(Val, #{type := Types} = Spec) when is_list(Types) -> + validate_types(Val, Types, Spec); validate_value(Val, #{enum := Enum}) -> validate_enum(Val, Enum); validate_value(Val, #{type := object} = Spec) -> @@ -91,6 +93,15 @@ validate_value(Val, #{type := object} = Spec) -> validate_value(Val, #{type := Type} = Spec) -> validate_type(Val, Type, Spec). +validate_types(Val, [], _Spec) -> + throw({invalid_data_type, Val}); +validate_types(Val, [Type | Types], Spec) -> + try + validate_type(Val, Type, Spec) + catch _:_ -> + validate_types(Val, Types, Spec) + end. + validate_type(Val, file, _Spec) -> validate_file(Val); validate_type(Val, String, Spec) when String =:= string; @@ -157,6 +168,9 @@ do_validate_spec(Name, #{type := array} = Spec) -> fun (not_found) -> throw({required_field_missing, {items, {in, Name}}}); (Items) -> do_validate_spec(Name, Items) end); +do_validate_spec(_Name, #{type := Types}) when is_list(Types) -> + _ = [ok = supported_data_type(Type, ?DATA_TYPES) || Type <- Types], + ok; do_validate_spec(_Name, #{type := Type}) -> _ = supported_data_type(Type, ?DATA_TYPES); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 6263c8440..663eddd5a 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -405,15 +405,31 @@ t_reset_metrics(_Config) -> ok. t_republish_action(_Config) -> - Qos0Received = emqx_metrics:val('messages.qos0.received'), + TargetQoSList = [-1, 0, 1, 2, <<"${qos}">>], + TargetRetainList = [true, false, <<"${flags.retain}">>], + [[republish_action_test(TargetQoS, TargetRetain) || TargetRetain <- TargetRetainList] + || TargetQoS <- TargetQoSList], + ok. + +republish_action_test(TargetQoS, TargetRetain) -> + {QoSReceivedMetricsName, PubQoS} = + case TargetQoS of + <<"${qos}">> -> {'messages.qos0.received', 0}; + -1 -> {'messages.qos0.received', 0}; + 0 -> {'messages.qos0.received', 0}; + 1 -> {'messages.qos1.received', 1}; + 2 -> {'messages.qos2.received', 2} + end, + QosReceived = emqx_metrics:val(QoSReceivedMetricsName), Received = emqx_metrics:val('messages.received'), ok = emqx_rule_engine:load_providers(), {ok, #rule{id = Id, for = [<<"t1">>]}} = emqx_rule_engine:create_rule( - #{rawsql => <<"select topic, payload, qos from \"t1\"">>, + #{rawsql => <<"select * from \"t1\"">>, actions => [#{name => 'republish', args => #{<<"target_topic">> => <<"t2">>, - <<"target_qos">> => -1, + <<"target_qos">> => TargetQoS, + <<"target_retain">> => TargetRetain, <<"payload_tmpl">> => <<"${payload}">>}}], description => <<"builtin-republish-rule">>}), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -421,7 +437,7 @@ t_republish_action(_Config) -> {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), Msg = <<"{\"id\": 1, \"name\": \"ha\"}">>, - emqtt:publish(Client, <<"t1">>, Msg, 0), + emqtt:publish(Client, <<"t1">>, Msg, PubQoS), receive {publish, #{topic := <<"t2">>, payload := Payload}} -> ?assertEqual(Msg, Payload) after 1000 -> @@ -429,7 +445,7 @@ t_republish_action(_Config) -> end, emqtt:stop(Client), emqx_rule_registry:remove_rule(Id), - ?assertEqual(2, emqx_metrics:val('messages.qos0.received') - Qos0Received), + ?assertEqual(2, emqx_metrics:val(QoSReceivedMetricsName) - QosReceived), ?assertEqual(2, emqx_metrics:val('messages.received') - Received), ok. @@ -479,7 +495,7 @@ t_crud_rule_api(_Config) -> {<<"params">>,[ {<<"arg1">>,1}, {<<"target_topic">>, <<"t2">>}, - {<<"target_qos">>, -1}, + {<<"target_qos">>, 0}, {<<"payload_tmpl">>, <<"${payload}">>} ]} ]]