Merge pull request #8042 from terry-xiaoyu/rename_rule_outputs_to_actions

refactor(rule): rename outputs -> actions
This commit is contained in:
Xinyu Liu 2022-05-26 09:06:31 +08:00 committed by GitHub
commit fa38e19187
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 269 additions and 266 deletions

View File

@ -28,8 +28,8 @@
# retain = false # retain = false
#} #}
# #
## HTTP bridges to an HTTP server ## WebHook to an HTTP server
#bridges.http.my_http_bridge { #bridges.webhook.my_webhook {
# enable = true # enable = true
# direction = egress # direction = egress
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url # ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url

View File

@ -72,14 +72,14 @@ In config files, you can find the corresponding config entry for a connector by
} }
} }
bridges_http { bridges_webhook {
desc { desc {
en: """HTTP bridges to an HTTP server.""" en: """WebHook to an HTTP server."""
zh: """转发消息到 HTTP 服务器的 HTTP Bridge""" zh: """转发消息到 HTTP 服务器的 WebHook"""
} }
label: { label: {
en: "HTTP Bridge" en: "WebHook"
zh: "HTTP Bridge" zh: "WebHook"
} }
} }

View File

@ -1,4 +1,4 @@
emqx_bridge_http_schema { emqx_bridge_webhook_schema {
config_enable { config_enable {
desc { desc {
@ -52,7 +52,7 @@ HTTP Bridge 的 URL。</br>
en: """ en: """
The MQTT topic filter to be forwarded to the HTTP server. All MQTT 'PUBLISH' messages with the topic The MQTT topic filter to be forwarded to the HTTP server. All MQTT 'PUBLISH' messages with the topic
matching the local_topic will be forwarded.</br> matching the local_topic will be forwarded.</br>
NOTE: if this bridge is used as the output of a rule (EMQX rule engine), and also local_topic is NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded. will be forwarded.
""" """

View File

@ -263,7 +263,7 @@ get_matched_bridges(Topic) ->
(_BName, #{direction := ingress}, Acc1) -> (_BName, #{direction := ingress}, Acc1) ->
Acc1; Acc1;
(BName, #{direction := egress} = Egress, Acc1) -> (BName, #{direction := egress} = Egress, Acc1) ->
%% HTTP, MySQL bridges only have egress direction %% WebHook, MySQL bridges only have egress direction
get_matched_bridge_id(Egress, Topic, BType, BName, Acc1) get_matched_bridge_id(Egress, Topic, BType, BName, Acc1)
end, end,
Acc0, Acc0,

View File

@ -42,8 +42,6 @@
-export([lookup_from_local_node/2]). -export([lookup_from_local_node/2]).
-define(TYPES, [mqtt, http]).
-define(CONN_TYPES, [mqtt]). -define(CONN_TYPES, [mqtt]).
-define(TRY_PARSE_ID(ID, EXPR), -define(TRY_PARSE_ID(ID, EXPR),
@ -148,7 +146,7 @@ param_path_id() ->
#{ #{
in => path, in => path,
required => true, required => true,
example => <<"http:my_http_bridge">>, example => <<"webhook:my_webhook">>,
desc => ?DESC("desc_param_path_id") desc => ?DESC("desc_param_path_id")
} }
)}. )}.
@ -158,9 +156,9 @@ bridge_info_array_example(Method) ->
bridge_info_examples(Method) -> bridge_info_examples(Method) ->
maps:merge(conn_bridge_examples(Method), #{ maps:merge(conn_bridge_examples(Method), #{
<<"http_bridge">> => #{ <<"my_webhook">> => #{
summary => <<"HTTP Bridge">>, summary => <<"WebHook">>,
value => info_example(http, awesome, Method) value => info_example(webhook, awesome, Method)
} }
}). }).
@ -196,7 +194,7 @@ method_example(Type, Direction, Method) when Method == get; Method == post ->
SDir = atom_to_list(Direction), SDir = atom_to_list(Direction),
SName = SName =
case Type of case Type of
http -> "my_" ++ SType ++ "_bridge"; webhook -> "my_" ++ SType;
_ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge" _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
end, end,
TypeNameExamp = #{ TypeNameExamp = #{
@ -220,7 +218,7 @@ maybe_with_metrics_example(TypeNameExamp, get) ->
maybe_with_metrics_example(TypeNameExamp, _) -> maybe_with_metrics_example(TypeNameExamp, _) ->
TypeNameExamp. TypeNameExamp.
info_example_basic(http, _) -> info_example_basic(webhook, _) ->
#{ #{
enable => true, enable => true,
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
@ -232,7 +230,7 @@ info_example_basic(http, _) ->
pool_size => 4, pool_size => 4,
enable_pipelining => true, enable_pipelining => true,
ssl => #{enable => false}, ssl => #{enable => false},
local_topic => <<"emqx_http/#">>, local_topic => <<"emqx_webhook/#">>,
method => post, method => post,
body => <<"${payload}">> body => <<"${payload}">>
}; };

View File

@ -43,8 +43,8 @@
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
bridge_to_resource_type(<<"http">>) -> emqx_connector_http; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http;
bridge_to_resource_type(http) -> emqx_connector_http. bridge_to_resource_type(webhook) -> emqx_connector_http.
resource_id(BridgeId) when is_binary(BridgeId) -> resource_id(BridgeId) when is_binary(BridgeId) ->
<<"bridge:", BridgeId/binary>>. <<"bridge:", BridgeId/binary>>.
@ -104,7 +104,7 @@ update(Type, Name, {OldConf, Conf}) ->
%% - if the connection related configs like `servers` is updated, we should restart/start %% - if the connection related configs like `servers` is updated, we should restart/start
%% or stop bridges according to the change. %% or stop bridges according to the change.
%% - if the connection related configs are not update, only non-connection configs like %% - if the connection related configs are not update, only non-connection configs like
%% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated
%% without restarting the bridge. %% without restarting the bridge.
%% %%
case if_only_to_toggle_enable(OldConf, Conf) of case if_only_to_toggle_enable(OldConf, Conf) of
@ -237,7 +237,7 @@ is_tmp_path(TmpPath, File) ->
string:str(str(File), str(TmpPath)) > 0. string:str(str(File), str(TmpPath)) > 0.
parse_confs( parse_confs(
http, webhook,
_Name, _Name,
#{ #{
url := Url, url := Url,

View File

@ -46,7 +46,7 @@ http_schema(Method) ->
?CONN_TYPES ?CONN_TYPES
), ),
hoconsc:union([ hoconsc:union([
ref(emqx_bridge_http_schema, Method) ref(emqx_bridge_webhook_schema, Method)
| Schemas | Schemas
]). ]).
@ -108,10 +108,10 @@ roots() -> [bridges].
fields(bridges) -> fields(bridges) ->
[ [
{http, {webhook,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")),
#{desc => ?DESC("bridges_http")} #{desc => ?DESC("bridges_webhook")}
)} )}
] ++ ] ++
[ [

View File

@ -1,4 +1,4 @@
-module(emqx_bridge_http_schema). -module(emqx_bridge_webhook_schema).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
@ -81,7 +81,7 @@ fields("get") ->
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for HTTP bridge using `", string:to_upper(Method), "` method."]; ["Configuration for WebHook using `", string:to_upper(Method), "` method."];
desc(_) -> desc(_) ->
undefined. undefined.
@ -111,7 +111,7 @@ basic_config() ->
type_field() -> type_field() ->
{type, {type,
mk( mk(
http, webhook,
#{ #{
required => true, required => true,
desc => ?DESC("desc_type") desc => ?DESC("desc_type")

View File

@ -54,8 +54,8 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) ->
{ok, _} = emqx_bridge:remove(BridgeType, BridgeName) {ok, _} = emqx_bridge:remove(BridgeType, BridgeName)
end, end,
[ [
{http, <<"basic_usage_info_http">>}, {webhook, <<"basic_usage_info_webhook">>},
{http, <<"basic_usage_info_http_disabled">>}, {webhook, <<"basic_usage_info_webhook_disabled">>},
{mqtt, <<"basic_usage_info_mqtt">>} {mqtt, <<"basic_usage_info_mqtt">>}
] ]
), ),
@ -81,7 +81,7 @@ t_get_basic_usage_info_1(_Config) ->
#{ #{
num_bridges => 3, num_bridges => 3,
count_by_type => #{ count_by_type => #{
http => 1, webhook => 1,
mqtt => 2 mqtt => 2
} }
}, },
@ -119,7 +119,7 @@ setup_fake_telemetry_data() ->
url => <<"http://localhost:9901/messages/${topic}">>, url => <<"http://localhost:9901/messages/${topic}">>,
enable => true, enable => true,
direction => egress, direction => egress,
local_topic => "emqx_http/#", local_topic => "emqx_webhook/#",
method => post, method => post,
body => <<"${payload}">>, body => <<"${payload}">>,
headers => #{}, headers => #{},
@ -129,10 +129,10 @@ setup_fake_telemetry_data() ->
#{ #{
<<"bridges">> => <<"bridges">> =>
#{ #{
<<"http">> => <<"webhook">> =>
#{ #{
<<"basic_usage_info_http">> => HTTPConfig, <<"basic_usage_info_webhook">> => HTTPConfig,
<<"basic_usage_info_http_disabled">> => <<"basic_usage_info_webhook_disabled">> =>
HTTPConfig#{enable => false} HTTPConfig#{enable => false}
}, },
<<"mqtt">> => <<"mqtt">> =>

