fix(republish): action input editable_select
This commit is contained in:
parent
7d848950c7
commit
e1e2fd50fd
|
@ -35,10 +35,11 @@
|
||||||
},
|
},
|
||||||
target_qos => #{
|
target_qos => #{
|
||||||
order => 2,
|
order => 2,
|
||||||
type => editable_select,
|
input => editable_select,
|
||||||
enum => [<<"0">>, <<"1">>, <<"2">>],
|
type => [number, string],
|
||||||
|
enum => [0, 1, 2, <<"${qos}">>],
|
||||||
required => true,
|
required => true,
|
||||||
default => <<"0">>,
|
default => 0,
|
||||||
title => #{en => <<"Target QoS">>,
|
title => #{en => <<"Target QoS">>,
|
||||||
zh => <<"目的 QoS"/utf8>>},
|
zh => <<"目的 QoS"/utf8>>},
|
||||||
description => #{en =>
|
description => #{en =>
|
||||||
|
@ -48,20 +49,24 @@
|
||||||
" Or other variable, value is 0 or 1 or 2">>,
|
" Or other variable, value is 0 or 1 or 2">>,
|
||||||
zh =>
|
zh =>
|
||||||
<<"重新发布消息时用的 QoS 级别。"
|
<<"重新发布消息时用的 QoS 级别。"
|
||||||
"支持占位符变量,可以使用 ${qos} 来使用原消息的 QoS,"
|
"支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS,"
|
||||||
"或其他值为 0 或 1 或 2 的变量。"/utf8>>}
|
"或其他值为 0 或 1 或 2 的变量。"/utf8>>}
|
||||||
},
|
},
|
||||||
target_retain => #{
|
target_retain => #{
|
||||||
order => 3,
|
order => 3,
|
||||||
type => editable_select,
|
input => editable_select,
|
||||||
enum => [<<"true">>, <<"false">>],
|
type => [boolean, string],
|
||||||
|
enum => [true, false, <<"${flags.retain}">>],
|
||||||
required => false,
|
required => false,
|
||||||
default => <<"false">>,
|
default => false,
|
||||||
title => #{en => <<"Target Retain">>,
|
title => #{en => <<"Target Retain">>,
|
||||||
zh => <<"目标保留消息标识"/utf8>>},
|
zh => <<"目标保留消息标识"/utf8>>},
|
||||||
description => #{en => <<"The Retain flag to be uses when republishing the message."
|
description => #{en => <<"The Retain flag to be uses when republishing the message."
|
||||||
|
" Set to ${flags.retain} to use the original Retain."
|
||||||
" Support placeholder variables. Default is false">>,
|
" Support placeholder variables. Default is false">>,
|
||||||
zh => <<"重新发布消息时用的保留消息标识,支持占位符变量。默认 false"/utf8>>}
|
zh => <<"重新发布消息时用的保留消息标识。"
|
||||||
|
"支持占位符变量,可以填写 ${flags.retain} 来使用原消息的 Retain。"
|
||||||
|
"默认 false"/utf8>>}
|
||||||
},
|
},
|
||||||
payload_tmpl => #{
|
payload_tmpl => #{
|
||||||
order => 4,
|
order => 4,
|
||||||
|
@ -158,8 +163,8 @@ on_action_create_republish(Id, Params = #{
|
||||||
<<"target_qos">> := TargetQoS0,
|
<<"target_qos">> := TargetQoS0,
|
||||||
<<"payload_tmpl">> := PayloadTmpl
|
<<"payload_tmpl">> := PayloadTmpl
|
||||||
}) ->
|
}) ->
|
||||||
{ok, TargetRetain} = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
|
TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
|
||||||
{ok, TargetQoS} = to_qos(TargetQoS0),
|
TargetQoS = to_qos(TargetQoS0),
|
||||||
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
|
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
|
||||||
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
||||||
Params.
|
Params.
|
||||||
|
@ -182,28 +187,21 @@ on_action_republish(Selected, _Envs = #{
|
||||||
'TargetQoS' := TargetQoS,
|
'TargetQoS' := TargetQoS,
|
||||||
'TopicTks' := TopicTks,
|
'TopicTks' := TopicTks,
|
||||||
'PayloadTks' := PayloadTks
|
'PayloadTks' := PayloadTks
|
||||||
}} = Bindings) ->
|
} = Bindings}) ->
|
||||||
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
||||||
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
||||||
case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of
|
Message =
|
||||||
{{ok, RQoS}, {ok, Retain}} when is_integer(RQoS) andalso is_boolean(Retain) ->
|
#message{
|
||||||
Message =
|
id = emqx_guid:gen(),
|
||||||
#message{
|
qos = get_qos(TargetQoS, Selected, QoS),
|
||||||
id = emqx_guid:gen(),
|
from = ActId,
|
||||||
qos = if TargetQoS =:= -1 -> QoS; true -> RQoS end,
|
flags = Flags#{retain => get_retain(TargetRetain, Selected)},
|
||||||
from = ActId,
|
headers = #{republish_by => ActId},
|
||||||
flags = Flags#{retain => Retain},
|
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
||||||
headers = #{republish_by => ActId},
|
payload = format_msg(PayloadTks, Selected),
|
||||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
timestamp = Timestamp
|
||||||
payload = format_msg(PayloadTks, Selected),
|
},
|
||||||
timestamp = Timestamp
|
increase_and_publish(ActId, Message);
|
||||||
},
|
|
||||||
increase_and_publish(ActId, Message);
|
|
||||||
Error ->
|
|
||||||
emqx_rule_metrics:inc_actions_error(ActId),
|
|
||||||
_ = log_error(Error),
|
|
||||||
{badact, bad_qos_retain}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% in case this is not a "message.publish" request
|
%% in case this is not a "message.publish" request
|
||||||
on_action_republish(Selected, _Envs = #{
|
on_action_republish(Selected, _Envs = #{
|
||||||
|
@ -216,32 +214,18 @@ on_action_republish(Selected, _Envs = #{
|
||||||
} = Bindings}) ->
|
} = Bindings}) ->
|
||||||
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
||||||
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
||||||
case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of
|
Message =
|
||||||
{{ok, QoS}, {ok, Retain}} when is_integer(QoS) andalso is_boolean(Retain) ->
|
#message{
|
||||||
Message =
|
id = emqx_guid:gen(),
|
||||||
#message{
|
qos = get_qos(TargetQoS, Selected, 0),
|
||||||
id = emqx_guid:gen(),
|
from = ActId,
|
||||||
qos = QoS,
|
flags = #{dup => false, retain => get_retain(TargetRetain, Selected)},
|
||||||
from = ActId,
|
headers = #{republish_by => ActId},
|
||||||
flags = #{dup => false, retain => Retain},
|
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
||||||
headers = #{republish_by => ActId},
|
payload = format_msg(PayloadTks, Selected),
|
||||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
timestamp = erlang:system_time(millisecond)
|
||||||
payload = format_msg(PayloadTks, Selected),
|
},
|
||||||
timestamp = erlang:system_time(millisecond)
|
increase_and_publish(ActId, Message).
|
||||||
},
|
|
||||||
increase_and_publish(ActId, Message);
|
|
||||||
Errors ->
|
|
||||||
emqx_rule_metrics:inc_actions_error(ActId),
|
|
||||||
_ = log_error(Errors),
|
|
||||||
{badact, bad_qos_retain}
|
|
||||||
end.
|
|
||||||
|
|
||||||
log_error({{ok, _}, RetainError}) ->
|
|
||||||
?LOG(error, "[republish] invalid retain: ~p", [RetainError]);
|
|
||||||
log_error({QosError, {ok, _}}) ->
|
|
||||||
?LOG(error, "[republish] invalid qos: ~p", [QosError]);
|
|
||||||
log_error({QosError, RetainError}) ->
|
|
||||||
?LOG(error, "[republish] invalid qos: ~p invalid retain: ~p", [QosError, RetainError]).
|
|
||||||
|
|
||||||
increase_and_publish(ActId, Msg) ->
|
increase_and_publish(ActId, Msg) ->
|
||||||
_ = emqx_broker:safe_publish(Msg),
|
_ = emqx_broker:safe_publish(Msg),
|
||||||
|
@ -261,53 +245,56 @@ format_msg([], Data) ->
|
||||||
format_msg(Tokens, Data) ->
|
format_msg(Tokens, Data) ->
|
||||||
emqx_rule_utils:proc_tmpl(Tokens, Data).
|
emqx_rule_utils:proc_tmpl(Tokens, Data).
|
||||||
|
|
||||||
|
%% -1 for old version.
|
||||||
|
to_qos(<<"-1">>) -> -1;
|
||||||
|
to_qos(-1) -> -1;
|
||||||
to_qos(TargetQoS) ->
|
to_qos(TargetQoS) ->
|
||||||
case get_qos(TargetQoS) of
|
try
|
||||||
{ok, QoS} ->
|
qos(TargetQoS)
|
||||||
{ok, QoS};
|
catch _:_ ->
|
||||||
_Error ->
|
%% Use placeholder.
|
||||||
case emqx_rule_utils:preproc_tmpl(TargetQoS) of
|
case emqx_rule_utils:preproc_tmpl(TargetQoS) of
|
||||||
Tmpl = [{var, _}] ->
|
Tmpl = [{var, _}] ->
|
||||||
{ok, Tmpl};
|
Tmpl;
|
||||||
_ ->
|
_BadQoS ->
|
||||||
{error, bad_qos}
|
error({bad_qos, TargetQoS})
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_qos(Tmpl, Data) ->
|
get_qos(-1, _Data, Default) -> Default;
|
||||||
get_qos(emqx_rule_utils:replace_simple_var(Tmpl, Data)).
|
get_qos(TargetQoS, Data, _Default) ->
|
||||||
|
qos(emqx_rule_utils:replace_simple_var(TargetQoS, Data)).
|
||||||
|
|
||||||
get_qos(<<"-1">>) -> {ok, 0};
|
qos(<<"0">>) -> 0;
|
||||||
get_qos(<<"0">>) -> {ok, 0};
|
qos(<<"1">>) -> 1;
|
||||||
get_qos(<<"1">>) -> {ok, 1};
|
qos(<<"2">>) -> 2;
|
||||||
get_qos(<<"2">>) -> {ok, 2};
|
qos(0) -> 0;
|
||||||
get_qos(0) -> {ok, 0};
|
qos(1) -> 1;
|
||||||
get_qos(1) -> {ok, 1};
|
qos(2) -> 2;
|
||||||
get_qos(2) -> {ok, 2};
|
qos(BadQoS) -> error({bad_qos, BadQoS}).
|
||||||
get_qos(_) -> {error, bad_qos}.
|
|
||||||
|
|
||||||
to_retain(TargetRetain) ->
|
to_retain(TargetRetain) ->
|
||||||
case get_retain(TargetRetain) of
|
try
|
||||||
{ok, Retain} ->
|
retain(TargetRetain)
|
||||||
{ok, Retain};
|
catch _:_ ->
|
||||||
_Error ->
|
%% Use placeholder.
|
||||||
case emqx_rule_utils:preproc_tmpl(TargetRetain) of
|
case emqx_rule_utils:preproc_tmpl(TargetRetain) of
|
||||||
Tmpl = [{var, _}] ->
|
Tmpl = [{var, _}] ->
|
||||||
{ok, Tmpl};
|
Tmpl;
|
||||||
_ ->
|
_BadRetain ->
|
||||||
{error, bad_retain}
|
error({bad_retain, TargetRetain})
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_retain(Tmpl, Data) ->
|
get_retain(TargetRetain, Data) ->
|
||||||
get_retain(emqx_rule_utils:replace_simple_var(Tmpl, Data)).
|
retain(emqx_rule_utils:replace_simple_var(TargetRetain, Data)).
|
||||||
|
|
||||||
get_retain(true) -> {ok, true};
|
retain(true) -> true;
|
||||||
get_retain(false) -> {ok, false};
|
retain(false) -> false;
|
||||||
get_retain(<<"true">>) -> {ok, true};
|
retain(<<"true">>) -> true;
|
||||||
get_retain(<<"false">>) -> {ok, false};
|
retain(<<"false">>) -> false;
|
||||||
get_retain(<<"1">>) -> {ok, true};
|
retain(<<"1">>) -> true;
|
||||||
get_retain(<<"0">>) -> {ok, false};
|
retain(<<"0">>) -> false;
|
||||||
get_retain(1) -> {ok, true};
|
retain(1) -> true;
|
||||||
get_retain(0) -> {ok, false};
|
retain(0) -> false;
|
||||||
get_retain(_) -> {error, bad_retain}.
|
retain(BadRetain) -> error({bad_retain, BadRetain}).
|
||||||
|
|
|
@ -47,7 +47,6 @@
|
||||||
, array
|
, array
|
||||||
, file
|
, file
|
||||||
, cfgselect %% TODO: [5.0] refactor this
|
, cfgselect %% TODO: [5.0] refactor this
|
||||||
, editable_select
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -85,9 +84,8 @@ validate_spec(ParamsSepc) ->
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% Validate editable_select first, because editable_select has enum selection.
|
validate_value(Val, #{type := Types} = Spec) when is_list(Types) ->
|
||||||
validate_value(Val, #{type := editable_select}) ->
|
validate_types(Val, Types, Spec);
|
||||||
Val;
|
|
||||||
validate_value(Val, #{enum := Enum}) ->
|
validate_value(Val, #{enum := Enum}) ->
|
||||||
validate_enum(Val, Enum);
|
validate_enum(Val, Enum);
|
||||||
validate_value(Val, #{type := object} = Spec) ->
|
validate_value(Val, #{type := object} = Spec) ->
|
||||||
|
@ -95,6 +93,15 @@ validate_value(Val, #{type := object} = Spec) ->
|
||||||
validate_value(Val, #{type := Type} = Spec) ->
|
validate_value(Val, #{type := Type} = Spec) ->
|
||||||
validate_type(Val, Type, Spec).
|
validate_type(Val, Type, Spec).
|
||||||
|
|
||||||
|
validate_types(Val, [], _Spec) ->
|
||||||
|
throw({invalid_data_type, Val});
|
||||||
|
validate_types(Val, [Type | Types], Spec) ->
|
||||||
|
try
|
||||||
|
validate_type(Val, Type, Spec)
|
||||||
|
catch _:_ ->
|
||||||
|
validate_types(Val, Types, Spec)
|
||||||
|
end.
|
||||||
|
|
||||||
validate_type(Val, file, _Spec) ->
|
validate_type(Val, file, _Spec) ->
|
||||||
validate_file(Val);
|
validate_file(Val);
|
||||||
validate_type(Val, String, Spec) when String =:= string;
|
validate_type(Val, String, Spec) when String =:= string;
|
||||||
|
@ -161,6 +168,9 @@ do_validate_spec(Name, #{type := array} = Spec) ->
|
||||||
fun (not_found) -> throw({required_field_missing, {items, {in, Name}}});
|
fun (not_found) -> throw({required_field_missing, {items, {in, Name}}});
|
||||||
(Items) -> do_validate_spec(Name, Items)
|
(Items) -> do_validate_spec(Name, Items)
|
||||||
end);
|
end);
|
||||||
|
do_validate_spec(_Name, #{type := Types}) when is_list(Types) ->
|
||||||
|
_ = [ok = supported_data_type(Type, ?DATA_TYPES) || Type <- Types],
|
||||||
|
ok;
|
||||||
do_validate_spec(_Name, #{type := Type}) ->
|
do_validate_spec(_Name, #{type := Type}) ->
|
||||||
_ = supported_data_type(Type, ?DATA_TYPES);
|
_ = supported_data_type(Type, ?DATA_TYPES);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue