Merge pull request #8166 from DDDHuang/republish
feat: republish support qos & retain placeholder
This commit is contained in:
commit
fbd179b5f5
|
@ -35,24 +35,47 @@
|
||||||
},
|
},
|
||||||
target_qos => #{
|
target_qos => #{
|
||||||
order => 2,
|
order => 2,
|
||||||
type => number,
|
input => editable_select,
|
||||||
enum => [-1, 0, 1, 2],
|
type => [number, string],
|
||||||
|
enum => [0, 1, 2, <<"${qos}">>],
|
||||||
required => true,
|
required => true,
|
||||||
default => 0,
|
default => 0,
|
||||||
title => #{en => <<"Target QoS">>,
|
title => #{en => <<"Target QoS">>,
|
||||||
zh => <<"目的 QoS"/utf8>>},
|
zh => <<"目的 QoS"/utf8>>},
|
||||||
description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>,
|
description => #{en =>
|
||||||
zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>}
|
<<"The QoS Level to be used when republishing the message."
|
||||||
|
" Support placeholder variables."
|
||||||
|
" Set to ${qos} to use the original QoS. Default is 0">>,
|
||||||
|
zh =>
|
||||||
|
<<"重新发布消息时用的 QoS 级别。"
|
||||||
|
"支持占位符变量,可以填写 ${qos} 来使用原消息的 QoS。默认 0"/utf8>>}
|
||||||
|
},
|
||||||
|
target_retain => #{
|
||||||
|
order => 3,
|
||||||
|
input => editable_select,
|
||||||
|
type => [boolean, string],
|
||||||
|
enum => [true, false, <<"${flags.retain}">>],
|
||||||
|
required => false,
|
||||||
|
default => false,
|
||||||
|
title => #{en => <<"Target Retain">>,
|
||||||
|
zh => <<"目标保留消息标识"/utf8>>},
|
||||||
|
description => #{en => <<"The Retain flag to be used when republishing the message."
|
||||||
|
" Set to ${flags.retain} to use the original Retain."
|
||||||
|
" Support placeholder variables. Default is false">>,
|
||||||
|
zh => <<"重新发布消息时用的保留消息标识。"
|
||||||
|
"支持占位符变量,可以填写 ${flags.retain} 来使用原消息的 Retain。"
|
||||||
|
"默认 false"/utf8>>}
|
||||||
},
|
},
|
||||||
payload_tmpl => #{
|
payload_tmpl => #{
|
||||||
order => 3,
|
order => 4,
|
||||||
type => string,
|
type => string,
|
||||||
input => textarea,
|
input => textarea,
|
||||||
required => false,
|
required => false,
|
||||||
default => <<"${payload}">>,
|
default => <<"${payload}">>,
|
||||||
title => #{en => <<"Payload Template">>,
|
title => #{en => <<"Payload Template">>,
|
||||||
zh => <<"消息内容模板"/utf8>>},
|
zh => <<"消息内容模板"/utf8>>},
|
||||||
description => #{en => <<"The payload template, variable interpolation is supported">>,
|
description => #{en => <<"The payload template, "
|
||||||
|
"variable interpolation is supported">>,
|
||||||
zh => <<"消息内容模板,支持变量"/utf8>>}
|
zh => <<"消息内容模板,支持变量"/utf8>>}
|
||||||
}
|
}
|
||||||
}).
|
}).
|
||||||
|
@ -89,7 +112,8 @@
|
||||||
params => #{},
|
params => #{},
|
||||||
title => #{en => <<"Do Nothing (debug)">>,
|
title => #{en => <<"Do Nothing (debug)">>,
|
||||||
zh => <<"空动作 (调试)"/utf8>>},
|
zh => <<"空动作 (调试)"/utf8>>},
|
||||||
description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>,
|
description => #{en => <<"This action does nothing and never fails. "
|
||||||
|
"It's for debug purpose">>,
|
||||||
zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>}
|
zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -113,7 +137,8 @@ on_resource_create(_Name, Conf) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Action 'inspect'
|
%% Action 'inspect'
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
|
-spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) ->
|
||||||
|
{bindings(), NewParams :: map()}.
|
||||||
on_action_create_inspect(Id, Params) ->
|
on_action_create_inspect(Id, Params) ->
|
||||||
Params.
|
Params.
|
||||||
|
|
||||||
|
@ -129,12 +154,15 @@ on_action_inspect(Selected, Envs) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Action 'republish'
|
%% Action 'republish'
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
|
-spec on_action_create_republish(action_instance_id(), Params :: map()) ->
|
||||||
|
{bindings(), NewParams :: map()}.
|
||||||
on_action_create_republish(Id, Params = #{
|
on_action_create_republish(Id, Params = #{
|
||||||
<<"target_topic">> := TargetTopic,
|
<<"target_topic">> := TargetTopic,
|
||||||
<<"target_qos">> := TargetQoS,
|
<<"target_qos">> := TargetQoS0,
|
||||||
<<"payload_tmpl">> := PayloadTmpl
|
<<"payload_tmpl">> := PayloadTmpl
|
||||||
}) ->
|
}) ->
|
||||||
|
TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
|
||||||
|
TargetQoS = to_qos(TargetQoS0),
|
||||||
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
|
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
|
||||||
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
||||||
Params.
|
Params.
|
||||||
|
@ -157,20 +185,21 @@ on_action_republish(Selected, _Envs = #{
|
||||||
'TargetQoS' := TargetQoS,
|
'TargetQoS' := TargetQoS,
|
||||||
'TopicTks' := TopicTks,
|
'TopicTks' := TopicTks,
|
||||||
'PayloadTks' := PayloadTks
|
'PayloadTks' := PayloadTks
|
||||||
}}) ->
|
} = Bindings}) ->
|
||||||
?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
|
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
||||||
[TargetTopic, Selected]),
|
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
||||||
increase_and_publish(ActId,
|
Message =
|
||||||
#message{
|
#message{
|
||||||
id = emqx_guid:gen(),
|
id = emqx_guid:gen(),
|
||||||
qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end,
|
qos = get_qos(TargetQoS, Selected, QoS),
|
||||||
from = ActId,
|
from = ActId,
|
||||||
flags = Flags,
|
flags = Flags#{retain => get_retain(TargetRetain, Selected)},
|
||||||
headers = #{republish_by => ActId},
|
headers = #{republish_by => ActId},
|
||||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
||||||
payload = format_msg(PayloadTks, Selected),
|
payload = format_msg(PayloadTks, Selected),
|
||||||
timestamp = Timestamp
|
timestamp = Timestamp
|
||||||
});
|
},
|
||||||
|
increase_and_publish(ActId, Message);
|
||||||
|
|
||||||
%% in case this is not a "message.publish" request
|
%% in case this is not a "message.publish" request
|
||||||
on_action_republish(Selected, _Envs = #{
|
on_action_republish(Selected, _Envs = #{
|
||||||
|
@ -180,27 +209,29 @@ on_action_republish(Selected, _Envs = #{
|
||||||
'TargetQoS' := TargetQoS,
|
'TargetQoS' := TargetQoS,
|
||||||
'TopicTks' := TopicTks,
|
'TopicTks' := TopicTks,
|
||||||
'PayloadTks' := PayloadTks
|
'PayloadTks' := PayloadTks
|
||||||
}}) ->
|
} = Bindings}) ->
|
||||||
?LOG(debug, "[republish] republish to: ~p, Payload: ~p",
|
?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]),
|
||||||
[TargetTopic, Selected]),
|
TargetRetain = maps:get('TargetRetain', Bindings, false),
|
||||||
increase_and_publish(ActId,
|
Message =
|
||||||
#message{
|
#message{
|
||||||
id = emqx_guid:gen(),
|
id = emqx_guid:gen(),
|
||||||
qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end,
|
qos = get_qos(TargetQoS, Selected, 0),
|
||||||
from = ActId,
|
from = ActId,
|
||||||
flags = #{dup => false, retain => false},
|
flags = #{dup => false, retain => get_retain(TargetRetain, Selected)},
|
||||||
headers = #{republish_by => ActId},
|
headers = #{republish_by => ActId},
|
||||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
||||||
payload = format_msg(PayloadTks, Selected),
|
payload = format_msg(PayloadTks, Selected),
|
||||||
timestamp = erlang:system_time(millisecond)
|
timestamp = erlang:system_time(millisecond)
|
||||||
}).
|
},
|
||||||
|
increase_and_publish(ActId, Message).
|
||||||
|
|
||||||
increase_and_publish(ActId, Msg) ->
|
increase_and_publish(ActId, Msg) ->
|
||||||
_ = emqx_broker:safe_publish(Msg),
|
_ = emqx_broker:safe_publish(Msg),
|
||||||
emqx_rule_metrics:inc_actions_success(ActId),
|
emqx_rule_metrics:inc_actions_success(ActId),
|
||||||
emqx_metrics:inc_msg(Msg).
|
emqx_metrics:inc_msg(Msg).
|
||||||
|
|
||||||
-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}.
|
-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) ->
|
||||||
|
{bindings(), NewParams :: map()}.
|
||||||
on_action_create_do_nothing(ActId, Params) when is_binary(ActId) ->
|
on_action_create_do_nothing(ActId, Params) when is_binary(ActId) ->
|
||||||
Params.
|
Params.
|
||||||
|
|
||||||
|
@ -211,3 +242,57 @@ format_msg([], Data) ->
|
||||||
emqx_json:encode(Data);
|
emqx_json:encode(Data);
|
||||||
format_msg(Tokens, Data) ->
|
format_msg(Tokens, Data) ->
|
||||||
emqx_rule_utils:proc_tmpl(Tokens, Data).
|
emqx_rule_utils:proc_tmpl(Tokens, Data).
|
||||||
|
|
||||||
|
%% -1 for old version.
|
||||||
|
to_qos(<<"-1">>) -> -1;
|
||||||
|
to_qos(-1) -> -1;
|
||||||
|
to_qos(TargetQoS) ->
|
||||||
|
try
|
||||||
|
qos(TargetQoS)
|
||||||
|
catch _:_ ->
|
||||||
|
%% Use placeholder.
|
||||||
|
case emqx_rule_utils:preproc_tmpl(TargetQoS) of
|
||||||
|
Tmpl = [{var, _}] ->
|
||||||
|
Tmpl;
|
||||||
|
_BadQoS ->
|
||||||
|
error({bad_qos, TargetQoS})
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_qos(-1, _Data, Default) -> Default;
|
||||||
|
get_qos(TargetQoS, Data, _Default) ->
|
||||||
|
qos(emqx_rule_utils:replace_var(TargetQoS, Data)).
|
||||||
|
|
||||||
|
qos(<<"0">>) -> 0;
|
||||||
|
qos(<<"1">>) -> 1;
|
||||||
|
qos(<<"2">>) -> 2;
|
||||||
|
qos(0) -> 0;
|
||||||
|
qos(1) -> 1;
|
||||||
|
qos(2) -> 2;
|
||||||
|
qos(BadQoS) -> error({bad_qos, BadQoS}).
|
||||||
|
|
||||||
|
to_retain(TargetRetain) ->
|
||||||
|
try
|
||||||
|
retain(TargetRetain)
|
||||||
|
catch _:_ ->
|
||||||
|
%% Use placeholder.
|
||||||
|
case emqx_rule_utils:preproc_tmpl(TargetRetain) of
|
||||||
|
Tmpl = [{var, _}] ->
|
||||||
|
Tmpl;
|
||||||
|
_BadRetain ->
|
||||||
|
error({bad_retain, TargetRetain})
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_retain(TargetRetain, Data) ->
|
||||||
|
retain(emqx_rule_utils:replace_var(TargetRetain, Data)).
|
||||||
|
|
||||||
|
retain(true) -> true;
|
||||||
|
retain(false) -> false;
|
||||||
|
retain(<<"true">>) -> true;
|
||||||
|
retain(<<"false">>) -> false;
|
||||||
|
retain(<<"1">>) -> true;
|
||||||
|
retain(<<"0">>) -> false;
|
||||||
|
retain(1) -> true;
|
||||||
|
retain(0) -> false;
|
||||||
|
retain(BadRetain) -> error({bad_retain, BadRetain}).
|
||||||
|
|
|
@ -1,9 +1,15 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
[{"4.3.10",
|
||||||
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.9",
|
{"4.3.9",
|
||||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{add_module,emqx_rule_date},
|
{add_module,emqx_rule_date},
|
||||||
|
@ -12,7 +18,9 @@
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
{add_module,emqx_rule_date},
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
|
@ -25,7 +33,9 @@
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.7",
|
{"4.3.7",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -38,7 +48,9 @@
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -51,7 +63,9 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -64,7 +78,9 @@
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -77,7 +93,8 @@
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -91,7 +108,8 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -106,7 +124,8 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -121,7 +140,8 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{add_module,emqx_rule_date},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -136,9 +156,15 @@
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.10",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
[{"4.3.10",
|
||||||
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.9",
|
{"4.3.9",
|
||||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
|
@ -147,7 +173,9 @@
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.8",
|
{"4.3.8",
|
||||||
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
|
@ -160,7 +188,9 @@
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.7",
|
{"4.3.7",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
@ -173,7 +203,9 @@
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.6",
|
{"4.3.6",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
|
@ -186,7 +218,9 @@
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.5",
|
{"4.3.5",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
|
@ -199,7 +233,9 @@
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.4",
|
{"4.3.4",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
|
@ -212,7 +248,8 @@
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.3",
|
{"4.3.3",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
|
@ -226,7 +263,8 @@
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.2",
|
{"4.3.2",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
|
@ -241,7 +279,8 @@
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.1",
|
{"4.3.1",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
|
@ -256,7 +295,8 @@
|
||||||
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
|
||||||
{delete_module,emqx_rule_date}]},
|
{delete_module,emqx_rule_date}]},
|
||||||
{"4.3.0",
|
{"4.3.0",
|
||||||
[{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_cli,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
|
|
||||||
-module(emqx_rule_utils).
|
-module(emqx_rule_utils).
|
||||||
|
|
||||||
|
-export([ replace_var/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% preprocess and process tempalte string with place holders
|
%% preprocess and process tempalte string with place holders
|
||||||
-export([ preproc_tmpl/1
|
-export([ preproc_tmpl/1
|
||||||
, proc_tmpl/2
|
, proc_tmpl/2
|
||||||
|
@ -87,6 +90,14 @@ preproc_tmpl([[Str, Phld]| Tokens], Acc) ->
|
||||||
preproc_tmpl([[Str]| Tokens], Acc) ->
|
preproc_tmpl([[Str]| Tokens], Acc) ->
|
||||||
preproc_tmpl(Tokens, put_head(str, Str, Acc)).
|
preproc_tmpl(Tokens, put_head(str, Str, Acc)).
|
||||||
|
|
||||||
|
%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
|
||||||
|
%% value will be an integer 1.
|
||||||
|
replace_var(Tokens, Data) when is_list(Tokens) ->
|
||||||
|
[Val] = proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||||
|
Val;
|
||||||
|
replace_var(Val, _Data) ->
|
||||||
|
Val.
|
||||||
|
|
||||||
put_head(_Type, <<>>, List) -> List;
|
put_head(_Type, <<>>, List) -> List;
|
||||||
put_head(Type, Term, List) ->
|
put_head(Type, Term, List) ->
|
||||||
[{Type, Term} | List].
|
[{Type, Term} | List].
|
||||||
|
|
|
@ -84,6 +84,8 @@ validate_spec(ParamsSepc) ->
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
validate_value(Val, #{type := Types} = Spec) when is_list(Types) ->
|
||||||
|
validate_types(Val, Types, Spec);
|
||||||
validate_value(Val, #{enum := Enum}) ->
|
validate_value(Val, #{enum := Enum}) ->
|
||||||
validate_enum(Val, Enum);
|
validate_enum(Val, Enum);
|
||||||
validate_value(Val, #{type := object} = Spec) ->
|
validate_value(Val, #{type := object} = Spec) ->
|
||||||
|
@ -91,6 +93,15 @@ validate_value(Val, #{type := object} = Spec) ->
|
||||||
validate_value(Val, #{type := Type} = Spec) ->
|
validate_value(Val, #{type := Type} = Spec) ->
|
||||||
validate_type(Val, Type, Spec).
|
validate_type(Val, Type, Spec).
|
||||||
|
|
||||||
|
validate_types(Val, [], _Spec) ->
|
||||||
|
throw({invalid_data_type, Val});
|
||||||
|
validate_types(Val, [Type | Types], Spec) ->
|
||||||
|
try
|
||||||
|
validate_type(Val, Type, Spec)
|
||||||
|
catch _:_ ->
|
||||||
|
validate_types(Val, Types, Spec)
|
||||||
|
end.
|
||||||
|
|
||||||
validate_type(Val, file, _Spec) ->
|
validate_type(Val, file, _Spec) ->
|
||||||
validate_file(Val);
|
validate_file(Val);
|
||||||
validate_type(Val, String, Spec) when String =:= string;
|
validate_type(Val, String, Spec) when String =:= string;
|
||||||
|
@ -157,6 +168,9 @@ do_validate_spec(Name, #{type := array} = Spec) ->
|
||||||
fun (not_found) -> throw({required_field_missing, {items, {in, Name}}});
|
fun (not_found) -> throw({required_field_missing, {items, {in, Name}}});
|
||||||
(Items) -> do_validate_spec(Name, Items)
|
(Items) -> do_validate_spec(Name, Items)
|
||||||
end);
|
end);
|
||||||
|
do_validate_spec(_Name, #{type := Types}) when is_list(Types) ->
|
||||||
|
_ = [ok = supported_data_type(Type, ?DATA_TYPES) || Type <- Types],
|
||||||
|
ok;
|
||||||
do_validate_spec(_Name, #{type := Type}) ->
|
do_validate_spec(_Name, #{type := Type}) ->
|
||||||
_ = supported_data_type(Type, ?DATA_TYPES);
|
_ = supported_data_type(Type, ?DATA_TYPES);
|
||||||
|
|
||||||
|
|
|
@ -405,15 +405,31 @@ t_reset_metrics(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_republish_action(_Config) ->
|
t_republish_action(_Config) ->
|
||||||
Qos0Received = emqx_metrics:val('messages.qos0.received'),
|
TargetQoSList = [-1, 0, 1, 2, <<"${qos}">>],
|
||||||
|
TargetRetainList = [true, false, <<"${flags.retain}">>],
|
||||||
|
[[republish_action_test(TargetQoS, TargetRetain) || TargetRetain <- TargetRetainList]
|
||||||
|
|| TargetQoS <- TargetQoSList],
|
||||||
|
ok.
|
||||||
|
|
||||||
|
republish_action_test(TargetQoS, TargetRetain) ->
|
||||||
|
{QoSReceivedMetricsName, PubQoS} =
|
||||||
|
case TargetQoS of
|
||||||
|
<<"${qos}">> -> {'messages.qos0.received', 0};
|
||||||
|
-1 -> {'messages.qos0.received', 0};
|
||||||
|
0 -> {'messages.qos0.received', 0};
|
||||||
|
1 -> {'messages.qos1.received', 1};
|
||||||
|
2 -> {'messages.qos2.received', 2}
|
||||||
|
end,
|
||||||
|
QosReceived = emqx_metrics:val(QoSReceivedMetricsName),
|
||||||
Received = emqx_metrics:val('messages.received'),
|
Received = emqx_metrics:val('messages.received'),
|
||||||
ok = emqx_rule_engine:load_providers(),
|
ok = emqx_rule_engine:load_providers(),
|
||||||
{ok, #rule{id = Id, for = [<<"t1">>]}} =
|
{ok, #rule{id = Id, for = [<<"t1">>]}} =
|
||||||
emqx_rule_engine:create_rule(
|
emqx_rule_engine:create_rule(
|
||||||
#{rawsql => <<"select topic, payload, qos from \"t1\"">>,
|
#{rawsql => <<"select * from \"t1\"">>,
|
||||||
actions => [#{name => 'republish',
|
actions => [#{name => 'republish',
|
||||||
args => #{<<"target_topic">> => <<"t2">>,
|
args => #{<<"target_topic">> => <<"t2">>,
|
||||||
<<"target_qos">> => -1,
|
<<"target_qos">> => TargetQoS,
|
||||||
|
<<"target_retain">> => TargetRetain,
|
||||||
<<"payload_tmpl">> => <<"${payload}">>}}],
|
<<"payload_tmpl">> => <<"${payload}">>}}],
|
||||||
description => <<"builtin-republish-rule">>}),
|
description => <<"builtin-republish-rule">>}),
|
||||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||||
|
@ -421,7 +437,7 @@ t_republish_action(_Config) ->
|
||||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||||
|
|
||||||
Msg = <<"{\"id\": 1, \"name\": \"ha\"}">>,
|
Msg = <<"{\"id\": 1, \"name\": \"ha\"}">>,
|
||||||
emqtt:publish(Client, <<"t1">>, Msg, 0),
|
emqtt:publish(Client, <<"t1">>, Msg, PubQoS),
|
||||||
receive {publish, #{topic := <<"t2">>, payload := Payload}} ->
|
receive {publish, #{topic := <<"t2">>, payload := Payload}} ->
|
||||||
?assertEqual(Msg, Payload)
|
?assertEqual(Msg, Payload)
|
||||||
after 1000 ->
|
after 1000 ->
|
||||||
|
@ -429,7 +445,7 @@ t_republish_action(_Config) ->
|
||||||
end,
|
end,
|
||||||
emqtt:stop(Client),
|
emqtt:stop(Client),
|
||||||
emqx_rule_registry:remove_rule(Id),
|
emqx_rule_registry:remove_rule(Id),
|
||||||
?assertEqual(2, emqx_metrics:val('messages.qos0.received') - Qos0Received),
|
?assertEqual(2, emqx_metrics:val(QoSReceivedMetricsName) - QosReceived),
|
||||||
?assertEqual(2, emqx_metrics:val('messages.received') - Received),
|
?assertEqual(2, emqx_metrics:val('messages.received') - Received),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -479,7 +495,7 @@ t_crud_rule_api(_Config) ->
|
||||||
{<<"params">>,[
|
{<<"params">>,[
|
||||||
{<<"arg1">>,1},
|
{<<"arg1">>,1},
|
||||||
{<<"target_topic">>, <<"t2">>},
|
{<<"target_topic">>, <<"t2">>},
|
||||||
{<<"target_qos">>, -1},
|
{<<"target_qos">>, 0},
|
||||||
{<<"payload_tmpl">>, <<"${payload}">>}
|
{<<"payload_tmpl">>, <<"${payload}">>}
|
||||||
]}
|
]}
|
||||||
]]
|
]]
|
||||||
|
|
Loading…
Reference in New Issue