feat: republish support qos & retain placeholder

This commit is contained in:
DDDHuang 2022-06-09 14:37:27 +08:00
parent b305c90b9c
commit 6b1da3bcc8
2 changed files with 167 additions and 54 deletions

View File

@ -35,24 +35,40 @@
},
target_qos => #{
order => 2,
type => number,
enum => [-1, 0, 1, 2],
type => string,
required => true,
default => 0,
default => <<"0">>,
title => #{en => <<"Target QoS">>,
zh => <<"目的 QoS"/utf8>>},
description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>,
zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>}
description => #{en =>
<<"The QoS Level to be uses when republishing the message."
" Set to -1 to use the original QoS."
" Support placeholder variables.">>,
zh =>
<<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS。"
"支持占位符变量"/utf8>>}
},
target_retain => #{
order => 3,
type => string,
required => true,
default => <<"false">>,
title => #{en => <<"Target Retain">>,
zh => <<"目标保留消息标识"/utf8>>},
description => #{en => <<"The Retain flag to be uses when republishing the message."
" Support placeholder variables. Default is false">>,
zh => <<"重新发布消息时用的保留消息标识,支持占位符变量。默认 false"/utf8>>}
},
payload_tmpl => #{
order => 3,
order => 4,
type => string,
input => textarea,
required => false,
default => <<"${payload}">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported">>,
description => #{en => <<"The payload template, "
"variable interpolation is supported">>,
zh => <<"消息内容模板,支持变量"/utf8>>}
}
}).
@ -89,7 +105,8 @@
params => #{},
title => #{en => <<"Do Nothing (debug)">>,
zh => <<"空动作 (调试)"/utf8>>},
description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>,
description => #{en => <<"This action does nothing and never fails. "
"It's for debug purpose">>,
zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>}
}).
@ -113,7 +130,8 @@ on_resource_create(_Name, Conf) ->
%%------------------------------------------------------------------------------
%% Action 'inspect'
%%------------------------------------------------------------------------------
-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) ->
{bindings(), NewParams :: map()}.
on_action_create_inspect(Id, Params) ->
Params.
@ -129,12 +147,15 @@ on_action_inspect(Selected, Envs) ->
%%------------------------------------------------------------------------------
%% Action 'republish'
%%------------------------------------------------------------------------------
-spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
-spec on_action_create_republish(action_instance_id(), Params :: map()) ->
{bindings(), NewParams :: map()}.
on_action_create_republish(Id, Params = #{
<<"target_topic">> := TargetTopic,
<<"target_qos">> := TargetQoS,
<<"target_qos">> := TargetQoS0,
<<"payload_tmpl">> := PayloadTmpl
}) ->
{ok, TargetRetain} = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
{ok, TargetQoS} = to_qos(TargetQoS0),
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
Params.
@ -157,20 +178,28 @@ on_action_republish(Selected, _Envs = #{
'TargetQoS' := TargetQoS,
'TopicTks' := TopicTks,
'PayloadTks' := PayloadTks
}}) ->
?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
[TargetTopic, Selected]),
increase_and_publish(ActId,
#message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end,
from = ActId,
flags = Flags,
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = format_msg(PayloadTks, Selected),
timestamp = Timestamp
});
}} = Bindings) ->
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
TargetRetain = maps:get('TargetRetain', Bindings, false),
case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of
{{ok, RQoS}, {ok, Retain}} when is_integer(RQoS) andalso is_boolean(Retain) ->
Message =
#message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> QoS; true -> RQoS end,
from = ActId,
flags = Flags#{retain => Retain},
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = format_msg(PayloadTks, Selected),
timestamp = Timestamp
},
increase_and_publish(ActId, Message);
Error ->
emqx_rule_metrics:inc_actions_error(ActId),
_ = log_error(Error),
{badact, bad_qos_retain}
end;
%% in case this is not a "message.publish" request
on_action_republish(Selected, _Envs = #{
@ -180,27 +209,43 @@ on_action_republish(Selected, _Envs = #{
'TargetQoS' := TargetQoS,
'TopicTks' := TopicTks,
'PayloadTks' := PayloadTks
}}) ->
?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
[TargetTopic, Selected]),
increase_and_publish(ActId,
#message{
id = emqx_guid:gen(),
qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end,
from = ActId,
flags = #{dup => false, retain => false},
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = format_msg(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond)
}).
} = Bindings}) ->
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
TargetRetain = maps:get('TargetRetain', Bindings, false),
case {get_qos(TargetQoS, Selected), get_retain(TargetRetain, Selected)} of
{{ok, QoS}, {ok, Retain}} when is_integer(QoS) andalso is_boolean(Retain) ->
Message =
#message{
id = emqx_guid:gen(),
qos = QoS,
from = ActId,
flags = #{dup => false, retain => Retain},
headers = #{republish_by => ActId},
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
payload = format_msg(PayloadTks, Selected),
timestamp = erlang:system_time(millisecond)
},
increase_and_publish(ActId, Message);
Error ->
emqx_rule_metrics:inc_actions_error(ActId),
_ = log_error(Error),
{badact, bad_qos_retain}
end.
log_error({{ok, _}, RetainError}) ->
?LOG(error, "[republish] invalid retain: ~p", [RetainError]);
log_error({QosError, {ok, _}}) ->
?LOG(error, "[republish] invalid qos: ~p", [QosError]);
log_error({QosError, RetainError}) ->
?LOG(error, "[republish] invalid qos: ~p invalid retain: ~p", [QosError, RetainError]).
increase_and_publish(ActId, Msg) ->
_ = emqx_broker:safe_publish(Msg),
emqx_rule_metrics:inc_actions_success(ActId),
emqx_metrics:inc_msg(Msg).
-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) ->
{bindings(), NewParams :: map()}.
on_action_create_do_nothing(ActId, Params) when is_binary(ActId) ->
Params.
@ -211,3 +256,55 @@ format_msg([], Data) ->
emqx_json:encode(Data);
format_msg(Tokens, Data) ->
emqx_rule_utils:proc_tmpl(Tokens, Data).
get_qos(-1, _Data) -> {ok, 0};
get_qos(0, _Data) -> {ok, 0};
get_qos(1, _Data) -> {ok, 1};
get_qos(2, _Data) -> {ok, 2};
get_qos({path, Path}, Data) ->
to_qos(emqx_rule_maps:nested_get({path, Path}, Data, 0)).
to_qos(0) -> {ok, 0};
to_qos(1) -> {ok, 1};
to_qos(2) -> {ok, 2};
to_qos(<<"-1">>) -> {ok, 0};
to_qos(<<"0">>) -> {ok, 0};
to_qos(<<"1">>) -> {ok, 1};
to_qos(<<"2">>) -> {ok, 2};
to_qos(TargetQoS) ->
case parse_value_or_placeholder(TargetQoS) of
{path, P} ->
{ok, {path, P}};
_ ->
{error, bad_qos}
end.
get_retain(false, _Data) -> {ok, false};
get_retain(true, _Data) -> {ok, true};
get_retain({path, Path}, Data) ->
to_retain(emqx_rule_maps:nested_get({path, Path}, Data, true)).
to_retain(true) -> {ok, true};
to_retain(false) -> {ok, false};
to_retain(<<"true">>) -> {ok, true};
to_retain(<<"false">>) -> {ok, false};
to_retain(<<"1">>) -> {ok, true};
to_retain(<<"0">>) -> {ok, false};
to_retain(1) -> {ok, true};
to_retain(0) -> {ok, false};
to_retain(TargetRetain) ->
case parse_value_or_placeholder(TargetRetain) of
{path, P} ->
{ok, {path, P}};
_ ->
{error, bad_retain}
end.
parse_value_or_placeholder(ValueOrPlaceholder) ->
case re:run(ValueOrPlaceholder, "^\\$\{.+\}$") of
nomatch ->
ValueOrPlaceholder;
{match, [{0, Length}]} ->
Placeholder = binary:part(ValueOrPlaceholder, 2, Length - 3),
{path, [{key, Key} || Key <- string:lexemes(Placeholder, ". ")]}
end.

View File

@ -1,9 +1,12 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
[{"4.3.10",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
@ -12,7 +15,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
@ -25,7 +29,8 @@
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -38,7 +43,8 @@
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -51,7 +57,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -64,7 +71,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{add_module,emqx_rule_date},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -136,9 +144,12 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
[{"4.3.10",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
@ -147,7 +158,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.8",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -160,7 +172,8 @@
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.7",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
@ -173,7 +186,8 @@
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.6",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -186,7 +200,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.5",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -199,7 +214,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.4",
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},