From 71de9616d359c8a9dea0ba7156de6f8e2540b831 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 25 May 2022 16:02:07 +0800 Subject: [PATCH 1/3] refactor(rule): rename outputs -> actions --- .../i18n/emqx_bridge_http_schema.conf | 2 +- .../i18n/emqx_connector_mqtt_schema.conf | 2 +- .../src/mqtt/emqx_connector_mqtt_schema.erl | 2 +- .../test/emqx_connector_api_SUITE.erl | 26 +++---- .../test/emqx_telemetry_SUITE.erl | 6 +- .../etc/emqx_rule_engine.conf | 2 +- .../i18n/emqx_rule_api_schema.conf | 30 ++++---- .../i18n/emqx_rule_engine_schema.conf | 40 +++++------ apps/emqx_rule_engine/include/rule_engine.hrl | 16 ++--- ...rule_outputs.erl => emqx_rule_actions.erl} | 56 +++++++-------- .../src/emqx_rule_api_schema.erl | 20 +++--- .../emqx_rule_engine/src/emqx_rule_engine.erl | 28 ++++---- .../src/emqx_rule_engine_api.erl | 44 ++++++------ .../src/emqx_rule_engine_schema.erl | 28 ++++---- .../src/emqx_rule_runtime.erl | 72 +++++++++---------- .../src/emqx_rule_sqltester.erl | 2 +- .../test/emqx_rule_engine_SUITE.erl | 52 +++++++------- .../test/emqx_rule_engine_api_SUITE.erl | 2 +- 18 files changed, 215 insertions(+), 215 deletions(-) rename apps/emqx_rule_engine/src/{emqx_rule_outputs.erl => emqx_rule_actions.erl} (80%) diff --git a/apps/emqx_bridge/i18n/emqx_bridge_http_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_http_schema.conf index 7e70eda5c..bbe9d1aab 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_http_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_http_schema.conf @@ -52,7 +52,7 @@ HTTP Bridge 的 URL。
en: """ The MQTT topic filter to be forwarded to the HTTP server. All MQTT 'PUBLISH' messages with the topic matching the local_topic will be forwarded.
-NOTE: if this bridge is used as the output of a rule (EMQX rule engine), and also local_topic is +NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that match local_topic will be forwarded. """ diff --git a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf index 7690596e7..ae4c2ad1d 100644 --- a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf +++ b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf @@ -350,7 +350,7 @@ Ingress 模式定义了这个 bridge 如何从远程 MQTT broker 接收消息, en: """ The egress config defines how this bridge forwards messages from the local broker to the remote broker.
Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded. +NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches local_topic will be forwarded. """ zh: """ Egress 模式定义了 bridge 如何将消息从本地 broker 转发到远程 broker。
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index e4eea39db..8b6994dd4 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -296,7 +296,7 @@ egress_desc() -> "The egress config defines how this bridge forwards messages from the local broker to the remote\n" "broker.
\n" "Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.
\n" - "NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic\n" + "NOTE: if this bridge is used as the action of a rule (emqx rule engine), and also local_topic\n" "is configured, then both the data got from the rule and the MQTT messages that matches\n" "local_topic will be forwarded.\n". diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 82d627212..8c2405fde 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -610,7 +610,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> #{ <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>, <<"enable">> => true, - <<"outputs">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], + <<"actions">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> } ), @@ -653,14 +653,14 @@ t_ingress_mqtt_bridge_with_rules(_) -> <<"matched.rate">> := _, <<"matched.rate.max">> := _, <<"matched.rate.last5m">> := _, - <<"outputs.total">> := 1, - <<"outputs.success">> := 1, - <<"outputs.failed">> := 0, - <<"outputs.failed.out_of_service">> := 0, - <<"outputs.failed.unknown">> := 0 + <<"actions.total">> := 1, + <<"actions.success">> := 1, + <<"actions.failed">> := 0, + <<"actions.failed.out_of_service">> := 0, + <<"actions.failed.unknown">> := 0 } } = jsx:decode(Rule1), - %% we also check if the outputs of the rule is triggered + %% we also check if the actions of the rule is triggered ?assertMatch( #{ inspect := #{ @@ -709,7 +709,7 @@ t_egress_mqtt_bridge_with_rules(_) -> #{ <<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>, <<"enable">> => true, - <<"outputs">> => [BridgeIDEgress], + <<"actions">> => [BridgeIDEgress], <<"sql">> => <<"SELECT * from \"t/1\"">> } ), @@ -760,11 +760,11 @@ t_egress_mqtt_bridge_with_rules(_) -> <<"matched.rate">> := _, <<"matched.rate.max">> := _, <<"matched.rate.last5m">> := _, - <<"outputs.total">> := 1, - <<"outputs.success">> := 1, - <<"outputs.failed">> := 0, - <<"outputs.failed.out_of_service">> := 0, - <<"outputs.failed.unknown">> := 0 + <<"actions.total">> := 1, + <<"actions.success">> := 1, + <<"actions.failed">> := 0, + <<"actions.failed.out_of_service">> := 0, + <<"actions.failed.unknown">> := 0 } } = jsx:decode(Rule1), %% we should receive a message on the "remote" broker, with specified topic diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index ad195196e..8281fed21 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -741,7 +741,7 @@ setup_fake_rule_engine_data() -> #{ id => <<"rule:t_get_basic_usage_info:1">>, sql => <<"select 1 from topic">>, - outputs => + actions => [ #{function => <<"erlang:hibernate">>, args => #{}}, #{function => console}, @@ -755,7 +755,7 @@ setup_fake_rule_engine_data() -> #{ id => <<"rule:t_get_basic_usage_info:2">>, sql => <<"select 1 from topic">>, - outputs => + actions => [ <<"mqtt:my_mqtt_bridge">>, <<"http:my_http_bridge">> @@ -767,7 +767,7 @@ setup_fake_rule_engine_data() -> #{ id => <<"rule:t_get_basic_usage_info:3">>, sql => <<"select 1 from \"$bridges/mqtt:mqtt_in\"">>, - outputs => + actions => [ #{function => console} ] diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index fba24c6fb..cbecb526c 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -8,7 +8,7 @@ rule_engine { # description = "A simple rule that republishs MQTT messages from topic 't/1' to 't/2'" # enable = true # sql = "SELECT * FROM \"t/1\"" - # outputs = [ + # actions = [ # { # function = republish # args = { diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_api_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_api_schema.conf index b43041bbb..88a5313ad 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_api_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_api_schema.conf @@ -451,57 +451,57 @@ emqx_rule_api_schema { } } - metrics_outputs_total { + metrics_actions_total { desc { - en: "How much times the outputs are called by the rule. This value may several times of 'matched', depending on the number of the outputs of the rule." + en: "How much times the actions are called by the rule. This value may several times of 'matched', depending on the number of the actions of the rule." zh: "规则调用输出的次数。 该值可能是“sql.matched”的几倍,具体取决于规则输出的数量。" } label: { - en: "Output Total" + en: "Action Total" zh: "调用输出次数" } } - metrics_outputs_success { + metrics_actions_success { desc { - en: "How much times the rule success to call the outputs." + en: "How much times the rule success to call the actions." zh: "规则成功调用输出的次数。" } label: { - en: "Success Output" + en: "Success Action" zh: "成功调用输出次数" } } - metrics_outputs_failed { + metrics_actions_failed { desc { - en: "How much times the rule failed to call the outputs." + en: "How much times the rule failed to call the actions." zh: "规则调用输出失败的次数。" } label: { - en: "Failed Output" + en: "Failed Action" zh: "调用输出失败次数" } } - metrics_outputs_failed_out_of_service { + metrics_actions_failed_out_of_service { desc { - en: "How much times the rule failed to call outputs due to the output is out of service. For example, a bridge is disabled or stopped." + en: "How much times the rule failed to call actions due to the action is out of service. For example, a bridge is disabled or stopped." zh: "由于输出停止服务而导致规则调用输出失败的次数。 例如,桥接被禁用或停止。" } label: { - en: "Fail Output" + en: "Fail Action" zh: "调用输出失败次数" } } - metrics_outputs_failed_unknown { + metrics_actions_failed_unknown { desc { - en: "How much times the rule failed to call outputs due to to an unknown error." + en: "How much times the rule failed to call actions due to to an unknown error." zh: "由于未知错误,规则调用输出失败的次数。" } label: { - en: "Fail Output" + en: "Fail Action" zh: "调用输出失败次数" } } diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf index ae2b821c2..7f5a81a70 100644 --- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf +++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf @@ -28,21 +28,21 @@ Example: SELECT * FROM "test/topic" WHERE payload.x = 1 } } - rules_outputs { + rules_actions { desc { en: """ -A list of outputs of the rule. -An output can be a string that refers to the channel ID of an EMQX bridge, or an object +A list of actions of the rule. +An action can be a string that refers to the channel ID of an EMQX bridge, or an object that refers to a function. There a some built-in functions like "republish" and "console", and we also support user provided functions in the format: "{module}:{function}". -The outputs in the list are executed sequentially. -This means that if one of the output is executing slowly, all the following outputs will not +The actions in the list are executed sequentially. +This means that if one of the action is executing slowly, all the following actions will not be executed until it returns. -If one of the output crashed, all other outputs come after it will still be executed, in the +If one of the action crashed, all other actions come after it will still be executed, in the original order. -If there's any error when running an output, there will be an error message, and the 'failure' -counter of the function output or the bridge channel will increase. +If there's any error when running an action, there will be an error message, and the 'failure' +counter of the function action or the bridge channel will increase. """ zh: """ 规则的动作列表。 @@ -94,7 +94,7 @@ counter of the function output or the bridge channel will increase. console_function { desc { - en: """Print the outputs to the console""" + en: """Print the actions to the console""" zh: "将输出打印到控制台" } label: { @@ -111,12 +111,12 @@ Where {module} is the Erlang callback module and {function} is the Erlang functi To write your own function, checkout the function console and republish in the source file: -apps/emqx_rule_engine/src/emqx_rule_outputs.erl as an example. +apps/emqx_rule_engine/src/emqx_rule_actions.erl as an example. """ zh: """ 用户提供的函数。 格式应为:'{module}:{function}'。 其中 {module} 是 Erlang 回调模块, {function} 是 Erlang 函数。 -要编写自己的函数,请检查源文件:apps/emqx_rule_engine/src/emqx_rule_outputs.erl 中的示例函数 consolerepublish 。 +要编写自己的函数,请检查源文件:apps/emqx_rule_engine/src/emqx_rule_actions.erl 中的示例函数 consolerepublish 。 """ } label: { @@ -130,11 +130,11 @@ To write your own function, checkout the function console and en: """ The args will be passed as the 3rd argument to module:function/3, checkout the function console and republish in the source file: -apps/emqx_rule_engine/src/emqx_rule_outputs.erl as an example. +apps/emqx_rule_engine/src/emqx_rule_actions.erl as an example. """ zh: """ 用户提供的参数将作为函数 module:function/3 的第三个参数, -请检查源文件:apps/emqx_rule_engine/src/emqx_rule_outputs.erl 中的示例函数 consolerepublish 。 +请检查源文件:apps/emqx_rule_engine/src/emqx_rule_actions.erl 中的示例函数 consolerepublish 。 """ } label: { @@ -272,9 +272,9 @@ of the rule, then the string "undefined" is used. } } - desc_builtin_output_republish { + desc_builtin_action_republish { desc { - en: """Configuration for a built-in output.""" + en: """Configuration for a built-in action.""" zh: """配置重新发布。""" } label: { @@ -283,20 +283,20 @@ of the rule, then the string "undefined" is used. } } - desc_builtin_output_console { + desc_builtin_action_console { desc { - en: """Configuration for a built-in output.""" + en: """Configuration for a built-in action.""" zh: """配置打印到控制台""" } label: { - en: "Output Console Configuration" + en: "Action Console Configuration" zh: "配置打印到控制台" } } desc_user_provided_function { desc { - en: """Configuration for a built-in output.""" + en: """Configuration for a built-in action.""" zh: """配置用户函数""" } label: { @@ -307,7 +307,7 @@ of the rule, then the string "undefined" is used. desc_republish_args { desc { - en: """The arguments of the built-in 'republish' output.One can use variables in the args. + en: """The arguments of the built-in 'republish' action.One can use variables in the args. The variables are selected by the rule. For example, if the rule SQL is defined as following: SELECT clientid, qos, payload FROM "t/1" diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 20c339377..77d371711 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -31,16 +31,16 @@ -type selected_data() :: map(). -type envs() :: map(). --type builtin_output_func() :: republish | console. --type builtin_output_module() :: emqx_rule_outputs. +-type builtin_action_func() :: republish | console. +-type builtin_action_module() :: emqx_rule_actions. -type bridge_channel_id() :: binary(). --type output_fun_args() :: map(). +-type action_fun_args() :: map(). --type output() :: +-type action() :: #{ - mod := builtin_output_module() | module(), - func := builtin_output_func() | atom(), - args => output_fun_args() + mod := builtin_action_module() | module(), + func := builtin_action_func() | atom(), + args => action_fun_args() } | bridge_channel_id(). @@ -49,7 +49,7 @@ id := rule_id(), name := binary(), sql := binary(), - outputs := [output()], + actions := [action()], enable := boolean(), description => binary(), %% epoch in millisecond precision diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl similarity index 80% rename from apps/emqx_rule_engine/src/emqx_rule_outputs.erl rename to apps/emqx_rule_engine/src/emqx_rule_actions.erl index a0d06c978..998fc1a5e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -15,43 +15,43 @@ %%-------------------------------------------------------------------- %% Define the default actions. --module(emqx_rule_outputs). +-module(emqx_rule_actions). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx.hrl"). %% APIs --export([parse_output/1]). +-export([parse_action/1]). -%% callbacks of emqx_rule_output --export([pre_process_output_args/2]). +%% callbacks of emqx_rule_action +-export([pre_process_action_args/2]). -%% output functions +%% action functions -export([ console/3, republish/3 ]). --optional_callbacks([pre_process_output_args/2]). +-optional_callbacks([pre_process_action_args/2]). --callback pre_process_output_args(FuncName :: atom(), output_fun_args()) -> output_fun_args(). +-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args(). %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- -parse_output(#{function := OutputFunc} = Output) -> - {Mod, Func} = parse_output_func(OutputFunc), +parse_action(#{function := ActionFunc} = Action) -> + {Mod, Func} = parse_action_func(ActionFunc), #{ mod => Mod, func => Func, - args => pre_process_args(Mod, Func, maps:get(args, Output, #{})) + args => pre_process_args(Mod, Func, maps:get(args, Action, #{})) }. %%-------------------------------------------------------------------- -%% callbacks of emqx_rule_output +%% callbacks of emqx_rule_action %%-------------------------------------------------------------------- -pre_process_output_args( +pre_process_action_args( republish, #{ topic := Topic, @@ -68,17 +68,17 @@ pre_process_output_args( payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) } }; -pre_process_output_args(_, Args) -> +pre_process_action_args(_, Args) -> Args. %%-------------------------------------------------------------------- -%% output functions +%% action functions %%-------------------------------------------------------------------- -spec console(map(), map(), map()) -> any(). console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) -> ?ULOG( - "[rule output] ~ts~n" - "\tOutput Data: ~p~n" + "[rule action] ~ts~n" + "\tAction Data: ~p~n" "\tEnvs: ~p~n", [RuleId, Selected, Envs] ). @@ -135,36 +135,36 @@ republish( %%-------------------------------------------------------------------- %% internal functions %%-------------------------------------------------------------------- -parse_output_func(OutputFunc) -> - {Mod, Func} = get_output_mod_func(OutputFunc), +parse_action_func(ActionFunc) -> + {Mod, Func} = get_action_mod_func(ActionFunc), assert_function_supported(Mod, Func), {Mod, Func}. -get_output_mod_func(OutputFunc) when is_atom(OutputFunc) -> - {emqx_rule_outputs, OutputFunc}; -get_output_mod_func(OutputFunc) when is_binary(OutputFunc) -> +get_action_mod_func(ActionFunc) when is_atom(ActionFunc) -> + {emqx_rule_actions, ActionFunc}; +get_action_mod_func(ActionFunc) when is_binary(ActionFunc) -> ToAtom = fun(Bin) -> try binary_to_existing_atom(Bin) of Atom -> Atom catch - error:badarg -> error({unknown_output_function, OutputFunc}) + error:badarg -> error({unknown_action_function, ActionFunc}) end end, - case string:split(OutputFunc, ":", all) of - [Func1] -> {emqx_rule_outputs, ToAtom(Func1)}; + case string:split(ActionFunc, ":", all) of + [Func1] -> {emqx_rule_actions, ToAtom(Func1)}; [Mod1, Func1] -> {ToAtom(Mod1), ToAtom(Func1)}; - _ -> error({invalid_output_function, OutputFunc}) + _ -> error({invalid_action_function, ActionFunc}) end. assert_function_supported(Mod, Func) -> case erlang:function_exported(Mod, Func, 3) of true -> ok; - false -> error({output_function_not_supported, Func}) + false -> error({action_function_not_supported, Func}) end. pre_process_args(Mod, Func, Args) -> - case erlang:function_exported(Mod, pre_process_output_args, 2) of - true -> Mod:pre_process_output_args(Func, Args); + case erlang:function_exported(Mod, pre_process_action_args, 2) of + true -> Mod:pre_process_action_args(Func, Args); false -> Args end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index 28f22fd40..5c1ef9cd0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -124,25 +124,25 @@ fields("metrics") -> sc(non_neg_integer(), #{ desc => ?DESC("metrics_sql_failed_unknown") })}, - {"outputs.total", + {"actions.total", sc(non_neg_integer(), #{ - desc => ?DESC("metrics_outputs_total") + desc => ?DESC("metrics_actions_total") })}, - {"outputs.success", + {"actions.success", sc(non_neg_integer(), #{ - desc => ?DESC("metrics_outputs_success") + desc => ?DESC("metrics_actions_success") })}, - {"outputs.failed", + {"actions.failed", sc(non_neg_integer(), #{ - desc => ?DESC("metrics_outputs_failed") + desc => ?DESC("metrics_actions_failed") })}, - {"outputs.failed.out_of_service", + {"actions.failed.out_of_service", sc(non_neg_integer(), #{ - desc => ?DESC("metrics_outputs_failed_out_of_service") + desc => ?DESC("metrics_actions_failed_out_of_service") })}, - {"outputs.failed.unknown", + {"actions.failed.unknown", sc(non_neg_integer(), #{ - desc => ?DESC("metrics_outputs_failed_unknown") + desc => ?DESC("metrics_actions_failed_unknown") })} ]; fields("node_metrics") -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index fc522a59d..fe46b6867 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -88,11 +88,11 @@ 'failed', 'failed.exception', 'failed.no_result', - 'outputs.total', - 'outputs.success', - 'outputs.failed', - 'outputs.failed.out_of_service', - 'outputs.failed.unknown' + 'actions.total', + 'actions.success', + 'actions.failed', + 'actions.failed.out_of_service', + 'actions.failed.unknown' ]). -define(RATE_METRICS, ['matched']). @@ -265,9 +265,9 @@ get_basic_usage_info() -> NumRules = length(EnabledRules), ReferencedBridges = lists:foldl( - fun(#{outputs := Outputs, from := From}, Acc) -> + fun(#{actions := Actions, from := From}, Acc) -> BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From], - BridgeIDs1 = lists:filter(fun is_binary/1, Outputs), + BridgeIDs1 = lists:filter(fun is_binary/1, Actions), tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc) end, #{}, @@ -342,7 +342,7 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%------------------------------------------------------------------------------ -parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, CreatedAt) -> +parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) -> case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> Rule = #{ @@ -352,7 +352,7 @@ parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, Creat updated_at => now_ms(), enable => maps:get(enable, Params, true), sql => Sql, - outputs => parse_outputs(Outputs), + actions => parse_actions(Actions), description => maps:get(description, Params, ""), %% -- calculated fields: from => emqx_rule_sqlparser:select_from(Select), @@ -386,12 +386,12 @@ do_delete_rule(RuleId) -> ok end. -parse_outputs(Outputs) -> - [do_parse_output(Out) || Out <- Outputs]. +parse_actions(Actions) -> + [do_parse_action(Act) || Act <- Actions]. -do_parse_output(Output) when is_map(Output) -> - emqx_rule_outputs:parse_output(Output); -do_parse_output(BridgeChannelId) when is_binary(BridgeChannelId) -> +do_parse_action(Action) when is_map(Action) -> + emqx_rule_actions:parse_action(Action); +do_parse_action(BridgeChannelId) when is_binary(BridgeChannelId) -> BridgeChannelId. get_all_records(Tab) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 3f43c2c47..0f4b4a53d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -65,11 +65,11 @@ end). 'failed' => FAIL, 'failed.exception' => FAIL_EX, 'failed.no_result' => FAIL_NORES, - 'outputs.total' => O_TOTAL, - 'outputs.failed' => O_FAIL, - 'outputs.failed.out_of_service' => O_FAIL_OOS, - 'outputs.failed.unknown' => O_FAIL_UNKNOWN, - 'outputs.success' => O_SUCC, + 'actions.total' => O_TOTAL, + 'actions.failed' => O_FAIL, + 'actions.failed.out_of_service' => O_FAIL_OOS, + 'actions.failed.unknown' => O_FAIL_UNKNOWN, + 'actions.success' => O_SUCC, 'matched.rate' => RATE, 'matched.rate.max' => RATE_MAX, 'matched.rate.last5m' => RATE_5 @@ -96,11 +96,11 @@ end). 'failed' := FAIL, 'failed.exception' := FAIL_EX, 'failed.no_result' := FAIL_NORES, - 'outputs.total' := O_TOTAL, - 'outputs.failed' := O_FAIL, - 'outputs.failed.out_of_service' := O_FAIL_OOS, - 'outputs.failed.unknown' := O_FAIL_UNKNOWN, - 'outputs.success' := O_SUCC, + 'actions.total' := O_TOTAL, + 'actions.failed' := O_FAIL, + 'actions.failed.out_of_service' := O_FAIL_OOS, + 'actions.failed.unknown' := O_FAIL_UNKNOWN, + 'actions.success' := O_SUCC, 'matched.rate' := RATE, 'matched.rate.max' := RATE_MAX, 'matched.rate.last5m' := RATE_5 @@ -362,7 +362,7 @@ format_rule_resp(#{ name := Name, created_at := CreatedAt, from := Topics, - outputs := Output, + actions := Action, sql := SQL, enable := Enable, description := Descr @@ -372,7 +372,7 @@ format_rule_resp(#{ id => Id, name => Name, from => Topics, - outputs => format_output(Output), + actions => format_action(Action), sql => SQL, metrics => aggregate_metrics(NodeMetrics), node_metrics => NodeMetrics, @@ -384,18 +384,18 @@ format_rule_resp(#{ format_datetime(Timestamp, Unit) -> list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])). -format_output(Outputs) -> - [do_format_output(Out) || Out <- Outputs]. +format_action(Actions) -> + [do_format_action(Act) || Act <- Actions]. -do_format_output(#{mod := Mod, func := Func, args := Args}) -> +do_format_action(#{mod := Mod, func := Func, args := Args}) -> #{ function => printable_function_name(Mod, Func), args => maps:remove(preprocessed_tmpl, Args) }; -do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> +do_format_action(BridgeChannelId) when is_binary(BridgeChannelId) -> BridgeChannelId. -printable_function_name(emqx_rule_outputs, Func) -> +printable_function_name(emqx_rule_actions, Func) -> Func; printable_function_name(Mod, Func) -> list_to_binary(lists:concat([Mod, ":", Func])). @@ -411,11 +411,11 @@ get_rule_metrics(Id) -> 'failed' := Failed, 'failed.exception' := FailedEx, 'failed.no_result' := FailedNoRes, - 'outputs.total' := OTotal, - 'outputs.failed' := OFailed, - 'outputs.failed.out_of_service' := OFailedOOS, - 'outputs.failed.unknown' := OFailedUnknown, - 'outputs.success' := OFailedSucc + 'actions.total' := OTotal, + 'actions.failed' := OFailed, + 'actions.failed.out_of_service' := OFailedOOS, + 'actions.failed.unknown' := OFailedUnknown, + 'actions.success' := OFailedSucc }, rate := #{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 1db1382a7..0c26fd2a7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -64,11 +64,11 @@ fields("rules") -> validator => fun ?MODULE:validate_sql/1 } )}, - {"outputs", + {"actions", sc( - hoconsc:array(hoconsc:union(outputs())), + hoconsc:array(hoconsc:union(actions())), #{ - desc => ?DESC("rules_outputs"), + desc => ?DESC("rules_actions"), default => [], example => [ <<"http:my_http_bridge">>, @@ -93,16 +93,16 @@ fields("rules") -> } )} ]; -fields("builtin_output_republish") -> +fields("builtin_action_republish") -> [ {function, sc(republish, #{desc => ?DESC("republish_function")})}, {args, sc(ref("republish_args"), #{default => #{}})} ]; -fields("builtin_output_console") -> +fields("builtin_action_console") -> [ {function, sc(console, #{desc => ?DESC("console_function")})} - %% we may support some args for the console output in the future - %, {args, sc(map(), #{desc => "The arguments of the built-in 'console' output", + %% we may support some args for the console action in the future + %, {args, sc(map(), #{desc => "The arguments of the built-in 'console' action", % default => #{}})} ]; fields("user_provided_function") -> @@ -169,10 +169,10 @@ desc("rule_engine") -> ?DESC("desc_rule_engine"); desc("rules") -> ?DESC("desc_rules"); -desc("builtin_output_republish") -> - ?DESC("desc_builtin_output_republish"); -desc("builtin_output_console") -> - ?DESC("desc_builtin_output_console"); +desc("builtin_action_republish") -> + ?DESC("desc_builtin_action_republish"); +desc("builtin_action_console") -> + ?DESC("desc_builtin_action_console"); desc("user_provided_function") -> ?DESC("desc_user_provided_function"); desc("republish_args") -> @@ -207,11 +207,11 @@ validate_rule_name(Name) -> {error, Reason} end. -outputs() -> +actions() -> [ binary(), - ref("builtin_output_republish"), - ref("builtin_output_console"), + ref("builtin_action_republish"), + ref("builtin_action_console"), ref("user_provided_function") ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index cbb86e259..5d78d7ab8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -122,7 +122,7 @@ do_apply_rule( doeach := DoEach, incase := InCase, conditions := Conditions, - outputs := Outputs + actions := Actions }, Input ) -> @@ -145,7 +145,7 @@ do_apply_rule( _ -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed') end, - {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; + {ok, [handle_action_list(RuleId, Actions, Coll, Input) || Coll <- Collection2]}; false -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} @@ -156,7 +156,7 @@ do_apply_rule( is_foreach := false, fields := Fields, conditions := Conditions, - outputs := Outputs + actions := Actions }, Input ) -> @@ -172,7 +172,7 @@ do_apply_rule( of true -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'), - {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; + {ok, handle_action_list(RuleId, Actions, Selected, Input)}; false -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'), {error, nomatch} @@ -185,24 +185,24 @@ clear_rule_payload() -> select_and_transform(Fields, Input) -> select_and_transform(Fields, Input, #{}). -select_and_transform([], _Input, Output) -> - Output; -select_and_transform(['*' | More], Input, Output) -> - select_and_transform(More, Input, maps:merge(Output, Input)); -select_and_transform([{as, Field, Alias} | More], Input, Output) -> +select_and_transform([], _Input, Action) -> + Action; +select_and_transform(['*' | More], Input, Action) -> + select_and_transform(More, Input, maps:merge(Action, Input)); +select_and_transform([{as, Field, Alias} | More], Input, Action) -> Val = eval(Field, Input), select_and_transform( More, nested_put(Alias, Val, Input), - nested_put(Alias, Val, Output) + nested_put(Alias, Val, Action) ); -select_and_transform([Field | More], Input, Output) -> +select_and_transform([Field | More], Input, Action) -> Val = eval(Field, Input), Key = alias(Field), select_and_transform( More, nested_put(Key, Val, Input), - nested_put(Key, Val, Output) + nested_put(Key, Val, Action) ). %% FOREACH Clause @@ -210,27 +210,27 @@ select_and_transform([Field | More], Input, Output) -> select_and_collect(Fields, Input) -> select_and_collect(Fields, Input, {#{}, {'item', []}}). -select_and_collect([{as, Field, {_, A} = Alias}], Input, {Output, _}) -> +select_and_collect([{as, Field, {_, A} = Alias}], Input, {Action, _}) -> Val = eval(Field, Input), - {nested_put(Alias, Val, Output), {A, ensure_list(Val)}}; -select_and_collect([{as, Field, Alias} | More], Input, {Output, LastKV}) -> + {nested_put(Alias, Val, Action), {A, ensure_list(Val)}}; +select_and_collect([{as, Field, Alias} | More], Input, {Action, LastKV}) -> Val = eval(Field, Input), select_and_collect( More, nested_put(Alias, Val, Input), - {nested_put(Alias, Val, Output), LastKV} + {nested_put(Alias, Val, Action), LastKV} ); -select_and_collect([Field], Input, {Output, _}) -> +select_and_collect([Field], Input, {Action, _}) -> Val = eval(Field, Input), Key = alias(Field), - {nested_put(Key, Val, Output), {'item', ensure_list(Val)}}; -select_and_collect([Field | More], Input, {Output, LastKV}) -> + {nested_put(Key, Val, Action), {'item', ensure_list(Val)}}; +select_and_collect([Field | More], Input, {Action, LastKV}) -> Val = eval(Field, Input), Key = alias(Field), select_and_collect( More, nested_put(Key, Val, Input), - {nested_put(Key, Val, Output), LastKV} + {nested_put(Key, Val, Action), LastKV} ). %% Filter each item got from FOREACH @@ -312,43 +312,43 @@ number(Bin) -> error:badarg -> binary_to_float(Bin) end. -handle_output_list(RuleId, Outputs, Selected, Envs) -> - [handle_output(RuleId, Out, Selected, Envs) || Out <- Outputs]. +handle_action_list(RuleId, Actions, Selected, Envs) -> + [handle_action(RuleId, Act, Selected, Envs) || Act <- Actions]. -handle_output(RuleId, OutId, Selected, Envs) -> - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.total'), +handle_action(RuleId, ActId, Selected, Envs) -> + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), try - Result = do_handle_output(OutId, Selected, Envs), - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.success'), + Result = do_handle_action(ActId, Selected, Envs), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'), Result catch throw:out_of_service -> - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), ok = emqx_metrics_worker:inc( - rule_metrics, RuleId, 'outputs.failed.out_of_service' + rule_metrics, RuleId, 'actions.failed.out_of_service' ), - ?SLOG(warning, #{msg => "out_of_service", output => OutId}); + ?SLOG(warning, #{msg => "out_of_service", action => ActId}); Err:Reason:ST -> - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed'), - ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), + ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'), ?SLOG(error, #{ - msg => "output_failed", - output => OutId, + msg => "action_failed", + action => ActId, exception => Err, reason => Reason, stacktrace => ST }) end. -do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> - ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), +do_handle_action(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> + ?TRACE("BRIDGE", "bridge_action", #{bridge_id => BridgeId}), case emqx_bridge:send_message(BridgeId, Selected) of {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped -> throw(out_of_service); Result -> Result end; -do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> +do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> %% the function can also throw 'out_of_service' Mod:Func(Selected, Envs, Args). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index cc545a96e..8c1d0cb1d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -51,7 +51,7 @@ test_rule(Sql, Select, Context, EventTopics) -> id => RuleId, sql => Sql, from => EventTopics, - outputs => [#{mod => ?MODULE, func => get_selected_data, args => #{}}], + actions => [#{mod => ?MODULE, func => get_selected_data, args => #{}}], enable => true, is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), fields => emqx_rule_sqlparser:select_fields(Select), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index bf274b624..382933e78 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -166,9 +166,9 @@ init_per_testcase(t_events, Config) -> #{ id => <<"rule:t_events">>, sql => SQL, - outputs => [ + actions => [ #{ - function => <<"emqx_rule_engine_SUITE:output_record_triggered_events">>, + function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>, args => #{} } ], @@ -194,7 +194,7 @@ t_create_rule(_Config) -> #{ sql => <<"select * from \"t/a\"">>, id => <<"t_create_rule">>, - outputs => [#{function => console}], + actions => [#{function => console}], description => <<"debug rule">> } ), @@ -256,7 +256,7 @@ t_create_existing_rule(_Config) -> #{ id => <<"an_existing_rule">>, sql => <<"select * from \"t/#\"">>, - outputs => [#{function => console}] + actions => [#{function => console}] } ), {ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>), @@ -554,12 +554,12 @@ t_match_atom_and_binary(_Config) -> "SELECT connected_at as ts, * " "FROM \"$events/client_connected\" " "WHERE username = 'emqx2' ", - Repub = republish_output(<<"t2">>, <<"user:${ts}">>), + Repub = republish_action(<<"t2">>, <<"user:${ts}">>), {ok, TopicRule} = emqx_rule_engine:create_rule( #{ sql => SQL, id => ?TMP_RULEID, - outputs => [Repub] + actions => [Repub] } ), {ok, Client} = emqtt:start_link([{username, <<"emqx1">>}]), @@ -750,8 +750,8 @@ t_sqlselect_001(_Config) -> Sql2 = "SELECT jq('.a|.[]', " "'{\"a\": [{\"b\": 1}, {\"b\": 2}, {\"b\": 3}]}') " - "as jq_output, " - " jq_output[1].b as first_b from \"t/#\" ", + "as jq_action, " + " jq_action[1].b as first_b from \"t/#\" ", ?assertMatch( {ok, #{<<"first_b">> := 1}}, emqx_rule_sqltester:test( @@ -771,12 +771,12 @@ t_sqlselect_01(_Config) -> "SELECT json_decode(payload) as p, payload " "FROM \"t3/#\", \"t1\" " "WHERE p.x = 1", - Repub = republish_output(<<"t2">>), + Repub = republish_action(<<"t2">>), {ok, TopicRule1} = emqx_rule_engine:create_rule( #{ sql => SQL, id => ?TMP_RULEID, - outputs => [Repub] + actions => [Repub] } ), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -817,12 +817,12 @@ t_sqlselect_02(_Config) -> "SELECT * " "FROM \"t3/#\", \"t1\" " "WHERE payload.x = 1", - Repub = republish_output(<<"t2">>), + Repub = republish_action(<<"t2">>), {ok, TopicRule1} = emqx_rule_engine:create_rule( #{ sql => SQL, id => ?TMP_RULEID, - outputs => [Repub] + actions => [Repub] } ), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -863,12 +863,12 @@ t_sqlselect_1(_Config) -> "SELECT json_decode(payload) as p, payload " "FROM \"t1\" " "WHERE p.x = 1 and p.y = 2", - Repub = republish_output(<<"t2">>), + Repub = republish_action(<<"t2">>), {ok, TopicRule} = emqx_rule_engine:create_rule( #{ sql => SQL, id => ?TMP_RULEID, - outputs => [Repub] + actions => [Repub] } ), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -898,12 +898,12 @@ t_sqlselect_1(_Config) -> t_sqlselect_2(_Config) -> %% recursively republish to t2 SQL = "SELECT * FROM \"t2\" ", - Repub = republish_output(<<"t2">>), + Repub = republish_action(<<"t2">>), {ok, TopicRule} = emqx_rule_engine:create_rule( #{ sql => SQL, id => ?TMP_RULEID, - outputs => [Repub] + actions => [Repub] } ), {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), @@ -932,12 +932,12 @@ t_sqlselect_3(_Config) -> "SELECT * " "FROM \"$events/client_connected\" " "WHERE username = 'emqx1'", - Repub = republish_output(<<"t2">>, <<"clientid=${clientid}">>), + Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>), {ok, TopicRule} = emqx_rule_engine:create_rule( #{ sql => SQL, id => ?TMP_RULEID, - outputs => [Repub] + actions => [Repub] } ), {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), @@ -2270,7 +2270,7 @@ t_get_basic_usage_info_1(_Config) -> #{ id => <<"rule:t_get_basic_usage_info:1">>, sql => <<"select 1 from topic">>, - outputs => + actions => [ #{function => <<"erlang:hibernate">>, args => #{}}, #{function => console}, @@ -2284,7 +2284,7 @@ t_get_basic_usage_info_1(_Config) -> #{ id => <<"rule:t_get_basic_usage_info:2">>, sql => <<"select 1 from topic">>, - outputs => + actions => [ <<"mqtt:my_mqtt_bridge">>, <<"http:my_http_bridge">> @@ -2308,9 +2308,9 @@ t_get_basic_usage_info_1(_Config) -> %% Internal helpers %%------------------------------------------------------------------------------ -republish_output(Topic) -> - republish_output(Topic, <<"${payload}">>). -republish_output(Topic, Payload) -> +republish_action(Topic) -> + republish_action(Topic, <<"${payload}">>). +republish_action(Topic, Payload) -> #{ function => republish, args => #{payload => Payload, topic => Topic, qos => 0, retain => false} @@ -2337,13 +2337,13 @@ make_simple_rule(RuleId, SQL, Topics, Ts) when is_binary(RuleId) -> fields => [<<"*">>], is_foreach => false, conditions => {}, - outputs => [#{mod => emqx_rule_outputs, func => console, args => #{}}], + actions => [#{mod => emqx_rule_actions, func => console, args => #{}}], description => <<"simple rule">>, created_at => Ts }. -output_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) -> - ct:pal("applying output_record_triggered_events: ~p", [Data]), +action_record_triggered_events(Data = #{event := EventName}, _Envs, _Args) -> + ct:pal("applying action_record_triggered_events: ~p", [Data]), ets:insert(events_record_tab, {EventName, Data}). verify_event(EventName) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index b0dbefd27..13db82aa9 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -33,7 +33,7 @@ t_crud_rule_api(_Config) -> <<"description">> => <<"A simple rule">>, <<"enable">> => true, <<"id">> => RuleID, - <<"outputs">> => [#{<<"function">> => <<"console">>}], + <<"actions">> => [#{<<"function">> => <<"console">>}], <<"sql">> => <<"SELECT * from \"t/1\"">>, <<"name">> => <<"test_rule">> }, From a157539710065de37fce57d6f00e6bbaa9d5d188 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 25 May 2022 18:56:12 +0800 Subject: [PATCH 2/3] refactor(rule): rename http_bridge -> webhook --- apps/emqx_bridge/etc/emqx_bridge.conf | 4 ++-- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 10 +++++----- ...hema.conf => emqx_bridge_webhook_schema.conf} | 2 +- apps/emqx_bridge/src/emqx_bridge.erl | 2 +- apps/emqx_bridge/src/emqx_bridge_api.erl | 16 +++++++--------- apps/emqx_bridge/src/emqx_bridge_resource.erl | 8 ++++---- apps/emqx_bridge/src/emqx_bridge_schema.erl | 8 ++++---- ...schema.erl => emqx_bridge_webhook_schema.erl} | 6 +++--- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 14 +++++++------- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 10 +++++----- .../emqx_connector/src/emqx_connector_schema.erl | 2 +- apps/emqx_modules/test/emqx_telemetry_SUITE.erl | 6 +++--- .../src/emqx_rule_engine_schema.erl | 2 +- .../test/emqx_rule_engine_SUITE.erl | 6 +++--- 14 files changed, 47 insertions(+), 49 deletions(-) rename apps/emqx_bridge/i18n/{emqx_bridge_http_schema.conf => emqx_bridge_webhook_schema.conf} (99%) rename apps/emqx_bridge/src/{emqx_bridge_http_schema.erl => emqx_bridge_webhook_schema.erl} (96%) diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf index 93442b573..19eea5d93 100644 --- a/apps/emqx_bridge/etc/emqx_bridge.conf +++ b/apps/emqx_bridge/etc/emqx_bridge.conf @@ -28,8 +28,8 @@ # retain = false #} # -## HTTP bridges to an HTTP server -#bridges.http.my_http_bridge { +## WebHook to an HTTP server +#bridges.webhook.my_webhook { # enable = true # direction = egress # ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index 1c2e55ca0..ff7239cef 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -72,14 +72,14 @@ In config files, you can find the corresponding config entry for a connector by } } - bridges_http { + bridges_webhook { desc { - en: """HTTP bridges to an HTTP server.""" - zh: """转发消息到 HTTP 服务器的 HTTP Bridge""" + en: """WebHook to an HTTP server.""" + zh: """转发消息到 HTTP 服务器的 WebHook""" } label: { - en: "HTTP Bridge" - zh: "HTTP Bridge" + en: "WebHook" + zh: "WebHook" } } diff --git a/apps/emqx_bridge/i18n/emqx_bridge_http_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf similarity index 99% rename from apps/emqx_bridge/i18n/emqx_bridge_http_schema.conf rename to apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf index bbe9d1aab..cd2cafd78 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_http_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_webhook_schema.conf @@ -1,4 +1,4 @@ -emqx_bridge_http_schema { +emqx_bridge_webhook_schema { config_enable { desc { diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 6fe9bd6fe..0fac31625 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -263,7 +263,7 @@ get_matched_bridges(Topic) -> (_BName, #{direction := ingress}, Acc1) -> Acc1; (BName, #{direction := egress} = Egress, Acc1) -> - %% HTTP, MySQL bridges only have egress direction + %% WebHook, MySQL bridges only have egress direction get_matched_bridge_id(Egress, Topic, BType, BName, Acc1) end, Acc0, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 5bc313e42..30bd52fc7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -42,8 +42,6 @@ -export([lookup_from_local_node/2]). --define(TYPES, [mqtt, http]). - -define(CONN_TYPES, [mqtt]). -define(TRY_PARSE_ID(ID, EXPR), @@ -148,7 +146,7 @@ param_path_id() -> #{ in => path, required => true, - example => <<"http:my_http_bridge">>, + example => <<"webhook:my_webhook">>, desc => ?DESC("desc_param_path_id") } )}. @@ -158,9 +156,9 @@ bridge_info_array_example(Method) -> bridge_info_examples(Method) -> maps:merge(conn_bridge_examples(Method), #{ - <<"http_bridge">> => #{ - summary => <<"HTTP Bridge">>, - value => info_example(http, awesome, Method) + <<"my_webhook">> => #{ + summary => <<"WebHook">>, + value => info_example(webhook, awesome, Method) } }). @@ -196,7 +194,7 @@ method_example(Type, Direction, Method) when Method == get; Method == post -> SDir = atom_to_list(Direction), SName = case Type of - http -> "my_" ++ SType ++ "_bridge"; + webhook -> "my_" ++ SType; _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge" end, TypeNameExamp = #{ @@ -220,7 +218,7 @@ maybe_with_metrics_example(TypeNameExamp, get) -> maybe_with_metrics_example(TypeNameExamp, _) -> TypeNameExamp. -info_example_basic(http, _) -> +info_example_basic(webhook, _) -> #{ enable => true, url => <<"http://localhost:9901/messages/${topic}">>, @@ -232,7 +230,7 @@ info_example_basic(http, _) -> pool_size => 4, enable_pipelining => true, ssl => #{enable => false}, - local_topic => <<"emqx_http/#">>, + local_topic => <<"emqx_webhook/#">>, method => post, body => <<"${payload}">> }; diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 9d33f6eee..b3f7ec978 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -43,8 +43,8 @@ bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; -bridge_to_resource_type(<<"http">>) -> emqx_connector_http; -bridge_to_resource_type(http) -> emqx_connector_http. +bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; +bridge_to_resource_type(webhook) -> emqx_connector_http. resource_id(BridgeId) when is_binary(BridgeId) -> <<"bridge:", BridgeId/binary>>. @@ -104,7 +104,7 @@ update(Type, Name, {OldConf, Conf}) -> %% - if the connection related configs like `servers` is updated, we should restart/start %% or stop bridges according to the change. %% - if the connection related configs are not update, only non-connection configs like - %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated + %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated %% without restarting the bridge. %% case if_only_to_toggle_enable(OldConf, Conf) of @@ -237,7 +237,7 @@ is_tmp_path(TmpPath, File) -> string:str(str(File), str(TmpPath)) > 0. parse_confs( - http, + webhook, _Name, #{ url := Url, diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 7c54652ef..584c67b85 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -46,7 +46,7 @@ http_schema(Method) -> ?CONN_TYPES ), hoconsc:union([ - ref(emqx_bridge_http_schema, Method) + ref(emqx_bridge_webhook_schema, Method) | Schemas ]). @@ -108,10 +108,10 @@ roots() -> [bridges]. fields(bridges) -> [ - {http, + {webhook, mk( - hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), - #{desc => ?DESC("bridges_http")} + hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")), + #{desc => ?DESC("bridges_webhook")} )} ] ++ [ diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl similarity index 96% rename from apps/emqx_bridge/src/emqx_bridge_http_schema.erl rename to apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl index ff1ab2c05..972ba86bc 100644 --- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_webhook_schema.erl @@ -1,4 +1,4 @@ --module(emqx_bridge_http_schema). +-module(emqx_bridge_webhook_schema). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -81,7 +81,7 @@ fields("get") -> desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for HTTP bridge using `", string:to_upper(Method), "` method."]; + ["Configuration for WebHook using `", string:to_upper(Method), "` method."]; desc(_) -> undefined. @@ -111,7 +111,7 @@ basic_config() -> type_field() -> {type, mk( - http, + webhook, #{ required => true, desc => ?DESC("desc_type") diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index 75fa71414..fb7d1e314 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -54,8 +54,8 @@ end_per_testcase(t_get_basic_usage_info_1, _Config) -> {ok, _} = emqx_bridge:remove(BridgeType, BridgeName) end, [ - {http, <<"basic_usage_info_http">>}, - {http, <<"basic_usage_info_http_disabled">>}, + {webhook, <<"basic_usage_info_webhook">>}, + {webhook, <<"basic_usage_info_webhook_disabled">>}, {mqtt, <<"basic_usage_info_mqtt">>} ] ), @@ -81,7 +81,7 @@ t_get_basic_usage_info_1(_Config) -> #{ num_bridges => 3, count_by_type => #{ - http => 1, + webhook => 1, mqtt => 2 } }, @@ -119,7 +119,7 @@ setup_fake_telemetry_data() -> url => <<"http://localhost:9901/messages/${topic}">>, enable => true, direction => egress, - local_topic => "emqx_http/#", + local_topic => "emqx_webhook/#", method => post, body => <<"${payload}">>, headers => #{}, @@ -129,10 +129,10 @@ setup_fake_telemetry_data() -> #{ <<"bridges">> => #{ - <<"http">> => + <<"webhook">> => #{ - <<"basic_usage_info_http">> => HTTPConfig, - <<"basic_usage_info_http_disabled">> => + <<"basic_usage_info_webhook">> => HTTPConfig, + <<"basic_usage_info_webhook_disabled">> => HTTPConfig#{enable => false} }, <<"mqtt">> => diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 2c5c318d3..c5d8d924c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -23,7 +23,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(CONF_DEFAULT, <<"bridges: {}">>). --define(BRIDGE_TYPE, <<"http">>). +-define(BRIDGE_TYPE, <<"webhook">>). -define(BRIDGE_NAME, <<"test_bridge">>). -define(URL(PORT, PATH), list_to_binary( @@ -37,7 +37,7 @@ <<"type">> => TYPE, <<"name">> => NAME, <<"url">> => URL, - <<"local_topic">> => <<"emqx_http/#">>, + <<"local_topic">> => <<"emqx_webhook/#">>, <<"method">> => <<"post">>, <<"ssl">> => #{<<"enable">> => false}, <<"body">> => <<"${payload}">>, @@ -158,7 +158,7 @@ t_http_crud_apis(_) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - %% then we add a http bridge, using POST + %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), {ok, 201, Bridge} = request( @@ -182,7 +182,7 @@ t_http_crud_apis(_) -> BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, - emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), + emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), ?assert( receive {http_server, received, #{ @@ -254,7 +254,7 @@ t_http_crud_apis(_) -> ), %% send an message to emqx again, check the path has been changed - emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)), + emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), ?assert( receive {http_server, received, #{path := <<"/path2">>}} -> diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index 27f982e74..3f1d4f4aa 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -30,7 +30,7 @@ post_request/0 ]). -%% the config for http bridges do not need connectors +%% the config for webhook bridges do not need connectors -define(CONN_TYPES, [mqtt]). %%====================================================================================== diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 8281fed21..57afc727d 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -745,8 +745,8 @@ setup_fake_rule_engine_data() -> [ #{function => <<"erlang:hibernate">>, args => #{}}, #{function => console}, - <<"http:my_http_bridge">>, - <<"http:my_http_bridge">> + <<"webhook:my_webhook">>, + <<"webhook:my_webhook">> ] } ), @@ -758,7 +758,7 @@ setup_fake_rule_engine_data() -> actions => [ <<"mqtt:my_mqtt_bridge">>, - <<"http:my_http_bridge">> + <<"webhook:my_webhook">> ] } ), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 0c26fd2a7..82697b97f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -71,7 +71,7 @@ fields("rules") -> desc => ?DESC("rules_actions"), default => [], example => [ - <<"http:my_http_bridge">>, + <<"webhook:my_webhook">>, #{ function => republish, args => #{ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 382933e78..21576519c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -2274,8 +2274,8 @@ t_get_basic_usage_info_1(_Config) -> [ #{function => <<"erlang:hibernate">>, args => #{}}, #{function => console}, - <<"http:my_http_bridge">>, - <<"http:my_http_bridge">> + <<"webhook:my_webhook">>, + <<"webhook:my_webhook">> ] } ), @@ -2287,7 +2287,7 @@ t_get_basic_usage_info_1(_Config) -> actions => [ <<"mqtt:my_mqtt_bridge">>, - <<"http:my_http_bridge">> + <<"webhook:my_webhook">> ] } ), From 55fce3347756243dba55b1f0ad3da3b89154127b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 25 May 2022 21:55:28 +0800 Subject: [PATCH 3/3] fix: add telemetry testcases in all/0 --- apps/emqx_modules/test/emqx_telemetry_SUITE.erl | 2 +- apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 57afc727d..42b31ac0d 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -562,7 +562,7 @@ t_rule_engine_and_data_bridge_info(_Config) -> #{ data_bridge => #{ - http => #{num => 1, num_linked_by_rules => 3}, + webhook => #{num => 1, num_linked_by_rules => 3}, mqtt => #{num => 2, num_linked_by_rules => 2} }, num_data_bridges => 3 diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 21576519c..a5834d5df 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -35,6 +35,7 @@ all() -> {group, registry}, {group, runtime}, {group, events}, + {group, telemetry}, {group, bugs} ]. @@ -91,6 +92,10 @@ groups() -> t_sqlparse_invalid_json ]}, {events, [], [t_events]}, + {telemetry, [], [ + t_get_basic_usage_info_0, + t_get_basic_usage_info_1 + ]}, {bugs, [], [ t_sqlparse_payload_as, t_sqlparse_nested_get @@ -2297,7 +2302,7 @@ t_get_basic_usage_info_1(_Config) -> referenced_bridges => #{ mqtt => 1, - http => 3 + webhook => 3 } }, emqx_rule_engine:get_basic_usage_info()