From e1e2fd50fd4f142c9820e50083c315b9b9682881 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Fri, 10 Jun 2022 16:08:33 +0800 Subject: [PATCH] fix(republish): action input editable_select --- .../src/emqx_rule_actions.erl | 181 ++++++++---------- .../src/emqx_rule_validator.erl | 18 +- 2 files changed, 98 insertions(+), 101 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 1ecf134cb..aa5739da2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -35,10 +35,11 @@ }, target_qos => #{ order => 2, - type => editable_select, - enum => [<<"0">>, <<"1">>, <<"2">>], + input => editable_select, + type => [number, string], + enum => [0, 1, 2, <<"${qos}">>], required => true, - default => <<"0">>, + default => 0, title => #{en => <<"Target QoS">>, zh => <<"目的 QoS"/utf8>>}, description => #{en => @@ -48,20 +49,24 @@ " Or other variable, value is 0 or 1 or 2">>, zh => <<"重新发布消息时用的 QoS 级别。" - "支持占位符变量,可以使用 ${qos} 来使用原消息的 QoS," + "支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS," "或其他值为 0 或 1 或 2 的变量。"/utf8>>} }, target_retain => #{ order => 3, - type => editable_select, - enum => [<<"true">>, <<"false">>], + input => editable_select, + type => [boolean, string], + enum => [true, false, <<"${flags.retain}">>], required => false, - default => <<"false">>, + default => false, title => #{en => <<"Target Retain">>, zh => <<"目标保留消息标识"/utf8>>}, description => #{en => <<"The Retain flag to be uses when republishing the message." + " Set to ${flags.retain} to use the original Retain." " Support placeholder variables. Default is false">>, - zh => <<"重新发布消息时用的保留消息标识,支持占位符变量。默认 false"/utf8>>} + zh => <<"重新发布消息时用的保留消息标识。" + "支持占位符变量,可以填写 ${flags.retain} 来使用原消息的 Retain。" + "默认 false"/utf8>>} }, payload_tmpl => #{ order => 4, @@ -158,8 +163,8 @@ on_action_create_republish(Id, Params = #{ <<"target_qos">> := TargetQoS0, <<"payload_tmpl">> := PayloadTmpl }) -> - {ok, TargetRetain} = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), - {ok, TargetQoS} = to_qos(TargetQoS0), + TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), + TargetQoS = to_qos(TargetQoS0), TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -182,28 +187,21 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - }} = Bindings) -> + } = Bindings}) -> ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), TargetRetain = maps:get('TargetRetain', Bindings, false), - case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of - {{ok, RQoS}, {ok, Retain}} when is_integer(RQoS) andalso is_boolean(Retain) -> - Message = - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> QoS; true -> RQoS end, - from = ActId, - flags = Flags#{retain => Retain}, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = Timestamp - }, - increase_and_publish(ActId, Message); - Error -> - emqx_rule_metrics:inc_actions_error(ActId), - _ = log_error(Error), - {badact, bad_qos_retain} - end; + Message = + #message{ + id = emqx_guid:gen(), + qos = get_qos(TargetQoS, Selected, QoS), + from = ActId, + flags = Flags#{retain => get_retain(TargetRetain, Selected)}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = Timestamp + }, + increase_and_publish(ActId, Message); %% in case this is not a "message.publish" request on_action_republish(Selected, _Envs = #{ @@ -216,32 +214,18 @@ on_action_republish(Selected, _Envs = #{ } = Bindings}) -> ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), TargetRetain = maps:get('TargetRetain', Bindings, false), - case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of - {{ok, QoS}, {ok, Retain}} when is_integer(QoS) andalso is_boolean(Retain) -> - Message = - #message{ - id = emqx_guid:gen(), - qos = QoS, - from = ActId, - flags = #{dup => false, retain => Retain}, - headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), - payload = format_msg(PayloadTks, Selected), - timestamp = erlang:system_time(millisecond) - }, - increase_and_publish(ActId, Message); - Errors -> - emqx_rule_metrics:inc_actions_error(ActId), - _ = log_error(Errors), - {badact, bad_qos_retain} - end. - -log_error({{ok, _}, RetainError}) -> - ?LOG(error, "[republish] invalid retain: ~p", [RetainError]); -log_error({QosError, {ok, _}}) -> - ?LOG(error, "[republish] invalid qos: ~p", [QosError]); -log_error({QosError, RetainError}) -> - ?LOG(error, "[republish] invalid qos: ~p invalid retain: ~p", [QosError, RetainError]). + Message = + #message{ + id = emqx_guid:gen(), + qos = get_qos(TargetQoS, Selected, 0), + from = ActId, + flags = #{dup => false, retain => get_retain(TargetRetain, Selected)}, + headers = #{republish_by => ActId}, + topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + payload = format_msg(PayloadTks, Selected), + timestamp = erlang:system_time(millisecond) + }, + increase_and_publish(ActId, Message). increase_and_publish(ActId, Msg) -> _ = emqx_broker:safe_publish(Msg), @@ -261,53 +245,56 @@ format_msg([], Data) -> format_msg(Tokens, Data) -> emqx_rule_utils:proc_tmpl(Tokens, Data). +%% -1 for old version. +to_qos(<<"-1">>) -> -1; +to_qos(-1) -> -1; to_qos(TargetQoS) -> - case get_qos(TargetQoS) of - {ok, QoS} -> - {ok, QoS}; - _Error -> - case emqx_rule_utils:preproc_tmpl(TargetQoS) of - Tmpl = [{var, _}] -> - {ok, Tmpl}; - _ -> - {error, bad_qos} - end + try + qos(TargetQoS) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetQoS) of + Tmpl = [{var, _}] -> + Tmpl; + _BadQoS -> + error({bad_qos, TargetQoS}) + end end. -get_qos(Tmpl, Data) -> - get_qos(emqx_rule_utils:replace_simple_var(Tmpl, Data)). +get_qos(-1, _Data, Default) -> Default; +get_qos(TargetQoS, Data, _Default) -> + qos(emqx_rule_utils:replace_simple_var(TargetQoS, Data)). -get_qos(<<"-1">>) -> {ok, 0}; -get_qos(<<"0">>) -> {ok, 0}; -get_qos(<<"1">>) -> {ok, 1}; -get_qos(<<"2">>) -> {ok, 2}; -get_qos(0) -> {ok, 0}; -get_qos(1) -> {ok, 1}; -get_qos(2) -> {ok, 2}; -get_qos(_) -> {error, bad_qos}. +qos(<<"0">>) -> 0; +qos(<<"1">>) -> 1; +qos(<<"2">>) -> 2; +qos(0) -> 0; +qos(1) -> 1; +qos(2) -> 2; +qos(BadQoS) -> error({bad_qos, BadQoS}). to_retain(TargetRetain) -> - case get_retain(TargetRetain) of - {ok, Retain} -> - {ok, Retain}; - _Error -> - case emqx_rule_utils:preproc_tmpl(TargetRetain) of - Tmpl = [{var, _}] -> - {ok, Tmpl}; - _ -> - {error, bad_retain} - end + try + retain(TargetRetain) + catch _:_ -> + %% Use placeholder. + case emqx_rule_utils:preproc_tmpl(TargetRetain) of + Tmpl = [{var, _}] -> + Tmpl; + _BadRetain -> + error({bad_retain, TargetRetain}) + end end. -get_retain(Tmpl, Data) -> - get_retain(emqx_rule_utils:replace_simple_var(Tmpl, Data)). +get_retain(TargetRetain, Data) -> + retain(emqx_rule_utils:replace_simple_var(TargetRetain, Data)). -get_retain(true) -> {ok, true}; -get_retain(false) -> {ok, false}; -get_retain(<<"true">>) -> {ok, true}; -get_retain(<<"false">>) -> {ok, false}; -get_retain(<<"1">>) -> {ok, true}; -get_retain(<<"0">>) -> {ok, false}; -get_retain(1) -> {ok, true}; -get_retain(0) -> {ok, false}; -get_retain(_) -> {error, bad_retain}. +retain(true) -> true; +retain(false) -> false; +retain(<<"true">>) -> true; +retain(<<"false">>) -> false; +retain(<<"1">>) -> true; +retain(<<"0">>) -> false; +retain(1) -> true; +retain(0) -> false; +retain(BadRetain) -> error({bad_retain, BadRetain}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl index 57d5eb465..237e5dfba 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_validator.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_validator.erl @@ -47,7 +47,6 @@ , array , file , cfgselect %% TODO: [5.0] refactor this - , editable_select ]). %%------------------------------------------------------------------------------ @@ -85,9 +84,8 @@ validate_spec(ParamsSepc) -> %% Internal Functions %%------------------------------------------------------------------------------ -%% Validate editable_select first, because editable_select has enum selection. -validate_value(Val, #{type := editable_select}) -> - Val; +validate_value(Val, #{type := Types} = Spec) when is_list(Types) -> + validate_types(Val, Types, Spec); validate_value(Val, #{enum := Enum}) -> validate_enum(Val, Enum); validate_value(Val, #{type := object} = Spec) -> @@ -95,6 +93,15 @@ validate_value(Val, #{type := object} = Spec) -> validate_value(Val, #{type := Type} = Spec) -> validate_type(Val, Type, Spec). +validate_types(Val, [], _Spec) -> + throw({invalid_data_type, Val}); +validate_types(Val, [Type | Types], Spec) -> + try + validate_type(Val, Type, Spec) + catch _:_ -> + validate_types(Val, Types, Spec) + end. + validate_type(Val, file, _Spec) -> validate_file(Val); validate_type(Val, String, Spec) when String =:= string; @@ -161,6 +168,9 @@ do_validate_spec(Name, #{type := array} = Spec) -> fun (not_found) -> throw({required_field_missing, {items, {in, Name}}}); (Items) -> do_validate_spec(Name, Items) end); +do_validate_spec(_Name, #{type := Types}) when is_list(Types) -> + _ = [ok = supported_data_type(Type, ?DATA_TYPES) || Type <- Types], + ok; do_validate_spec(_Name, #{type := Type}) -> _ = supported_data_type(Type, ?DATA_TYPES);