View File

@ -23,7 +23,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(CONF_DEFAULT, <<"bridges: {}">>). -define(CONF_DEFAULT, <<"bridges: {}">>).
-define(BRIDGE_TYPE, <<"http">>). -define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_NAME, <<"test_bridge">>). -define(BRIDGE_NAME, <<"test_bridge">>).
-define(URL(PORT, PATH), -define(URL(PORT, PATH),
list_to_binary( list_to_binary(
@ -37,7 +37,7 @@
<<"type">> => TYPE, <<"type">> => TYPE,
<<"name">> => NAME, <<"name">> => NAME,
<<"url">> => URL, <<"url">> => URL,
<<"local_topic">> => <<"emqx_http/#">>, <<"local_topic">> => <<"emqx_webhook/#">>,
<<"method">> => <<"post">>, <<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false}, <<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>, <<"body">> => <<"${payload}">>,
@ -158,7 +158,7 @@ t_http_crud_apis(_) ->
%% assert we there's no bridges at first %% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a http bridge, using POST %% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge %% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"), URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request( {ok, 201, Bridge} = request(
@ -182,7 +182,7 @@ t_http_crud_apis(_) ->
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
%% send an message to emqx and the message should be forwarded to the HTTP server %% send an message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>, Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
?assert( ?assert(
receive receive
{http_server, received, #{ {http_server, received, #{
@ -254,7 +254,7 @@ t_http_crud_apis(_) ->
), ),
%% send an message to emqx again, check the path has been changed %% send an message to emqx again, check the path has been changed
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
?assert( ?assert(
receive receive
{http_server, received, #{path := <<"/path2">>}} -> {http_server, received, #{path := <<"/path2">>}} ->

View File

@ -350,7 +350,7 @@ Ingress 模式定义了这个 bridge 如何从远程 MQTT broker 接收消息,
en: """ en: """
The egress config defines how this bridge forwards messages from the local broker to the remote broker.</br> The egress config defines how this bridge forwards messages from the local broker to the remote broker.</br>
Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br> Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded. NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded.
""" """
zh: """ zh: """
Egress 模式定义了 bridge 如何将消息从本地 broker 转发到远程 broker。</br> Egress 模式定义了 bridge 如何将消息从本地 broker 转发到远程 broker。</br>

View File

@ -30,7 +30,7 @@
post_request/0 post_request/0
]). ]).
%% the config for http bridges do not need connectors %% the config for webhook bridges do not need connectors
-define(CONN_TYPES, [mqtt]). -define(CONN_TYPES, [mqtt]).
%%====================================================================================== %%======================================================================================

View File

@ -296,7 +296,7 @@ egress_desc() ->
"The egress config defines how this bridge forwards messages from the local broker to the remote\n" "The egress config defines how this bridge forwards messages from the local broker to the remote\n"
"broker.</br>\n" "broker.</br>\n"
"Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>\n" "Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.</br>\n"
"NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic\n" "NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic\n"
"is configured, then both the data got from the rule and the MQTT messages that matches\n" "is configured, then both the data got from the rule and the MQTT messages that matches\n"
"local_topic will be forwarded.\n". "local_topic will be forwarded.\n".

View File

@ -610,7 +610,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
#{ #{
<<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>, <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
<<"enable">> => true, <<"enable">> => true,
<<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], <<"actions">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
} }
), ),
@ -653,14 +653,14 @@ t_ingress_mqtt_bridge_with_rules(_) ->
<<"matched.rate">> := _, <<"matched.rate">> := _,
<<"matched.rate.max">> := _, <<"matched.rate.max">> := _,
<<"matched.rate.last5m">> := _, <<"matched.rate.last5m">> := _,
<<"outputs.total">> := 1, <<"actions.total">> := 1,
<<"outputs.success">> := 1, <<"actions.success">> := 1,
<<"outputs.failed">> := 0, <<"actions.failed">> := 0,
<<"outputs.failed.out_of_service">> := 0, <<"actions.failed.out_of_service">> := 0,
<<"outputs.failed.unknown">> := 0 <<"actions.failed.unknown">> := 0
} }
} = jsx:decode(Rule1), } = jsx:decode(Rule1),
%% we also check if the outputs of the rule is triggered %% we also check if the actions of the rule is triggered
?assertMatch( ?assertMatch(
#{ #{
inspect := #{ inspect := #{
@ -709,7 +709,7 @@ t_egress_mqtt_bridge_with_rules(_) ->
#{ #{
<<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>, <<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>,
<<"enable">> => true, <<"enable">> => true,
<<"outputs">> => [BridgeIDEgress], <<"actions">> => [BridgeIDEgress],
<<"sql">> => <<"SELECT * from \"t/1\"">> <<"sql">> => <<"SELECT * from \"t/1\"">>
} }
), ),
@ -760,11 +760,11 @@ t_egress_mqtt_bridge_with_rules(_) ->
<<"matched.rate">> := _, <<"matched.rate">> := _,
<<"matched.rate.max">> := _, <<"matched.rate.max">> := _,
<<"matched.rate.last5m">> := _, <<"matched.rate.last5m">> := _,
<<"outputs.total">> := 1, <<"actions.total">> := 1,
<<"outputs.success">> := 1, <<"actions.success">> := 1,
<<"outputs.failed">> := 0, <<"actions.failed">> := 0,
<<"outputs.failed.out_of_service">> := 0, <<"actions.failed.out_of_service">> := 0,
<<"outputs.failed.unknown">> := 0 <<"actions.failed.unknown">> := 0
} }
} = jsx:decode(Rule1), } = jsx:decode(Rule1),
%% we should receive a message on the "remote" broker, with specified topic %% we should receive a message on the "remote" broker, with specified topic

View File

@ -562,7 +562,7 @@ t_rule_engine_and_data_bridge_info(_Config) ->
#{ #{
data_bridge => data_bridge =>
#{ #{
http => #{num => 1, num_linked_by_rules => 3}, webhook => #{num => 1, num_linked_by_rules => 3},
mqtt => #{num => 2, num_linked_by_rules => 2} mqtt => #{num => 2, num_linked_by_rules => 2}
}, },
num_data_bridges => 3 num_data_bridges => 3
@ -741,12 +741,12 @@ setup_fake_rule_engine_data() ->
#{ #{
id => <<"rule:t_get_basic_usage_info:1">>, id => <<"rule:t_get_basic_usage_info:1">>,
sql => <<"select 1 from topic">>, sql => <<"select 1 from topic">>,
outputs => actions =>
[ [
#{function => <<"erlang:hibernate">>, args => #{}}, #{function => <<"erlang:hibernate">>, args => #{}},
#{function => console}, #{function => console},
<<"http:my_http_bridge">>, <<"webhook:my_webhook">>,
<<"http:my_http_bridge">> <<"webhook:my_webhook">>
] ]
} }
), ),
@ -755,10 +755,10 @@ setup_fake_rule_engine_data() ->
#{ #{
id => <<"rule:t_get_basic_usage_info:2">>, id => <<"rule:t_get_basic_usage_info:2">>,
sql => <<"select 1 from topic">>, sql => <<"select 1 from topic">>,
outputs => actions =>
[ [
<<"mqtt:my_mqtt_bridge">>, <<"mqtt:my_mqtt_bridge">>,
<<"http:my_http_bridge">> <<"webhook:my_webhook">>
] ]
} }
), ),
@ -767,7 +767,7 @@ setup_fake_rule_engine_data() ->
#{ #{
id => <<"rule:t_get_basic_usage_info:3">>, id => <<"rule:t_get_basic_usage_info:3">>,
sql => <<"select 1 from \"$bridges/mqtt:mqtt_in\"">>, sql => <<"select 1 from \"$bridges/mqtt:mqtt_in\"">>,
outputs => actions =>
[ [
#{function => console} #{function => console}
] ]

View File

@ -8,7 +8,7 @@ rule_engine {
# description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'" # description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'"
# enable = true # enable = true
# sql = "SELECT * FROM \"t/1\"" # sql = "SELECT * FROM \"t/1\""
# outputs = [ # actions = [
# { # {
# function = republish # function = republish
# args = { # args = {

View File

@ -451,57 +451,57 @@ emqx_rule_api_schema {
} }
} }
metrics_outputs_total { metrics_actions_total {
desc { desc {
en: "How much times the outputs are called by the rule. This value may several times of 'matched', depending on the number of the outputs of the rule." en: "How much times the actions are called by the rule. This value may several times of 'matched', depending on the number of the actions of the rule."
zh: "规则调用输出的次数。 该值可能是“sql.matched”的几倍具体取决于规则输出的数量。" zh: "规则调用输出的次数。 该值可能是“sql.matched”的几倍具体取决于规则输出的数量。"
} }
label: { label: {
en: "Output Total" en: "Action Total"
zh: "调用输出次数" zh: "调用输出次数"
} }
} }
metrics_outputs_success { metrics_actions_success {
desc { desc {
en: "How much times the rule success to call the outputs." en: "How much times the rule success to call the actions."
zh: "规则成功调用输出的次数。" zh: "规则成功调用输出的次数。"
} }
label: { label: {
en: "Success Output" en: "Success Action"
zh: "成功调用输出次数" zh: "成功调用输出次数"
} }
} }
metrics_outputs_failed { metrics_actions_failed {
desc { desc {
en: "How much times the rule failed to call the outputs." en: "How much times the rule failed to call the actions."
zh: "规则调用输出失败的次数。" zh: "规则调用输出失败的次数。"
} }
label: { label: {
en: "Failed Output" en: "Failed Action"
zh: "调用输出失败次数" zh: "调用输出失败次数"
} }
} }
metrics_outputs_failed_out_of_service { metrics_actions_failed_out_of_service {
desc { desc {
en: "How much times the rule failed to call outputs due to the output is out of service. For example, a bridge is disabled or stopped." en: "How much times the rule failed to call actions due to the action is out of service. For example, a bridge is disabled or stopped."
zh: "由于输出停止服务而导致规则调用输出失败的次数。 例如,桥接被禁用或停止。" zh: "由于输出停止服务而导致规则调用输出失败的次数。 例如,桥接被禁用或停止。"
} }
label: { label: {
en: "Fail Output" en: "Fail Action"
zh: "调用输出失败次数" zh: "调用输出失败次数"
} }
} }
metrics_outputs_failed_unknown { metrics_actions_failed_unknown {
desc { desc {
en: "How much times the rule failed to call outputs due to to an unknown error." en: "How much times the rule failed to call actions due to to an unknown error."
zh: "由于未知错误,规则调用输出失败的次数。" zh: "由于未知错误,规则调用输出失败的次数。"
} }
label: { label: {
en: "Fail Output" en: "Fail Action"
zh: "调用输出失败次数" zh: "调用输出失败次数"
} }
} }

View File

@ -28,21 +28,21 @@ Example: <code>SELECT * FROM "test/topic" WHERE payload.x = 1</code>
} }
} }
rules_outputs { rules_actions {
desc { desc {
en: """ en: """
A list of outputs of the rule. A list of actions of the rule.
An output can be a string that refers to the channel ID of an EMQX bridge, or an object An action can be a string that refers to the channel ID of an EMQX bridge, or an object
that refers to a function. that refers to a function.
There a some built-in functions like "republish" and "console", and we also support user There a some built-in functions like "republish" and "console", and we also support user
provided functions in the format: "{module}:{function}". provided functions in the format: "{module}:{function}".
The outputs in the list are executed sequentially. The actions in the list are executed sequentially.
This means that if one of the output is executing slowly, all the following outputs will not This means that if one of the action is executing slowly, all the following actions will not
be executed until it returns. be executed until it returns.
If one of the output crashed, all other outputs come after it will still be executed, in the If one of the action crashed, all other actions come after it will still be executed, in the
original order. original order.
If there's any error when running an output, there will be an error message, and the 'failure' If there's any error when running an action, there will be an error message, and the 'failure'
counter of the function output or the bridge channel will increase. counter of the function action or the bridge channel will increase.
""" """
zh: """ zh: """
规则的动作列表。 规则的动作列表。
@ -94,7 +94,7 @@ counter of the function output or the bridge channel will increase.
console_function { console_function {
desc { desc {
en: """Print the outputs to the console""" en: """Print the actions to the console"""
zh: "将输出打印到控制台" zh: "将输出打印到控制台"
} }
label: { label: {
@ -111,12 +111,12 @@ Where {module} is the Erlang callback module and {function} is the Erlang functi
To write your own function, checkout the function <code>console</code> and To write your own function, checkout the function <code>console</code> and
<code>republish</code> in the source file: <code>republish</code> in the source file:
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example. <code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> as an example.
""" """
zh: """ zh: """
用户提供的函数。 格式应为:'{module}:{function}'。 用户提供的函数。 格式应为:'{module}:{function}'。
其中 {module} 是 Erlang 回调模块, {function} 是 Erlang 函数。 其中 {module} 是 Erlang 回调模块, {function} 是 Erlang 函数。
要编写自己的函数,请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。 要编写自己的函数,请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。
""" """
} }
label: { label: {
@ -130,11 +130,11 @@ To write your own function, checkout the function <code>console</code> and
en: """ en: """
The args will be passed as the 3rd argument to module:function/3, The args will be passed as the 3rd argument to module:function/3,
checkout the function <code>console</code> and <code>republish</code> in the source file: checkout the function <code>console</code> and <code>republish</code> in the source file:
<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> as an example. <code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> as an example.
""" """
zh: """ zh: """
用户提供的参数将作为函数 module:function/3 的第三个参数, 用户提供的参数将作为函数 module:function/3 的第三个参数,
请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_outputs.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。 请检查源文件:<code>apps/emqx_rule_engine/src/emqx_rule_actions.erl</code> 中的示例函数 <code>console</code> 和<code>republish</code> 。
""" """
} }
label: { label: {
@ -272,9 +272,9 @@ of the rule, then the string "undefined" is used.
} }
} }
desc_builtin_output_republish { desc_builtin_action_republish {
desc { desc {
en: """Configuration for a built-in output.""" en: """Configuration for a built-in action."""
zh: """配置重新发布。""" zh: """配置重新发布。"""
} }
label: { label: {
@ -283,20 +283,20 @@ of the rule, then the string "undefined" is used.
} }
} }
desc_builtin_output_console { desc_builtin_action_console {
desc { desc {
en: """Configuration for a built-in output.""" en: """Configuration for a built-in action."""
zh: """配置打印到控制台""" zh: """配置打印到控制台"""
} }
label: { label: {
en: "Output Console Configuration" en: "Action Console Configuration"
zh: "配置打印到控制台" zh: "配置打印到控制台"
} }
} }
desc_user_provided_function { desc_user_provided_function {
desc { desc {
en: """Configuration for a built-in output.""" en: """Configuration for a built-in action."""
zh: """配置用户函数""" zh: """配置用户函数"""
} }
label: { label: {
@ -307,7 +307,7 @@ of the rule, then the string "undefined" is used.
desc_republish_args { desc_republish_args {
desc { desc {
en: """The arguments of the built-in 'republish' output.One can use variables in the args. en: """The arguments of the built-in 'republish' action.One can use variables in the args.
The variables are selected by the rule. For example, if the rule SQL is defined as following: The variables are selected by the rule. For example, if the rule SQL is defined as following:
<code> <code>
SELECT clientid, qos, payload FROM "t/1" SELECT clientid, qos, payload FROM "t/1"

View File

@ -31,16 +31,16 @@
-type selected_data() :: map(). -type selected_data() :: map().
-type envs() :: map(). -type envs() :: map().
-type builtin_output_func() :: republish | console. -type builtin_action_func() :: republish | console.
-type builtin_output_module() :: emqx_rule_outputs. -type builtin_action_module() :: emqx_rule_actions.
-type bridge_channel_id() :: binary(). -type bridge_channel_id() :: binary().
-type output_fun_args() :: map(). -type action_fun_args() :: map().
-type output() :: -type action() ::
#{ #{
mod := builtin_output_module() | module(), mod := builtin_action_module() | module(),
func := builtin_output_func() | atom(), func := builtin_action_func() | atom(),
args => output_fun_args() args => action_fun_args()
} }
| bridge_channel_id(). | bridge_channel_id().
@ -49,7 +49,7 @@
id := rule_id(), id := rule_id(),
name := binary(), name := binary(),
sql := binary(), sql := binary(),
outputs := [output()], actions := [action()],
enable := boolean(), enable := boolean(),
description => binary(), description => binary(),
%% epoch in millisecond precision %% epoch in millisecond precision

View File

@ -15,43 +15,43 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Define the default actions. %% Define the default actions.
-module(emqx_rule_outputs). -module(emqx_rule_actions).
-include("rule_engine.hrl"). -include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
%% APIs %% APIs
-export([parse_output/1]). -export([parse_action/1]).
%% callbacks of emqx_rule_output %% callbacks of emqx_rule_action
-export([pre_process_output_args/2]). -export([pre_process_action_args/2]).
%% output functions %% action functions
-export([ -export([
console/3, console/3,
republish/3 republish/3
]). ]).
-optional_callbacks([pre_process_output_args/2]). -optional_callbacks([pre_process_action_args/2]).
-callback pre_process_output_args(FuncName :: atom(), output_fun_args()) -> output_fun_args(). -callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
parse_output(#{function := OutputFunc} = Output) -> parse_action(#{function := ActionFunc} = Action) ->
{Mod, Func} = parse_output_func(OutputFunc), {Mod, Func} = parse_action_func(ActionFunc),
#{ #{
mod => Mod, mod => Mod,
func => Func, func => Func,
args => pre_process_args(Mod, Func, maps:get(args, Output, #{})) args => pre_process_args(Mod, Func, maps:get(args, Action, #{}))
}. }.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% callbacks of emqx_rule_output %% callbacks of emqx_rule_action
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
pre_process_output_args( pre_process_action_args(
republish, republish,
#{ #{
topic := Topic, topic := Topic,
@ -68,17 +68,17 @@ pre_process_output_args(
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
} }
}; };
pre_process_output_args(_, Args) -> pre_process_action_args(_, Args) ->
Args. Args.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% output functions %% action functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec console(map(), map(), map()) -> any(). -spec console(map(), map(), map()) -> any().
console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) -> console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
?ULOG( ?ULOG(
"[rule output] ~ts~n" "[rule action] ~ts~n"
"\tOutput Data: ~p~n" "\tAction Data: ~p~n"
"\tEnvs: ~p~n", "\tEnvs: ~p~n",
[RuleId, Selected, Envs] [RuleId, Selected, Envs]
). ).
@ -135,36 +135,36 @@ republish(
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% internal functions %% internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
parse_output_func(OutputFunc) -> parse_action_func(ActionFunc) ->
{Mod, Func} = get_output_mod_func(OutputFunc), {Mod, Func} = get_action_mod_func(ActionFunc),
assert_function_supported(Mod, Func), assert_function_supported(Mod, Func),
{Mod, Func}. {Mod, Func}.
get_output_mod_func(OutputFunc) when is_atom(OutputFunc) -> get_action_mod_func(ActionFunc) when is_atom(ActionFunc) ->
{emqx_rule_outputs, OutputFunc}; {emqx_rule_actions, ActionFunc};
get_output_mod_func(OutputFunc) when is_binary(OutputFunc) -> get_action_mod_func(ActionFunc) when is_binary(ActionFunc) ->
ToAtom = fun(Bin) -> ToAtom = fun(Bin) ->
try binary_to_existing_atom(Bin) of try binary_to_existing_atom(Bin) of
Atom -> Atom Atom -> Atom
catch catch
error:badarg -> error({unknown_output_function, OutputFunc}) error:badarg -> error({unknown_action_function, ActionFunc})
end end
end, end,
case string:split(OutputFunc, ":", all) of case string:split(ActionFunc, ":", all) of
[Func1] -> {emqx_rule_outputs, ToAtom(Func1)}; [Func1] -> {emqx_rule_actions, ToAtom(Func1)};
[Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)}; [Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)};
_ -> error({invalid_output_function, OutputFunc}) _ -> error({invalid_action_function, ActionFunc})
end. end.
assert_function_supported(Mod, Func) -> assert_function_supported(Mod, Func) ->
case erlang:function_exported(Mod, Func, 3) of case erlang:function_exported(Mod, Func, 3) of
true -> ok; true -> ok;
false -> error({output_function_not_supported, Func}) false -> error({action_function_not_supported, Func})
end. end.
pre_process_args(Mod, Func, Args) -> pre_process_args(Mod, Func, Args) ->
case erlang:function_exported(Mod, pre_process_output_args, 2) of case erlang:function_exported(Mod, pre_process_action_args, 2) of
true -> Mod:pre_process_output_args(Func, Args); true -> Mod:pre_process_action_args(Func, Args);
false -> Args false -> Args
end. end.

View File

@ -124,25 +124,25 @@ fields("metrics") ->
sc(non_neg_integer(), #{ sc(non_neg_integer(), #{
desc => ?DESC("metrics_sql_failed_unknown") desc => ?DESC("metrics_sql_failed_unknown")
})}, })},
{"outputs.total", {"actions.total",
sc(non_neg_integer(), #{ sc(non_neg_integer(), #{
desc => ?DESC("metrics_outputs_total") desc => ?DESC("metrics_actions_total")
})}, })},
{"outputs.success", {"actions.success",
sc(non_neg_integer(), #{ sc(non_neg_integer(), #{
desc => ?DESC("metrics_outputs_success") desc => ?DESC("metrics_actions_success")
})}, })},
{"outputs.failed", {"actions.failed",
sc(non_neg_integer(), #{ sc(non_neg_integer(), #{
desc => ?DESC("metrics_outputs_failed") desc => ?DESC("metrics_actions_failed")
})}, })},
{"outputs.failed.out_of_service", {"actions.failed.out_of_service",
sc(non_neg_integer(), #{ sc(non_neg_integer(), #{
desc => ?DESC("metrics_outputs_failed_out_of_service") desc => ?DESC("metrics_actions_failed_out_of_service")
})}, })},
{"outputs.failed.unknown", {"actions.failed.unknown",
sc(non_neg_integer(), #{ sc(non_neg_integer(), #{
desc => ?DESC("metrics_outputs_failed_unknown") desc => ?DESC("metrics_actions_failed_unknown")
})} })}
]; ];
fields("node_metrics") -> fields("node_metrics") ->

View File

@ -88,11 +88,11 @@
'failed', 'failed',
'failed.exception', 'failed.exception',
'failed.no_result', 'failed.no_result',
'outputs.total', 'actions.total',
'outputs.success', 'actions.success',
'outputs.failed', 'actions.failed',
'outputs.failed.out_of_service', 'actions.failed.out_of_service',
'outputs.failed.unknown' 'actions.failed.unknown'
]). ]).
-define(RATE_METRICS, ['matched']). -define(RATE_METRICS, ['matched']).
@ -265,9 +265,9 @@ get_basic_usage_info() ->
NumRules = length(EnabledRules), NumRules = length(EnabledRules),
ReferencedBridges = ReferencedBridges =
lists:foldl( lists:foldl(
fun(#{outputs := Outputs, from := From}, Acc) -> fun(#{actions := Actions, from := From}, Acc) ->
BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From], BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From],
BridgeIDs1 = lists:filter(fun is_binary/1, Outputs), BridgeIDs1 = lists:filter(fun is_binary/1, Actions),
tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc) tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc)
end, end,
#{}, #{},
@ -342,7 +342,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Functions %% Internal Functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, CreatedAt) -> parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) ->
case emqx_rule_sqlparser:parse(Sql) of case emqx_rule_sqlparser:parse(Sql) of
{ok, Select} -> {ok, Select} ->
Rule = #{ Rule = #{
@ -352,7 +352,7 @@ parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, Creat
updated_at => now_ms(), updated_at => now_ms(),
enable => maps:get(enable, Params, true), enable => maps:get(enable, Params, true),
sql => Sql, sql => Sql,
outputs => parse_outputs(Outputs), actions => parse_actions(Actions),
description => maps:get(description, Params, ""), description => maps:get(description, Params, ""),
%% -- calculated fields: %% -- calculated fields:
from => emqx_rule_sqlparser:select_from(Select), from => emqx_rule_sqlparser:select_from(Select),
@ -386,12 +386,12 @@ do_delete_rule(RuleId) ->
ok ok
end. end.
parse_outputs(Outputs) -> parse_actions(Actions) ->
[do_parse_output(Out) || Out <- Outputs]. [do_parse_action(Act) || Act <- Actions].
do_parse_output(Output) when is_map(Output) -> do_parse_action(Action) when is_map(Action) ->
emqx_rule_outputs:parse_output(Output); emqx_rule_actions:parse_action(Action);
do_parse_output(BridgeChannelId) when is_binary(BridgeChannelId) -> do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
BridgeChannelId. BridgeChannelId.
get_all_records(Tab) -> get_all_records(Tab) ->

View File

@ -65,11 +65,11 @@ end).
'failed' => FAIL, 'failed' => FAIL,
'failed.exception' => FAIL_EX, 'failed.exception' => FAIL_EX,
'failed.no_result' => FAIL_NORES, 'failed.no_result' => FAIL_NORES,
'outputs.total' => O_TOTAL, 'actions.total' => O_TOTAL,
'outputs.failed' => O_FAIL, 'actions.failed' => O_FAIL,
'outputs.failed.out_of_service' => O_FAIL_OOS, 'actions.failed.out_of_service' => O_FAIL_OOS,
'outputs.failed.unknown' => O_FAIL_UNKNOWN, 'actions.failed.unknown' => O_FAIL_UNKNOWN,
'outputs.success' => O_SUCC, 'actions.success' => O_SUCC,
'matched.rate' => RATE, 'matched.rate' => RATE,
'matched.rate.max' => RATE_MAX, 'matched.rate.max' => RATE_MAX,
'matched.rate.last5m' => RATE_5 'matched.rate.last5m' => RATE_5
@ -96,11 +96,11 @@ end).
'failed' := FAIL, 'failed' := FAIL,
'failed.exception' := FAIL_EX, 'failed.exception' := FAIL_EX,
'failed.no_result' := FAIL_NORES, 'failed.no_result' := FAIL_NORES,
'outputs.total' := O_TOTAL, 'actions.total' := O_TOTAL,
'outputs.failed' := O_FAIL, 'actions.failed' := O_FAIL,
'outputs.failed.out_of_service' := O_FAIL_OOS, 'actions.failed.out_of_service' := O_FAIL_OOS,
'outputs.failed.unknown' := O_FAIL_UNKNOWN, 'actions.failed.unknown' := O_FAIL_UNKNOWN,
'outputs.success' := O_SUCC, 'actions.success' := O_SUCC,
'matched.rate' := RATE, 'matched.rate' := RATE,
'matched.rate.max' := RATE_MAX, 'matched.rate.max' := RATE_MAX,
'matched.rate.last5m' := RATE_5 'matched.rate.last5m' := RATE_5
@ -362,7 +362,7 @@ format_rule_resp(#{
name := Name, name := Name,
created_at := CreatedAt, created_at := CreatedAt,
from := Topics, from := Topics,
outputs := Output, actions := Action,
sql := SQL, sql := SQL,
enable := Enable, enable := Enable,
description := Descr description := Descr
@ -372,7 +372,7 @@ format_rule_resp(#{
id => Id, id => Id,
name => Name, name => Name,
from => Topics, from => Topics,
outputs => format_output(Output), actions => format_action(Action),
sql => SQL, sql => SQL,
metrics => aggregate_metrics(NodeMetrics), metrics => aggregate_metrics(NodeMetrics),
node_metrics => NodeMetrics, node_metrics => NodeMetrics,
@ -384,18 +384,18 @@ format_rule_resp(#{
format_datetime(Timestamp, Unit) -> format_datetime(Timestamp, Unit) ->
list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])). list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])).
format_output(Outputs) -> format_action(Actions) ->
[do_format_output(Out) || Out <- Outputs]. [do_format_action(Act) || Act <- Actions].
do_format_output(#{mod := Mod, func := Func, args := Args}) -> do_format_action(#{mod := Mod, func := Func, args := Args}) ->
#{ #{
function => printable_function_name(Mod, Func), function => printable_function_name(Mod, Func),
args => maps:remove(preprocessed_tmpl, Args) args => maps:remove(preprocessed_tmpl, Args)
}; };
do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) ->
BridgeChannelId. BridgeChannelId.
printable_function_name(emqx_rule_outputs, Func) -> printable_function_name(emqx_rule_actions, Func) ->
Func; Func;
printable_function_name(Mod, Func) -> printable_function_name(Mod, Func) ->
list_to_binary(lists:concat([Mod, ":", Func])). list_to_binary(lists:concat([Mod, ":", Func])).
@ -411,11 +411,11 @@ get_rule_metrics(Id) ->
'failed' := Failed, 'failed' := Failed,
'failed.exception' := FailedEx, 'failed.exception' := FailedEx,
'failed.no_result' := FailedNoRes, 'failed.no_result' := FailedNoRes,
'outputs.total' := OTotal, 'actions.total' := OTotal,
'outputs.failed' := OFailed, 'actions.failed' := OFailed,
'outputs.failed.out_of_service' := OFailedOOS, 'actions.failed.out_of_service' := OFailedOOS,
'outputs.failed.unknown' := OFailedUnknown, 'actions.failed.unknown' := OFailedUnknown,
'outputs.success' := OFailedSucc 'actions.success' := OFailedSucc
}, },
rate := rate :=
#{ #{

View File

@ -64,14 +64,14 @@ fields("rules") ->
validator => fun ?MODULE:validate_sql/1 validator => fun ?MODULE:validate_sql/1
} }
)}, )},
{"outputs", {"actions",
sc( sc(
hoconsc:array(hoconsc:union(outputs())), hoconsc:array(hoconsc:union(actions())),
#{ #{
desc => ?DESC("rules_outputs"), desc => ?DESC("rules_actions"),
default => [], default => [],
example => [ example => [
<<"http:my_http_bridge">>, <<"webhook:my_webhook">>,
#{ #{
function => republish, function => republish,
args => #{ args => #{
@ -93,16 +93,16 @@ fields("rules") ->
} }
)} )}
]; ];
fields("builtin_output_republish") -> fields("builtin_action_republish") ->
[ [
{function, sc(republish, #{desc => ?DESC("republish_function")})}, {function, sc(republish, #{desc => ?DESC("republish_function")})},
{args, sc(ref("republish_args"), #{default => #{}})} {args, sc(ref("republish_args"), #{default => #{}})}
]; ];
fields("builtin_output_console") -> fields("builtin_action_console") ->
[ [
{function, sc(console, #{desc => ?DESC("console_function")})} {function, sc(console, #{desc => ?DESC("console_function")})}
%% we may support some args for the console output in the future %% we may support some args for the console action in the future
%, {args, sc(map(), #{desc => "The arguments of the built-in 'console' output", %, {args, sc(map(), #{desc => "The arguments of the built-in 'console' action",
% default => #{}})} % default => #{}})}
]; ];
fields("user_provided_function") -> fields("user_provided_function") ->
@ -169,10 +169,10 @@ desc("rule_engine") ->
?DESC("desc_rule_engine"); ?DESC("desc_rule_engine");
desc("rules") -> desc("rules") ->
?DESC("desc_rules"); ?DESC("desc_rules");
desc("builtin_output_republish") -> desc("builtin_action_republish") ->
?DESC("desc_builtin_output_republish"); ?DESC("desc_builtin_action_republish");
desc("builtin_output_console") -> desc("builtin_action_console") ->
?DESC("desc_builtin_output_console"); ?DESC("desc_builtin_action_console");
desc("user_provided_function") -> desc("user_provided_function") ->
?DESC("desc_user_provided_function"); ?DESC("desc_user_provided_function");
desc("republish_args") -> desc("republish_args") ->
@ -207,11 +207,11 @@ validate_rule_name(Name) ->
{error, Reason} {error, Reason}
end. end.
outputs() -> actions() ->
[ [
binary(), binary(),
ref("builtin_output_republish"), ref("builtin_action_republish"),
ref("builtin_output_console"), ref("builtin_action_console"),
ref("user_provided_function") ref("user_provided_function")
]. ].

View File

@ -122,7 +122,7 @@ do_apply_rule(
doeach := DoEach, doeach := DoEach,
incase := InCase, incase := InCase,
conditions := Conditions, conditions := Conditions,
outputs := Outputs actions := Actions
}, },
Input Input
) -> ) ->
@ -145,7 +145,7 @@ do_apply_rule(
_ -> _ ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
end, end,
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; {ok, [handle_action_list(RuleId, Actions, Coll, Input) || Coll <- Collection2]};
false -> false ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch} {error, nomatch}
@ -156,7 +156,7 @@ do_apply_rule(
is_foreach := false, is_foreach := false,
fields := Fields, fields := Fields,
conditions := Conditions, conditions := Conditions,
outputs := Outputs actions := Actions
}, },
Input Input
) -> ) ->
@ -172,7 +172,7 @@ do_apply_rule(
of of
true -> true ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
{ok, handle_output_list(RuleId, Outputs, Selected, Input)}; {ok, handle_action_list(RuleId, Actions, Selected, Input)};
false -> false ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
{error, nomatch} {error, nomatch}
@ -185,24 +185,24 @@ clear_rule_payload() ->
select_and_transform(Fields, Input) -> select_and_transform(Fields, Input) ->
select_and_transform(Fields, Input, #{}). select_and_transform(Fields, Input, #{}).
select_and_transform([], _Input, Output) -> select_and_transform([], _Input, Action) ->
Output; Action;
select_and_transform(['*' | More], Input, Output) -> select_and_transform(['*' | More], Input, Action) ->
select_and_transform(More, Input, maps:merge(Output, Input)); select_and_transform(More, Input, maps:merge(Action, Input));
select_and_transform([{as, Field, Alias} | More], Input, Output) -> select_and_transform([{as, Field, Alias} | More], Input, Action) ->
Val = eval(Field, Input), Val = eval(Field, Input),
select_and_transform( select_and_transform(
More, More,
nested_put(Alias, Val, Input), nested_put(Alias, Val, Input),
nested_put(Alias, Val, Output) nested_put(Alias, Val, Action)
); );
select_and_transform([Field | More], Input, Output) -> select_and_transform([Field | More], Input, Action) ->
Val = eval(Field, Input), Val = eval(Field, Input),
Key = alias(Field), Key = alias(Field),
select_and_transform( select_and_transform(
More, More,
nested_put(Key, Val, Input), nested_put(Key, Val, Input),
nested_put(Key, Val, Output) nested_put(Key, Val, Action)
). ).
%% FOREACH Clause %% FOREACH Clause
@ -210,27 +210,27 @@ select_and_transform([Field | More], Input, Output) ->
select_and_collect(Fields, Input) -> select_and_collect(Fields, Input) ->
select_and_collect(Fields, Input, {#{}, {'item', []}}). select_and_collect(Fields, Input, {#{}, {'item', []}}).
select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) -> select_and_collect([{as, Field, {_, A} = Alias}], Input, {Action, _}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
{nested_put(Alias, Val, Output), {A, ensure_list(Val)}}; {nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
select_and_collect([{as, Field, Alias} | More], Input, {Output, LastKV}) -> select_and_collect([{as, Field, Alias} | More], Input, {Action, LastKV}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
select_and_collect( select_and_collect(
More, More,
nested_put(Alias, Val, Input), nested_put(Alias, Val, Input),
{nested_put(Alias, Val, Output), LastKV} {nested_put(Alias, Val, Action), LastKV}
); );
select_and_collect([Field], Input, {Output, _}) -> select_and_collect([Field], Input, {Action, _}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
Key = alias(Field), Key = alias(Field),
{nested_put(Key, Val, Output), {'item', ensure_list(Val)}}; {nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
select_and_collect([Field | More], Input, {Output, LastKV}) -> select_and_collect([Field | More], Input, {Action, LastKV}) ->
Val = eval(Field, Input), Val = eval(Field, Input),
Key = alias(Field), Key = alias(Field),
select_and_collect( select_and_collect(
More, More,
nested_put(Key, Val, Input), nested_put(Key, Val, Input),
{nested_put(Key, Val, Output), LastKV} {nested_put(Key, Val, Action), LastKV}
). ).
%% Filter each item got from FOREACH %% Filter each item got from FOREACH
@ -312,43 +312,43 @@ number(Bin) ->
error:badarg -> binary_to_float(Bin) error:badarg -> binary_to_float(Bin)
end. end.
handle_output_list(RuleId, Outputs, Selected, Envs) -> handle_action_list(RuleId, Actions, Selected, Envs) ->
[handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. [handle_action(RuleId, Act, Selected, Envs) || Act <- Actions].
handle_output(RuleId, OutId, Selected, Envs) -> handle_action(RuleId, ActId, Selected, Envs) ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.total'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
try try
Result = do_handle_output(OutId, Selected, Envs), Result = do_handle_action(ActId, Selected, Envs),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.success'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'),
Result Result
catch catch
throw:out_of_service -> throw:out_of_service ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
ok = emqx_metrics_worker:inc( ok = emqx_metrics_worker:inc(
rule_metrics, RuleId, 'outputs.failed.out_of_service' rule_metrics, RuleId, 'actions.failed.out_of_service'
), ),
?SLOG(warning, #{msg => "out_of_service", output => OutId}); ?SLOG(warning, #{msg => "out_of_service", action => ActId});
Err:Reason:ST -> Err:Reason:ST ->
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'),
?SLOG(error, #{ ?SLOG(error, #{
msg => "output_failed", msg => "action_failed",
output => OutId, action => ActId,
exception => Err, exception => Err,
reason => Reason, reason => Reason,
stacktrace => ST stacktrace => ST
}) })
end. end.
do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> do_handle_action(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), ?TRACE("BRIDGE", "bridge_action", #{bridge_id => BridgeId}),
case emqx_bridge:send_message(BridgeId, Selected) of case emqx_bridge:send_message(BridgeId, Selected) of
{error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped -> {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped ->
throw(out_of_service); throw(out_of_service);
Result -> Result ->
Result Result
end; end;
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
%% the function can also throw 'out_of_service' %% the function can also throw 'out_of_service'
Mod:Func(Selected, Envs, Args). Mod:Func(Selected, Envs, Args).

View File

@ -51,7 +51,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
id => RuleId, id => RuleId,
sql => Sql, sql => Sql,
from => EventTopics, from => EventTopics,
outputs => [#{mod => ?MODULE, func => get_selected_data, args => #{}}], actions => [#{mod => ?MODULE, func => get_selected_data, args => #{}}],
enable => true, enable => true,
is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), is_foreach => emqx_rule_sqlparser:select_is_foreach(Select),
fields => emqx_rule_sqlparser:select_fields(Select), fields => emqx_rule_sqlparser:select_fields(Select),

View File

@ -35,6 +35,7 @@ all() ->
{group, registry}, {group, registry},
{group, runtime}, {group, runtime},
{group, events}, {group, events},
{group, telemetry},
{group, bugs} {group, bugs}
]. ].
@ -91,6 +92,10 @@ groups() ->
t_sqlparse_invalid_json t_sqlparse_invalid_json
]}, ]},
{events, [], [t_events]}, {events, [], [t_events]},
{telemetry, [], [
t_get_basic_usage_info_0,
t_get_basic_usage_info_1
]},
{bugs, [], [ {bugs, [], [
t_sqlparse_payload_as, t_sqlparse_payload_as,
t_sqlparse_nested_get t_sqlparse_nested_get
@ -166,9 +171,9 @@ init_per_testcase(t_events, Config) ->
#{ #{
id => <<"rule:t_events">>, id => <<"rule:t_events">>,
sql => SQL, sql => SQL,
outputs => [ actions => [
#{ #{
function => <<"emqx_rule_engine_SUITE:output_record_triggered_events">>, function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>,
args => #{} args => #{}
} }
], ],
@ -194,7 +199,7 @@ t_create_rule(_Config) ->
#{ #{
sql => <<"select * from \"t/a\"">>, sql => <<"select * from \"t/a\"">>,
id => <<"t_create_rule">>, id => <<"t_create_rule">>,
outputs => [#{function => console}], actions => [#{function => console}],
description => <<"debug rule">> description => <<"debug rule">>
} }
), ),
@ -256,7 +261,7 @@ t_create_existing_rule(_Config) ->
#{ #{
id => <<"an_existing_rule">>, id => <<"an_existing_rule">>,
sql => <<"select * from \"t/#\"">>, sql => <<"select * from \"t/#\"">>,
outputs => [#{function => console}] actions => [#{function => console}]
} }
), ),
{ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>), {ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>),
@ -554,12 +559,12 @@ t_match_atom_and_binary(_Config) ->
"SELECT connected_at as ts, * " "SELECT connected_at as ts, * "
"FROM \"$events/client_connected\" " "FROM \"$events/client_connected\" "
"WHERE username = 'emqx2' ", "WHERE username = 'emqx2' ",
Repub = republish_output(<<"t2">>, <<"user:${ts}">>), Repub = republish_action(<<"t2">>, <<"user:${ts}">>),
{ok, TopicRule} = emqx_rule_engine:create_rule( {ok, TopicRule} = emqx_rule_engine:create_rule(
#{ #{
sql => SQL, sql => SQL,
id => ?TMP_RULEID, id => ?TMP_RULEID,
outputs => [Repub] actions => [Repub]
} }
), ),
{ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]), {ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]),
@ -750,8 +755,8 @@ t_sqlselect_001(_Config) ->
Sql2 = Sql2 =
"SELECT jq('.a|.[]', " "SELECT jq('.a|.[]', "
"'{\"a\": [{\"b\": 1}, {\"b\": 2}, {\"b\": 3}]}') " "'{\"a\": [{\"b\": 1}, {\"b\": 2}, {\"b\": 3}]}') "
"as jq_output, " "as jq_action, "
" jq_output[1].b as first_b from \"t/#\" ", " jq_action[1].b as first_b from \"t/#\" ",
?assertMatch( ?assertMatch(
{ok, #{<<"first_b">> := 1}}, {ok, #{<<"first_b">> := 1}},
emqx_rule_sqltester:test( emqx_rule_sqltester:test(
@ -771,12 +776,12 @@ t_sqlselect_01(_Config) ->
"SELECT json_decode(payload) as p, payload " "SELECT json_decode(payload) as p, payload "
"FROM \"t3/#\", \"t1\" " "FROM \"t3/#\", \"t1\" "
"WHERE p.x = 1", "WHERE p.x = 1",
Repub = republish_output(<<"t2">>), Repub = republish_action(<<"t2">>),
{ok, TopicRule1} = emqx_rule_engine:create_rule( {ok, TopicRule1} = emqx_rule_engine:create_rule(
#{ #{
sql => SQL, sql => SQL,
id => ?TMP_RULEID, id => ?TMP_RULEID,
outputs => [Repub] actions => [Repub]
} }
), ),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
@ -817,12 +822,12 @@ t_sqlselect_02(_Config) ->
"SELECT * " "SELECT * "
"FROM \"t3/#\", \"t1\" " "FROM \"t3/#\", \"t1\" "
"WHERE payload.x = 1", "WHERE payload.x = 1",
Repub = republish_output(<<"t2">>), Repub = republish_action(<<"t2">>),
{ok, TopicRule1} = emqx_rule_engine:create_rule( {ok, TopicRule1} = emqx_rule_engine:create_rule(
#{ #{
sql => SQL, sql => SQL,
id => ?TMP_RULEID, id => ?TMP_RULEID,
outputs => [Repub] actions => [Repub]
} }
), ),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
@ -863,12 +868,12 @@ t_sqlselect_1(_Config) ->
"SELECT json_decode(payload) as p, payload " "SELECT json_decode(payload) as p, payload "
"FROM \"t1\" " "FROM \"t1\" "
"WHERE p.x = 1 and p.y = 2", "WHERE p.x = 1 and p.y = 2",
Repub = republish_output(<<"t2">>), Repub = republish_action(<<"t2">>),
{ok, TopicRule} = emqx_rule_engine:create_rule( {ok, TopicRule} = emqx_rule_engine:create_rule(
#{ #{
sql => SQL, sql => SQL,
id => ?TMP_RULEID, id => ?TMP_RULEID,
outputs => [Repub] actions => [Repub]
} }
), ),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
@ -898,12 +903,12 @@ t_sqlselect_1(_Config) ->
t_sqlselect_2(_Config) -> t_sqlselect_2(_Config) ->
%% recursively republish to t2 %% recursively republish to t2
SQL = "SELECT * FROM \"t2\" ", SQL = "SELECT * FROM \"t2\" ",
Repub = republish_output(<<"t2">>), Repub = republish_action(<<"t2">>),
{ok, TopicRule} = emqx_rule_engine:create_rule( {ok, TopicRule} = emqx_rule_engine:create_rule(
#{ #{
sql => SQL, sql => SQL,
id => ?TMP_RULEID, id => ?TMP_RULEID,
outputs => [Repub] actions => [Repub]
} }
), ),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
@ -932,12 +937,12 @@ t_sqlselect_3(_Config) ->
"SELECT * " "SELECT * "
"FROM \"$events/client_connected\" " "FROM \"$events/client_connected\" "
"WHERE username = 'emqx1'", "WHERE username = 'emqx1'",
Repub = republish_output(<<"t2">>, <<"clientid=${clientid}">>), Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
{ok, TopicRule} = emqx_rule_engine:create_rule( {ok, TopicRule} = emqx_rule_engine:create_rule(
#{ #{
sql => SQL, sql => SQL,
id => ?TMP_RULEID, id => ?TMP_RULEID,
outputs => [Repub] actions => [Repub]
} }
), ),
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
@ -2270,12 +2275,12 @@ t_get_basic_usage_info_1(_Config) ->
#{ #{
id => <<"rule:t_get_basic_usage_info:1">>, id => <<"rule:t_get_basic_usage_info:1">>,
sql => <<"select 1 from topic">>, sql => <<"select 1 from topic">>,
outputs => actions =>
[ [
#{function => <<"erlang:hibernate">>, args => #{}}, #{function => <<"erlang:hibernate">>, args => #{}},
#{function => console}, #{function => console},
<<"http:my_http_bridge">>, <<"webhook:my_webhook">>,
<<"http:my_http_bridge">> <<"webhook:my_webhook">>
] ]
} }
), ),
@ -2284,10 +2289,10 @@ t_get_basic_usage_info_1(_Config) ->
#{ #{
id => <<"rule:t_get_basic_usage_info:2">>, id => <<"rule:t_get_basic_usage_info:2">>,
sql => <<"select 1 from topic">>, sql => <<"select 1 from topic">>,
outputs => actions =>
[ [
<<"mqtt:my_mqtt_bridge">>, <<"mqtt:my_mqtt_bridge">>,
<<"http:my_http_bridge">> <<"webhook:my_webhook">>
] ]
} }
), ),
@ -2297,7 +2302,7 @@ t_get_basic_usage_info_1(_Config) ->
referenced_bridges => referenced_bridges =>
#{ #{
mqtt => 1, mqtt => 1,
http => 3 webhook => 3
} }
}, },
emqx_rule_engine:get_basic_usage_info() emqx_rule_engine:get_basic_usage_info()
@ -2308,9 +2313,9 @@ t_get_basic_usage_info_1(_Config) ->
%% Internal helpers %% Internal helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
republish_output(Topic) -> republish_action(Topic) ->
republish_output(Topic, <<"${payload}">>). republish_action(Topic, <<"${payload}">>).
republish_output(Topic, Payload) -> republish_action(Topic, Payload) ->
#{ #{
function => republish, function => republish,
args => #{payload => Payload, topic => Topic, qos => 0, retain => false} args => #{payload => Payload, topic => Topic, qos => 0, retain => false}
@ -2337,13 +2342,13 @@ make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) ->
fields => [<<"*">>], fields => [<<"*">>],
is_foreach => false, is_foreach => false,
conditions => {}, conditions => {},
outputs => [#{mod => emqx_rule_outputs, func => console, args => #{}}], actions => [#{mod => emqx_rule_actions, func => console, args => #{}}],
description => <<"simple rule">>, description => <<"simple rule">>,
created_at => Ts created_at => Ts
}. }.
output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) -> action_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) ->
ct:pal("applying output_record_triggered_events: ~p", [Data]), ct:pal("applying action_record_triggered_events: ~p", [Data]),
ets:insert(events_record_tab, {EventName, Data}). ets:insert(events_record_tab, {EventName, Data}).
verify_event(EventName) -> verify_event(EventName) ->

View File

@ -33,7 +33,7 @@ t_crud_rule_api(_Config) ->
<<"description">> => <<"A simple rule">>, <<"description">> => <<"A simple rule">>,
<<"enable">> => true, <<"enable">> => true,
<<"id">> => RuleID, <<"id">> => RuleID,
<<"outputs">> => [#{<<"function">> => <<"console">>}], <<"actions">> => [#{<<"function">> => <<"console">>}],
<<"sql">> => <<"SELECT * from \"t/1\"">>, <<"sql">> => <<"SELECT * from \"t/1\"">>,
<<"name">> => <<"test_rule">> <<"name">> => <<"test_rule">>
}, },