refactor(rule): rename outputs -> actions
This commit is contained in:
parent
0333e6f860
commit
71de9616d3
|
@ -52,7 +52,7 @@ HTTP Bridge 的 URL。</br>
|
|||
en: """
|
||||
The MQTT topic filter to be forwarded to the HTTP server. All MQTT 'PUBLISH' messages with the topic
|
||||
matching the local_topic will be forwarded.</br>
|
||||
NOTE: if this bridge is used as the output of a rule (EMQX rule engine), and also local_topic is
|
||||
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
|
||||
configured, then both the data got from the rule and the MQTT messages that match local_topic
|
||||
will be forwarded.
|
||||
"""
|
||||
|
|
|
@ -350,7 +350,7 @@ Ingress 模式定义了这个 bridge 如何从远程 MQTT broker 接收消息,
|
|||
en: """
|
||||
The egress config defines how this bridge forwards messages from the local broker to the remote broker.</br>
|
||||
Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>
|
||||
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded.
|
||||
NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded.
|
||||
"""
|
||||
zh: """
|
||||
Egress 模式定义了 bridge 如何将消息从本地 broker 转发到远程 broker。</br>
|
||||
|
|
|
@ -296,7 +296,7 @@ egress_desc() ->
|
|||
"The egress config defines how this bridge forwards messages from the local broker to the remote\n"
|
||||
"broker.</br>\n"
|
||||
"Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>\n"
|
||||
"NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic\n"
|
||||
"NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic\n"
|
||||
"is configured, then both the data got from the rule and the MQTT messages that matches\n"
|
||||
"local_topic will be forwarded.\n".
|
||||
|
||||
|
|
|
@ -610,7 +610,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
|||
#{
|
||||
<<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
|
||||
<<"enable">> => true,
|
||||
<<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
|
||||
<<"actions">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
|
||||
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
|
||||
}
|
||||
),
|
||||
|
@ -653,14 +653,14 @@ t_ingress_mqtt_bridge_with_rules(_) ->
|
|||
<<"matched.rate">> := _,
|
||||
<<"matched.rate.max">> := _,
|
||||
<<"matched.rate.last5m">> := _,
|
||||
<<"outputs.total">> := 1,
|
||||
<<"outputs.success">> := 1,
|
||||
<<"outputs.failed">> := 0,
|
||||
<<"outputs.failed.out_of_service">> := 0,
|
||||
<<"outputs.failed.unknown">> := 0
|
||||
<<"actions.total">> := 1,
|
||||
<<"actions.success">> := 1,
|
||||
<<"actions.failed">> := 0,
|
||||
<<"actions.failed.out_of_service">> := 0,
|
||||
<<"actions.failed.unknown">> := 0
|
||||
}
|
||||
} = jsx:decode(Rule1),
|
||||
%% we also check if the outputs of the rule is triggered
|
||||
%% we also check if the actions of the rule is triggered
|
||||
?assertMatch(
|
||||
#{
|
||||
inspect := #{
|
||||
|
@ -709,7 +709,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|||
#{
|
||||
<<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>,
|
||||
<<"enable">> => true,
|
||||
<<"outputs">> => [BridgeIDEgress],
|
||||
<<"actions">> => [BridgeIDEgress],
|
||||
<<"sql">> => <<"SELECT * from \"t/1\"">>
|
||||
}
|
||||
),
|
||||
|
@ -760,11 +760,11 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|||
<<"matched.rate">> := _,
|
||||
<<"matched.rate.max">> := _,
|
||||
<<"matched.rate.last5m">> := _,
|
||||
<<"outputs.total">> := 1,
|
||||
<<"outputs.success">> := 1,
|
||||
<<"outputs.failed">> := 0,
|
||||
<<"outputs.failed.out_of_service">> := 0,
|
||||
<<"outputs.failed.unknown">> := 0
|
||||
<<"actions.total">> := 1,
|
||||
<<"actions.success">> := 1,
|
||||
<<"actions.failed">> := 0,
|
||||
<<"actions.failed.out_of_service">> := 0,
|
||||
<<"actions.failed.unknown">> := 0
|
||||
}
|
||||
} = jsx:decode(Rule1),
|
||||
%% we should receive a message on the "remote" broker, with specified topic
|
||||
|
|
|
@ -741,7 +741,7 @@ setup_fake_rule_engine_data() ->
|
|||
#{
|
||||
id => <<"rule:t_get_basic_usage_info:1">>,
|
||||
sql => <<"select 1 from topic">>,
|
||||
outputs =>
|
||||
actions =>
|
||||
[
|
||||
#{function => <<"erlang:hibernate">>, args => #{}},
|
||||
#{function => console},
|
||||
|
@ -755,7 +755,7 @@ setup_fake_rule_engine_data() ->
|
|||
#{
|
||||
id => <<"rule:t_get_basic_usage_info:2">>,
|
||||
sql => <<"select 1 from topic">>,
|
||||
outputs =>
|
||||
actions =>
|
||||
[
|
||||
<<"mqtt:my_mqtt_bridge">>,
|
||||
<<"http:my_http_bridge">>
|
||||
|
@ -767,7 +767,7 @@ setup_fake_rule_engine_data() ->
|
|||
#{
|
||||
id => <<"rule:t_get_basic_usage_info:3">>,
|
||||
sql => <<"select 1 from \"$bridges/mqtt:mqtt_in\"">>,
|
||||
outputs =>
|
||||
actions =>
|
||||
[
|
||||
#{function => console}
|
||||
]
|
||||
|
|
|
@ -8,7 +8,7 @@ rule_engine {
|
|||
# description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'"
|
||||
# enable = true
|
||||
# sql = "SELECT * FROM \"t/1\""
|
||||
# outputs = [
|
||||
# actions = [
|
||||
# {
|
||||
# function = republish
|
||||
# args = {
|
||||
|
|
|
@ -451,57 +451,57 @@ emqx_rule_api_schema {
|
|||
}
|
||||
}
|
||||
|
||||
metrics_outputs_total {
|
||||
metrics_actions_total {
|
||||
desc {
|
||||
en: "How much times the outputs are called by the rule. This value may several times of 'matched', depending on the number of the outputs of the rule."
|
||||
en: "How much times the actions are called by the rule. This value may several times of 'matched', depending on the number of the actions of the rule."
|
||||
zh: "规则调用输出的次数。 该值可能是“sql.matched”的几倍,具体取决于规则输出的数量。"
|
||||
}
|
||||
label: {
|
||||
en: "Output Total"
|
||||
en: "Action Total"
|
||||
zh: "调用输出次数"
|
||||
}
|
||||
}
|
||||
|
||||
metrics_outputs_success {
|
||||
metrics_actions_success {
|
||||
desc {
|
||||
en: "How much times the rule success to call the outputs."
|
||||
en: "How much times the rule success to call the actions."
|
||||
zh: "规则成功调用输出的次数。"
|
||||
}
|
||||
label: {
|
||||
en: "Success Output"
|
||||
en: "Success Action"
|
||||
zh: "成功调用输出次数"
|
||||
}
|
||||
}
|
||||
|
||||
metrics_outputs_failed {
|
||||
metrics_actions_failed {
|
||||
desc {
|
||||
en: "How much times the rule failed to call the outputs."
|
||||
en: "How much times the rule failed to call the actions."
|
||||
zh: "规则调用输出失败的次数。"
|
||||
}
|
||||
label: {
|
||||
en: "Failed Output"
|
||||
en: "Failed Action"
|
||||
zh: "调用输出失败次数"
|
||||
}
|
||||
}
|
||||
|
||||
metrics_outputs_failed_out_of_service {
|
||||
metrics_actions_failed_out_of_service {
|
||||
desc {
|
||||
en: "How much times the rule failed to call outputs due to the output is out of service. For example, a bridge is disabled or stopped."
|
||||
en: "How much times the rule failed to call actions due to the action is out of service. For example, a bridge is disabled or stopped."
|
||||
zh: "由于输出停止服务而导致规则调用输出失败的次数。 例如,桥接被禁用或停止。"
|
||||
}
|
||||
label: {
|
||||
en: "Fail Output"
|
||||
en: "Fail Action"
|
||||
zh: "调用输出失败次数"
|
||||
}
|
||||
}
|
||||
|
||||
metrics_outputs_failed_unknown {
|
||||
metrics_actions_failed_unknown {
|
||||
desc {
|
||||
en: "How much times the rule failed to call outputs due to to an unknown error."
|
||||
en: "How much times the rule failed to call actions due to to an unknown error."
|
||||
zh: "由于未知错误,规则调用输出失败的次数。"
|
||||
}
|
||||
label: {
|
||||
en: "Fail Output"
|
||||
en: "Fail Action"
|
||||
zh: "调用输出失败次数"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,21 +28,21 @@ Example: <code>SELECT * FROM "test/topic" WHERE payload.x = 1</code>
|
|||
}
|
||||
}
|
||||
|
||||
rules_outputs {
|
||||
rules_actions {
|
||||
desc {
|
||||
en: """
|
||||
A list of outputs of the rule.
|
||||
An output can be a string that refers to the channel ID of an EMQX bridge, or an object
|
||||
A list of actions of the rule.
|
||||
An action can be a string that refers to the channel ID of an EMQX bridge, or an object
|
||||
that refers to a function.
|
||||
There a some built-in functions like "republish" and "console", and we also support user
|
||||
provided functions in the format: "{module}:{function}".
|
||||
The outputs in the list are executed sequentially.
|
||||
This means that if one of the output is executing slowly, all the following outputs will not
|
||||
The actions in the list are executed sequentially.
|
||||
This means that if one of the action is executing slowly, all the following actions will not
|
||||
be executed until it returns.
|
||||
If one of the output crashed, all other outputs come after it will still be executed, in the
|
||||
If one of the action crashed, all other actions come after it will still be executed, in the
|
||||
original order.
|
||||
If there's any error when running an output, there will be an error message, and the 'failure'
|
||||
counter of the function output or the bridge channel will increase.
|
||||
If there's any error when running an action, there will be an error message, and the 'failure'
|
||||
counter of the function action or the bridge channel will increase.
|
||||
"""
|
||||
zh: """
|
||||
规则的动作列表。
|
||||
|
@ -94,7 +94,7 @@ counter of the function output or the bridge channel will increase.
|
|||
|
||||
console_function {
|
||||
desc {
|
||||
en: """Print the outputs to the console"""
|
||||
en: """Print the actions to the console"""
|
||||
zh: "将输出打印到控制台"
|
||||
}
|
||||
label: {
|
||||
|
@ -111,12 +111,12 @@ Where {module} is the Erlang callback module and {function} is the Erlang functi
|
|||
|
||||
To write your own function, checkout the function <code>console</code> and
|
||||
<code>republish</code> in the source file:
|
||||
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example.
|
||||
<code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> as an example.
|
||||
"""
|
||||
zh: """
|
||||
用户提供的函数。 格式应为:'{module}:{function}'。
|
||||
其中 {module} 是 Erlang 回调模块, {function} 是 Erlang 函数。
|
||||
要编写自己的函数,请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。
|
||||
要编写自己的函数,请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。
|
||||
"""
|
||||
}
|
||||
label: {
|
||||
|
@ -130,11 +130,11 @@ To write your own function, checkout the function <code>console</code> and
|
|||
en: """
|
||||
The args will be passed as the 3rd argument to module:function/3,
|
||||
checkout the function <code>console</code> and <code>republish</code> in the source file:
|
||||
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example.
|
||||
<code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> as an example.
|
||||
"""
|
||||
zh: """
|
||||
用户提供的参数将作为函数 module:function/3 的第三个参数,
|
||||
请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。
|
||||
请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。
|
||||
"""
|
||||
}
|
||||
label: {
|
||||
|
@ -272,9 +272,9 @@ of the rule, then the string "undefined" is used.
|
|||
}
|
||||
}
|
||||
|
||||
desc_builtin_output_republish {
|
||||
desc_builtin_action_republish {
|
||||
desc {
|
||||
en: """Configuration for a built-in output."""
|
||||
en: """Configuration for a built-in action."""
|
||||
zh: """配置重新发布。"""
|
||||
}
|
||||
label: {
|
||||
|
@ -283,20 +283,20 @@ of the rule, then the string "undefined" is used.
|
|||
}
|
||||
}
|
||||
|
||||
desc_builtin_output_console {
|
||||
desc_builtin_action_console {
|
||||
desc {
|
||||
en: """Configuration for a built-in output."""
|
||||
en: """Configuration for a built-in action."""
|
||||
zh: """配置打印到控制台"""
|
||||
}
|
||||
label: {
|
||||
en: "Output Console Configuration"
|
||||
en: "Action Console Configuration"
|
||||
zh: "配置打印到控制台"
|
||||
}
|
||||
}
|
||||
|
||||
desc_user_provided_function {
|
||||
desc {
|
||||
en: """Configuration for a built-in output."""
|
||||
en: """Configuration for a built-in action."""
|
||||
zh: """配置用户函数"""
|
||||
}
|
||||
label: {
|
||||
|
@ -307,7 +307,7 @@ of the rule, then the string "undefined" is used.
|
|||
|
||||
desc_republish_args {
|
||||
desc {
|
||||
en: """The arguments of the built-in 'republish' output.One can use variables in the args.
|
||||
en: """The arguments of the built-in 'republish' action.One can use variables in the args.
|
||||
The variables are selected by the rule. For example, if the rule SQL is defined as following:
|
||||
<code>
|
||||
SELECT clientid, qos, payload FROM "t/1"
|
||||
|
|
|
@ -31,16 +31,16 @@
|
|||
-type selected_data() :: map().
|
||||
-type envs() :: map().
|
||||
|
||||
-type builtin_output_func() :: republish | console.
|
||||
-type builtin_output_module() :: emqx_rule_outputs.
|
||||
-type builtin_action_func() :: republish | console.
|
||||
-type builtin_action_module() :: emqx_rule_actions.
|
||||
-type bridge_channel_id() :: binary().
|
||||
-type output_fun_args() :: map().
|
||||
-type action_fun_args() :: map().
|
||||
|
||||
-type output() ::
|
||||
-type action() ::
|
||||
#{
|
||||
mod := builtin_output_module() | module(),
|
||||
func := builtin_output_func() | atom(),
|
||||
args => output_fun_args()
|
||||
mod := builtin_action_module() | module(),
|
||||
func := builtin_action_func() | atom(),
|
||||
args => action_fun_args()
|
||||
}
|
||||
| bridge_channel_id().
|
||||
|
||||
|
@ -49,7 +49,7 @@
|
|||
id := rule_id(),
|
||||
name := binary(),
|
||||
sql := binary(),
|
||||
outputs := [output()],
|
||||
actions := [action()],
|
||||
enable := boolean(),
|
||||
description => binary(),
|
||||
%% epoch in millisecond precision
|
||||
|
|
|
@ -15,43 +15,43 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% Define the default actions.
|
||||
-module(emqx_rule_outputs).
|
||||
-module(emqx_rule_actions).
|
||||
|
||||
-include("rule_engine.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
|
||||
%% APIs
|
||||
-export([parse_output/1]).
|
||||
-export([parse_action/1]).
|
||||
|
||||
%% callbacks of emqx_rule_output
|
||||
-export([pre_process_output_args/2]).
|
||||
%% callbacks of emqx_rule_action
|
||||
-export([pre_process_action_args/2]).
|
||||
|
||||
%% output functions
|
||||
%% action functions
|
||||
-export([
|
||||
console/3,
|
||||
republish/3
|
||||
]).
|
||||
|
||||
-optional_callbacks([pre_process_output_args/2]).
|
||||
-optional_callbacks([pre_process_action_args/2]).
|
||||
|
||||
-callback pre_process_output_args(FuncName :: atom(), output_fun_args()) -> output_fun_args().
|
||||
-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
parse_output(#{function := OutputFunc} = Output) ->
|
||||
{Mod, Func} = parse_output_func(OutputFunc),
|
||||
parse_action(#{function := ActionFunc} = Action) ->
|
||||
{Mod, Func} = parse_action_func(ActionFunc),
|
||||
#{
|
||||
mod => Mod,
|
||||
func => Func,
|
||||
args => pre_process_args(Mod, Func, maps:get(args, Output, #{}))
|
||||
args => pre_process_args(Mod, Func, maps:get(args, Action, #{}))
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% callbacks of emqx_rule_output
|
||||
%% callbacks of emqx_rule_action
|
||||
%%--------------------------------------------------------------------
|
||||
pre_process_output_args(
|
||||
pre_process_action_args(
|
||||
republish,
|
||||
#{
|
||||
topic := Topic,
|
||||
|
@ -68,17 +68,17 @@ pre_process_output_args(
|
|||
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
|
||||
}
|
||||
};
|
||||
pre_process_output_args(_, Args) ->
|
||||
pre_process_action_args(_, Args) ->
|
||||
Args.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% output functions
|
||||
%% action functions
|
||||
%%--------------------------------------------------------------------
|
||||
-spec console(map(), map(), map()) -> any().
|
||||
console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
|
||||
?ULOG(
|
||||
"[rule output] ~ts~n"
|
||||
"\tOutput Data: ~p~n"
|
||||
"[rule action] ~ts~n"
|
||||
"\tAction Data: ~p~n"
|
||||
"\tEnvs: ~p~n",
|
||||
[RuleId, Selected, Envs]
|
||||
).
|
||||
|
@ -135,36 +135,36 @@ republish(
|
|||
%%--------------------------------------------------------------------
|
||||
%% internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
parse_output_func(OutputFunc) ->
|
||||
{Mod, Func} = get_output_mod_func(OutputFunc),
|
||||
parse_action_func(ActionFunc) ->
|
||||
{Mod, Func} = get_action_mod_func(ActionFunc),
|
||||
assert_function_supported(Mod, Func),
|
||||
{Mod, Func}.
|
||||
|
||||
get_output_mod_func(OutputFunc) when is_atom(OutputFunc) ->
|
||||
{emqx_rule_outputs, OutputFunc};
|
||||
get_output_mod_func(OutputFunc) when is_binary(OutputFunc) ->
|
||||
get_action_mod_func(ActionFunc) when is_atom(ActionFunc) ->
|
||||
{emqx_rule_actions, ActionFunc};
|
||||
get_action_mod_func(ActionFunc) when is_binary(ActionFunc) ->
|
||||
ToAtom = fun(Bin) ->
|
||||
try binary_to_existing_atom(Bin) of
|
||||
Atom -> Atom
|
||||
catch
|
||||
error:badarg -> error({unknown_output_function, OutputFunc})
|
||||
error:badarg -> error({unknown_action_function, ActionFunc})
|
||||
end
|
||||
end,
|
||||
case string:split(OutputFunc, ":", all) of
|
||||
[Func1] -> {emqx_rule_outputs, ToAtom(Func1)};
|
||||
case string:split(ActionFunc, ":", all) of
|
||||
[Func1] -> {emqx_rule_actions, ToAtom(Func1)};
|
||||
[Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)};
|
||||
_ -> error({invalid_output_function, OutputFunc})
|
||||
_ -> error({invalid_action_function, ActionFunc})
|
||||
end.
|
||||
|
||||
assert_function_supported(Mod, Func) ->
|
||||
case erlang:function_exported(Mod, Func, 3) of
|
||||
true -> ok;
|
||||
false -> error({output_function_not_supported, Func})
|
||||
false -> error({action_function_not_supported, Func})
|
||||
end.
|
||||
|
||||
pre_process_args(Mod, Func, Args) ->
|
||||
case erlang:function_exported(Mod, pre_process_output_args, 2) of
|
||||
true -> Mod:pre_process_output_args(Func, Args);
|
||||
case erlang:function_exported(Mod, pre_process_action_args, 2) of
|
||||
true -> Mod:pre_process_action_args(Func, Args);
|
||||
false -> Args
|
||||
end.
|
||||
|
|
@ -124,25 +124,25 @@ fields("metrics") ->
|
|||
sc(non_neg_integer(), #{
|
||||
desc => ?DESC("metrics_sql_failed_unknown")
|
||||
})},
|
||||
{"outputs.total",
|
||||
{"actions.total",
|
||||
sc(non_neg_integer(), #{
|
||||
desc => ?DESC("metrics_outputs_total")
|
||||
desc => ?DESC("metrics_actions_total")
|
||||
})},
|
||||
{"outputs.success",
|
||||
{"actions.success",
|
||||
sc(non_neg_integer(), #{
|
||||
desc => ?DESC("metrics_outputs_success")
|
||||
desc => ?DESC("metrics_actions_success")
|
||||
})},
|
||||
{"outputs.failed",
|
||||
{"actions.failed",
|
||||
sc(non_neg_integer(), #{
|
||||
desc => ?DESC("metrics_outputs_failed")
|
||||
desc => ?DESC("metrics_actions_failed")
|
||||
})},
|
||||
{"outputs.failed.out_of_service",
|
||||
{"actions.failed.out_of_service",
|
||||
sc(non_neg_integer(), #{
|
||||
desc => ?DESC("metrics_outputs_failed_out_of_service")
|
||||
desc => ?DESC("metrics_actions_failed_out_of_service")
|
||||
})},
|
||||
{"outputs.failed.unknown",
|
||||
{"actions.failed.unknown",
|
||||
sc(non_neg_integer(), #{
|
||||
desc => ?DESC("metrics_outputs_failed_unknown")
|
||||
desc => ?DESC("metrics_actions_failed_unknown")
|
||||
})}
|
||||
];
|
||||
fields("node_metrics") ->
|
||||
|
|
|
@ -88,11 +88,11 @@
|
|||
'failed',
|
||||
'failed.exception',
|
||||
'failed.no_result',
|
||||
'outputs.total',
|
||||
'outputs.success',
|
||||
'outputs.failed',
|
||||
'outputs.failed.out_of_service',
|
||||
'outputs.failed.unknown'
|
||||
'actions.total',
|
||||
'actions.success',
|
||||
'actions.failed',
|
||||
'actions.failed.out_of_service',
|
||||
'actions.failed.unknown'
|
||||
]).
|
||||
|
||||
-define(RATE_METRICS, ['matched']).
|
||||
|
@ -265,9 +265,9 @@ get_basic_usage_info() ->
|
|||
NumRules = length(EnabledRules),
|
||||
ReferencedBridges =
|
||||
lists:foldl(
|
||||
fun(#{outputs := Outputs, from := From}, Acc) ->
|
||||
fun(#{actions := Actions, from := From}, Acc) ->
|
||||
BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From],
|
||||
BridgeIDs1 = lists:filter(fun is_binary/1, Outputs),
|
||||
BridgeIDs1 = lists:filter(fun is_binary/1, Actions),
|
||||
tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
|
||||
end,
|
||||
#{},
|
||||
|
@ -342,7 +342,7 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal Functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, CreatedAt) ->
|
||||
parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) ->
|
||||
case emqx_rule_sqlparser:parse(Sql) of
|
||||
{ok, Select} ->
|
||||
Rule = #{
|
||||
|
@ -352,7 +352,7 @@ parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, Creat
|
|||
updated_at => now_ms(),
|
||||
enable => maps:get(enable, Params, true),
|
||||
sql => Sql,
|
||||
outputs => parse_outputs(Outputs),
|
||||
actions => parse_actions(Actions),
|
||||
description => maps:get(description, Params, ""),
|
||||
%% -- calculated fields:
|
||||
from => emqx_rule_sqlparser:select_from(Select),
|
||||
|
@ -386,12 +386,12 @@ do_delete_rule(RuleId) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
parse_outputs(Outputs) ->
|
||||
[do_parse_output(Out) || Out <- Outputs].
|
||||
parse_actions(Actions) ->
|
||||
[do_parse_action(Act) || Act <- Actions].
|
||||
|
||||
do_parse_output(Output) when is_map(Output) ->
|
||||
emqx_rule_outputs:parse_output(Output);
|
||||
do_parse_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
|
||||
do_parse_action(Action) when is_map(Action) ->
|
||||
emqx_rule_actions:parse_action(Action);
|
||||
do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
|
||||
BridgeChannelId.
|
||||
|
||||
get_all_records(Tab) ->
|
||||
|
|
|
@ -65,11 +65,11 @@ end).
|
|||
'failed' => FAIL,
|
||||
'failed.exception' => FAIL_EX,
|
||||
'failed.no_result' => FAIL_NORES,
|
||||
'outputs.total' => O_TOTAL,
|
||||
'outputs.failed' => O_FAIL,
|
||||
'outputs.failed.out_of_service' => O_FAIL_OOS,
|
||||
'outputs.failed.unknown' => O_FAIL_UNKNOWN,
|
||||
'outputs.success' => O_SUCC,
|
||||
'actions.total' => O_TOTAL,
|
||||
'actions.failed' => O_FAIL,
|
||||
'actions.failed.out_of_service' => O_FAIL_OOS,
|
||||
'actions.failed.unknown' => O_FAIL_UNKNOWN,
|
||||
'actions.success' => O_SUCC,
|
||||
'matched.rate' => RATE,
|
||||
'matched.rate.max' => RATE_MAX,
|
||||
'matched.rate.last5m' => RATE_5
|
||||
|
@ -96,11 +96,11 @@ end).
|
|||
'failed' := FAIL,
|
||||
'failed.exception' := FAIL_EX,
|
||||
'failed.no_result' := FAIL_NORES,
|
||||
'outputs.total' := O_TOTAL,
|
||||
'outputs.failed' := O_FAIL,
|
||||
'outputs.failed.out_of_service' := O_FAIL_OOS,
|
||||
'outputs.failed.unknown' := O_FAIL_UNKNOWN,
|
||||
'outputs.success' := O_SUCC,
|
||||
'actions.total' := O_TOTAL,
|
||||
'actions.failed' := O_FAIL,
|
||||
'actions.failed.out_of_service' := O_FAIL_OOS,
|
||||
'actions.failed.unknown' := O_FAIL_UNKNOWN,
|
||||
'actions.success' := O_SUCC,
|
||||
'matched.rate' := RATE,
|
||||
'matched.rate.max' := RATE_MAX,
|
||||
'matched.rate.last5m' := RATE_5
|
||||
|
@ -362,7 +362,7 @@ format_rule_resp(#{
|
|||
name := Name,
|
||||
created_at := CreatedAt,
|
||||
from := Topics,
|
||||
outputs := Output,
|
||||
actions := Action,
|
||||
sql := SQL,
|
||||
enable := Enable,
|
||||
description := Descr
|
||||
|
@ -372,7 +372,7 @@ format_rule_resp(#{
|
|||
id => Id,
|
||||
name => Name,
|
||||
from => Topics,
|
||||
outputs => format_output(Output),
|
||||
actions => format_action(Action),
|
||||
sql => SQL,
|
||||
metrics => aggregate_metrics(NodeMetrics),
|
||||
node_metrics => NodeMetrics,
|
||||
|
@ -384,18 +384,18 @@ format_rule_resp(#{
|
|||
format_datetime(Timestamp, Unit) ->
|
||||
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
|
||||
|
||||
format_output(Outputs) ->
|
||||
[do_format_output(Out) || Out <- Outputs].
|
||||
format_action(Actions) ->
|
||||
[do_format_action(Act) || Act <- Actions].
|
||||
|
||||
do_format_output(#{mod := Mod, func := Func, args := Args}) ->
|
||||
do_format_action(#{mod := Mod, func := Func, args := Args}) ->
|
||||
#{
|
||||
function => printable_function_name(Mod, Func),
|
||||
args => maps:remove(preprocessed_tmpl, Args)
|
||||
};
|
||||
do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
|
||||
do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
|
||||
BridgeChannelId.
|
||||
|
||||
printable_function_name(emqx_rule_outputs, Func) ->
|
||||
printable_function_name(emqx_rule_actions, Func) ->
|
||||
Func;
|
||||
printable_function_name(Mod, Func) ->
|
||||
list_to_binary(lists:concat([Mod, ":", Func])).
|
||||
|
@ -411,11 +411,11 @@ get_rule_metrics(Id) ->
|
|||
'failed' := Failed,
|
||||
'failed.exception' := FailedEx,
|
||||
'failed.no_result' := FailedNoRes,
|
||||
'outputs.total' := OTotal,
|
||||
'outputs.failed' := OFailed,
|
||||
'outputs.failed.out_of_service' := OFailedOOS,
|
||||
'outputs.failed.unknown' := OFailedUnknown,
|
||||
'outputs.success' := OFailedSucc
|
||||
'actions.total' := OTotal,
|
||||
'actions.failed' := OFailed,
|
||||
'actions.failed.out_of_service' := OFailedOOS,
|
||||
'actions.failed.unknown' := OFailedUnknown,
|
||||
'actions.success' := OFailedSucc
|
||||
},
|
||||
rate :=
|
||||
#{
|
||||
|
|
|
@ -64,11 +64,11 @@ fields("rules") ->
|
|||
validator => fun ?MODULE:validate_sql/1
|
||||
}
|
||||
)},
|
||||
{"outputs",
|
||||
{"actions",
|
||||
sc(
|
||||
hoconsc:array(hoconsc:union(outputs())),
|
||||
hoconsc:array(hoconsc:union(actions())),
|
||||
#{
|
||||
desc => ?DESC("rules_outputs"),
|
||||
desc => ?DESC("rules_actions"),
|
||||
default => [],
|
||||
example => [
|
||||
<<"http:my_http_bridge">>,
|
||||
|
@ -93,16 +93,16 @@ fields("rules") ->
|
|||
}
|
||||
)}
|
||||
];
|
||||
fields("builtin_output_republish") ->
|
||||
fields("builtin_action_republish") ->
|
||||
[
|
||||
{function, sc(republish, #{desc => ?DESC("republish_function")})},
|
||||
{args, sc(ref("republish_args"), #{default => #{}})}
|
||||
];
|
||||
fields("builtin_output_console") ->
|
||||
fields("builtin_action_console") ->
|
||||
[
|
||||
{function, sc(console, #{desc => ?DESC("console_function")})}
|
||||
%% we may support some args for the console output in the future
|
||||
%, {args, sc(map(), #{desc => "The arguments of the built-in 'console' output",
|
||||
%% we may support some args for the console action in the future
|
||||
%, {args, sc(map(), #{desc => "The arguments of the built-in 'console' action",
|
||||
% default => #{}})}
|
||||
];
|
||||
fields("user_provided_function") ->
|
||||
|
@ -169,10 +169,10 @@ desc("rule_engine") ->
|
|||
?DESC("desc_rule_engine");
|
||||
desc("rules") ->
|
||||
?DESC("desc_rules");
|
||||
desc("builtin_output_republish") ->
|
||||
?DESC("desc_builtin_output_republish");
|
||||
desc("builtin_output_console") ->
|
||||
?DESC("desc_builtin_output_console");
|
||||
desc("builtin_action_republish") ->
|
||||
?DESC("desc_builtin_action_republish");
|
||||
desc("builtin_action_console") ->
|
||||
?DESC("desc_builtin_action_console");
|
||||
desc("user_provided_function") ->
|
||||
?DESC("desc_user_provided_function");
|
||||
desc("republish_args") ->
|
||||
|
@ -207,11 +207,11 @@ validate_rule_name(Name) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
outputs() ->
|
||||
actions() ->
|
||||
[
|
||||
binary(),
|
||||
ref("builtin_output_republish"),
|
||||
ref("builtin_output_console"),
|
||||
ref("builtin_action_republish"),
|
||||
ref("builtin_action_console"),
|
||||
ref("user_provided_function")
|
||||
].
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ do_apply_rule(
|
|||
doeach := DoEach,
|
||||
incase := InCase,
|
||||
conditions := Conditions,
|
||||
outputs := Outputs
|
||||
actions := Actions
|
||||
},
|
||||
Input
|
||||
) ->
|
||||
|
@ -145,7 +145,7 @@ do_apply_rule(
|
|||
_ ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
|
||||
end,
|
||||
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
|
||||
{ok, [handle_action_list(RuleId, Actions, Coll, Input) || Coll <- Collection2]};
|
||||
false ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
||||
{error, nomatch}
|
||||
|
@ -156,7 +156,7 @@ do_apply_rule(
|
|||
is_foreach := false,
|
||||
fields := Fields,
|
||||
conditions := Conditions,
|
||||
outputs := Outputs
|
||||
actions := Actions
|
||||
},
|
||||
Input
|
||||
) ->
|
||||
|
@ -172,7 +172,7 @@ do_apply_rule(
|
|||
of
|
||||
true ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
|
||||
{ok, handle_output_list(RuleId, Outputs, Selected, Input)};
|
||||
{ok, handle_action_list(RuleId, Actions, Selected, Input)};
|
||||
false ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
||||
{error, nomatch}
|
||||
|
@ -185,24 +185,24 @@ clear_rule_payload() ->
|
|||
select_and_transform(Fields, Input) ->
|
||||
select_and_transform(Fields, Input, #{}).
|
||||
|
||||
select_and_transform([], _Input, Output) ->
|
||||
Output;
|
||||
select_and_transform(['*' | More], Input, Output) ->
|
||||
select_and_transform(More, Input, maps:merge(Output, Input));
|
||||
select_and_transform([{as, Field, Alias} | More], Input, Output) ->
|
||||
select_and_transform([], _Input, Action) ->
|
||||
Action;
|
||||
select_and_transform(['*' | More], Input, Action) ->
|
||||
select_and_transform(More, Input, maps:merge(Action, Input));
|
||||
select_and_transform([{as, Field, Alias} | More], Input, Action) ->
|
||||
Val = eval(Field, Input),
|
||||
select_and_transform(
|
||||
More,
|
||||
nested_put(Alias, Val, Input),
|
||||
nested_put(Alias, Val, Output)
|
||||
nested_put(Alias, Val, Action)
|
||||
);
|
||||
select_and_transform([Field | More], Input, Output) ->
|
||||
select_and_transform([Field | More], Input, Action) ->
|
||||
Val = eval(Field, Input),
|
||||
Key = alias(Field),
|
||||
select_and_transform(
|
||||
More,
|
||||
nested_put(Key, Val, Input),
|
||||
nested_put(Key, Val, Output)
|
||||
nested_put(Key, Val, Action)
|
||||
).
|
||||
|
||||
%% FOREACH Clause
|
||||
|
@ -210,27 +210,27 @@ select_and_transform([Field | More], Input, Output) ->
|
|||
select_and_collect(Fields, Input) ->
|
||||
select_and_collect(Fields, Input, {#{}, {'item', []}}).
|
||||
|
||||
select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) ->
|
||||
select_and_collect([{as, Field, {_, A} = Alias}], Input, {Action, _}) ->
|
||||
Val = eval(Field, Input),
|
||||
{nested_put(Alias, Val, Output), {A, ensure_list(Val)}};
|
||||
select_and_collect([{as, Field, Alias} | More], Input, {Output, LastKV}) ->
|
||||
{nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
|
||||
select_and_collect([{as, Field, Alias} | More], Input, {Action, LastKV}) ->
|
||||
Val = eval(Field, Input),
|
||||
select_and_collect(
|
||||
More,
|
||||
nested_put(Alias, Val, Input),
|
||||
{nested_put(Alias, Val, Output), LastKV}
|
||||
{nested_put(Alias, Val, Action), LastKV}
|
||||
);
|
||||
select_and_collect([Field], Input, {Output, _}) ->
|
||||
select_and_collect([Field], Input, {Action, _}) ->
|
||||
Val = eval(Field, Input),
|
||||
Key = alias(Field),
|
||||
{nested_put(Key, Val, Output), {'item', ensure_list(Val)}};
|
||||
select_and_collect([Field | More], Input, {Output, LastKV}) ->
|
||||
{nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
|
||||
select_and_collect([Field | More], Input, {Action, LastKV}) ->
|
||||
Val = eval(Field, Input),
|
||||
Key = alias(Field),
|
||||
select_and_collect(
|
||||
More,
|
||||
nested_put(Key, Val, Input),
|
||||
{nested_put(Key, Val, Output), LastKV}
|
||||
{nested_put(Key, Val, Action), LastKV}
|
||||
).
|
||||
|
||||
%% Filter each item got from FOREACH
|
||||
|
@ -312,43 +312,43 @@ number(Bin) ->
|
|||
error:badarg -> binary_to_float(Bin)
|
||||
end.
|
||||
|
||||
handle_output_list(RuleId, Outputs, Selected, Envs) ->
|
||||
[handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs].
|
||||
handle_action_list(RuleId, Actions, Selected, Envs) ->
|
||||
[handle_action(RuleId, Act, Selected, Envs) || Act <- Actions].
|
||||
|
||||
handle_output(RuleId, OutId, Selected, Envs) ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.total'),
|
||||
handle_action(RuleId, ActId, Selected, Envs) ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
|
||||
try
|
||||
Result = do_handle_output(OutId, Selected, Envs),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.success'),
|
||||
Result = do_handle_action(ActId, Selected, Envs),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'),
|
||||
Result
|
||||
catch
|
||||
throw:out_of_service ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
ok = emqx_metrics_worker:inc(
|
||||
rule_metrics, RuleId, 'outputs.failed.out_of_service'
|
||||
rule_metrics, RuleId, 'actions.failed.out_of_service'
|
||||
),
|
||||
?SLOG(warning, #{msg => "out_of_service", output => OutId});
|
||||
?SLOG(warning, #{msg => "out_of_service", action => ActId});
|
||||
Err:Reason:ST ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed.unknown'),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'),
|
||||
?SLOG(error, #{
|
||||
msg => "output_failed",
|
||||
output => OutId,
|
||||
msg => "action_failed",
|
||||
action => ActId,
|
||||
exception => Err,
|
||||
reason => Reason,
|
||||
stacktrace => ST
|
||||
})
|
||||
end.
|
||||
|
||||
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
|
||||
?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}),
|
||||
do_handle_action(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
|
||||
?TRACE("BRIDGE", "bridge_action", #{bridge_id => BridgeId}),
|
||||
case emqx_bridge:send_message(BridgeId, Selected) of
|
||||
{error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped ->
|
||||
throw(out_of_service);
|
||||
Result ->
|
||||
Result
|
||||
end;
|
||||
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
||||
do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
||||
%% the function can also throw 'out_of_service'
|
||||
Mod:Func(Selected, Envs, Args).
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
|
|||
id => RuleId,
|
||||
sql => Sql,
|
||||
from => EventTopics,
|
||||
outputs => [#{mod => ?MODULE, func => get_selected_data, args => #{}}],
|
||||
actions => [#{mod => ?MODULE, func => get_selected_data, args => #{}}],
|
||||
enable => true,
|
||||
is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
|
||||
fields => emqx_rule_sqlparser:select_fields(Select),
|
||||
|
|
|
@ -166,9 +166,9 @@ init_per_testcase(t_events, Config) ->
|
|||
#{
|
||||
id => <<"rule:t_events">>,
|
||||
sql => SQL,
|
||||
outputs => [
|
||||
actions => [
|
||||
#{
|
||||
function => <<"emqx_rule_engine_SUITE:output_record_triggered_events">>,
|
||||
function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>,
|
||||
args => #{}
|
||||
}
|
||||
],
|
||||
|
@ -194,7 +194,7 @@ t_create_rule(_Config) ->
|
|||
#{
|
||||
sql => <<"select * from \"t/a\"">>,
|
||||
id => <<"t_create_rule">>,
|
||||
outputs => [#{function => console}],
|
||||
actions => [#{function => console}],
|
||||
description => <<"debug rule">>
|
||||
}
|
||||
),
|
||||
|
@ -256,7 +256,7 @@ t_create_existing_rule(_Config) ->
|
|||
#{
|
||||
id => <<"an_existing_rule">>,
|
||||
sql => <<"select * from \"t/#\"">>,
|
||||
outputs => [#{function => console}]
|
||||
actions => [#{function => console}]
|
||||
}
|
||||
),
|
||||
{ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>),
|
||||
|
@ -554,12 +554,12 @@ t_match_atom_and_binary(_Config) ->
|
|||
"SELECT connected_at as ts, * "
|
||||
"FROM \"$events/client_connected\" "
|
||||
"WHERE username = 'emqx2' ",
|
||||
Repub = republish_output(<<"t2">>, <<"user:${ts}">>),
|
||||
Repub = republish_action(<<"t2">>, <<"user:${ts}">>),
|
||||
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
||||
#{
|
||||
sql => SQL,
|
||||
id => ?TMP_RULEID,
|
||||
outputs => [Repub]
|
||||
actions => [Repub]
|
||||
}
|
||||
),
|
||||
{ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]),
|
||||
|
@ -750,8 +750,8 @@ t_sqlselect_001(_Config) ->
|
|||
Sql2 =
|
||||
"SELECT jq('.a|.[]', "
|
||||
"'{\"a\": [{\"b\": 1}, {\"b\": 2}, {\"b\": 3}]}') "
|
||||
"as jq_output, "
|
||||
" jq_output[1].b as first_b from \"t/#\" ",
|
||||
"as jq_action, "
|
||||
" jq_action[1].b as first_b from \"t/#\" ",
|
||||
?assertMatch(
|
||||
{ok, #{<<"first_b">> := 1}},
|
||||
emqx_rule_sqltester:test(
|
||||
|
@ -771,12 +771,12 @@ t_sqlselect_01(_Config) ->
|
|||
"SELECT json_decode(payload) as p, payload "
|
||||
"FROM \"t3/#\", \"t1\" "
|
||||
"WHERE p.x = 1",
|
||||
Repub = republish_output(<<"t2">>),
|
||||
Repub = republish_action(<<"t2">>),
|
||||
{ok, TopicRule1} = emqx_rule_engine:create_rule(
|
||||
#{
|
||||
sql => SQL,
|
||||
id => ?TMP_RULEID,
|
||||
outputs => [Repub]
|
||||
actions => [Repub]
|
||||
}
|
||||
),
|
||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||
|
@ -817,12 +817,12 @@ t_sqlselect_02(_Config) ->
|
|||
"SELECT * "
|
||||
"FROM \"t3/#\", \"t1\" "
|
||||
"WHERE payload.x = 1",
|
||||
Repub = republish_output(<<"t2">>),
|
||||
Repub = republish_action(<<"t2">>),
|
||||
{ok, TopicRule1} = emqx_rule_engine:create_rule(
|
||||
#{
|
||||
sql => SQL,
|
||||
id => ?TMP_RULEID,
|
||||
outputs => [Repub]
|
||||
actions => [Repub]
|
||||
}
|
||||
),
|
||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||
|
@ -863,12 +863,12 @@ t_sqlselect_1(_Config) ->
|
|||
"SELECT json_decode(payload) as p, payload "
|
||||
"FROM \"t1\" "
|
||||
"WHERE p.x = 1 and p.y = 2",
|
||||
Repub = republish_output(<<"t2">>),
|
||||
Repub = republish_action(<<"t2">>),
|
||||
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
||||
#{
|
||||
sql => SQL,
|
||||
id => ?TMP_RULEID,
|
||||
outputs => [Repub]
|
||||
actions => [Repub]
|
||||
}
|
||||
),
|
||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||
|
@ -898,12 +898,12 @@ t_sqlselect_1(_Config) ->
|
|||
t_sqlselect_2(_Config) ->
|
||||
%% recursively republish to t2
|
||||
SQL = "SELECT * FROM \"t2\" ",
|
||||
Repub = republish_output(<<"t2">>),
|
||||
Repub = republish_action(<<"t2">>),
|
||||
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
||||
#{
|
||||
sql => SQL,
|
||||
id => ?TMP_RULEID,
|
||||
outputs => [Repub]
|
||||
actions => [Repub]
|
||||
}
|
||||
),
|
||||
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||
|
@ -932,12 +932,12 @@ t_sqlselect_3(_Config) ->
|
|||
"SELECT * "
|
||||
"FROM \"$events/client_connected\" "
|
||||
"WHERE username = 'emqx1'",
|
||||
Repub = republish_output(<<"t2">>, <<"clientid=${clientid}">>),
|
||||
Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
|
||||
{ok, TopicRule} = emqx_rule_engine:create_rule(
|
||||
#{
|
||||
sql => SQL,
|
||||
id => ?TMP_RULEID,
|
||||
outputs => [Repub]
|
||||
actions => [Repub]
|
||||
}
|
||||
),
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
|
||||
|
@ -2270,7 +2270,7 @@ t_get_basic_usage_info_1(_Config) ->
|
|||
#{
|
||||
id => <<"rule:t_get_basic_usage_info:1">>,
|
||||
sql => <<"select 1 from topic">>,
|
||||
outputs =>
|
||||
actions =>
|
||||
[
|
||||
#{function => <<"erlang:hibernate">>, args => #{}},
|
||||
#{function => console},
|
||||
|
@ -2284,7 +2284,7 @@ t_get_basic_usage_info_1(_Config) ->
|
|||
#{
|
||||
id => <<"rule:t_get_basic_usage_info:2">>,
|
||||
sql => <<"select 1 from topic">>,
|
||||
outputs =>
|
||||
actions =>
|
||||
[
|
||||
<<"mqtt:my_mqtt_bridge">>,
|
||||
<<"http:my_http_bridge">>
|
||||
|
@ -2308,9 +2308,9 @@ t_get_basic_usage_info_1(_Config) ->
|
|||
%% Internal helpers
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
republish_output(Topic) ->
|
||||
republish_output(Topic, <<"${payload}">>).
|
||||
republish_output(Topic, Payload) ->
|
||||
republish_action(Topic) ->
|
||||
republish_action(Topic, <<"${payload}">>).
|
||||
republish_action(Topic, Payload) ->
|
||||
#{
|
||||
function => republish,
|
||||
args => #{payload => Payload, topic => Topic, qos => 0, retain => false}
|
||||
|
@ -2337,13 +2337,13 @@ make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
|
|||
fields => [<<"*">>],
|
||||
is_foreach => false,
|
||||
conditions => {},
|
||||
outputs => [#{mod => emqx_rule_outputs, func => console, args => #{}}],
|
||||
actions => [#{mod => emqx_rule_actions, func => console, args => #{}}],
|
||||
description => <<"simple rule">>,
|
||||
created_at => Ts
|
||||
}.
|
||||
|
||||
output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) ->
|
||||
ct:pal("applying output_record_triggered_events: ~p", [Data]),
|
||||
action_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) ->
|
||||
ct:pal("applying action_record_triggered_events: ~p", [Data]),
|
||||
ets:insert(events_record_tab, {EventName, Data}).
|
||||
|
||||
verify_event(EventName) ->
|
||||
|
|
|
@ -33,7 +33,7 @@ t_crud_rule_api(_Config) ->
|
|||
<<"description">> => <<"A simple rule">>,
|
||||
<<"enable">> => true,
|
||||
<<"id">> => RuleID,
|
||||
<<"outputs">> => [#{<<"function">> => <<"console">>}],
|
||||
<<"actions">> => [#{<<"function">> => <<"console">>}],
|
||||
<<"sql">> => <<"SELECT * from \"t/1\"">>,
|
||||
<<"name">> => <<"test_rule">>
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue