From af295a9b71ec5099a731539566c79fc1c8f5b285 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 24 Sep 2021 19:15:11 +0800 Subject: [PATCH] refactor(rules): remove resources and actions --- .../etc/emqx_rule_engine.conf | 2 +- apps/emqx_rule_engine/include/rule_engine.hrl | 104 +-- .../src/emqx_rule_actions.erl | 208 ----- .../src/emqx_rule_api_schema.erl | 148 ++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 635 +-------------- .../src/emqx_rule_engine_api.erl | 757 ++++++------------ .../src/emqx_rule_engine_app.erl | 8 +- .../src/emqx_rule_engine_sup.erl | 22 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 7 +- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 20 + .../src/emqx_rule_metrics.erl | 174 +--- .../src/emqx_rule_monitor.erl | 126 --- ..._rule_locker.erl => emqx_rule_outputs.erl} | 26 +- .../src/emqx_rule_registry.erl | 297 +------ .../src/emqx_rule_runtime.erl | 120 +-- .../src/emqx_rule_sqlparser.erl | 8 +- .../src/emqx_rule_sqltester.erl | 50 +- .../src/emqx_rule_validator.erl | 195 ----- .../test/emqx_rule_engine_SUITE.erl | 16 +- .../test/emqx_rule_monitor_SUITE.erl | 109 --- .../test/emqx_rule_registry_SUITE.erl | 2 +- .../test/emqx_rule_validator_SUITE.erl | 191 ----- 22 files changed, 604 insertions(+), 2621 deletions(-) delete mode 100644 apps/emqx_rule_engine/src/emqx_rule_actions.erl create mode 100644 apps/emqx_rule_engine/src/emqx_rule_api_schema.erl delete mode 100644 apps/emqx_rule_engine/src/emqx_rule_monitor.erl rename apps/emqx_rule_engine/src/{emqx_rule_locker.erl => emqx_rule_outputs.erl} (65%) delete mode 100644 apps/emqx_rule_engine/src/emqx_rule_validator.erl delete mode 100644 apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl delete mode 100644 apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl diff --git a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf index 22543a977..a4344cda8 100644 --- a/apps/emqx_rule_engine/etc/emqx_rule_engine.conf +++ b/apps/emqx_rule_engine/etc/emqx_rule_engine.conf @@ -1,6 +1,6 @@ ##==================================================================== ## Rule Engine for EMQ X R5.0 ##==================================================================== -emqx_rule_engine { +rule_engine { ignore_sys_message = true } diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 760495f6b..29d21b7cc 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -23,14 +23,6 @@ -type(rule_id() :: binary()). -type(rule_name() :: binary()). --type(resource_id() :: binary()). --type(action_instance_id() :: binary()). - --type(action_name() :: atom()). --type(resource_type_name() :: atom()). - --type(category() :: data_persist| data_forward | offline_msgs | debug | other). - -type(descr() :: #{en := binary(), zh => binary()}). -type(mf() :: {Module::atom(), Fun::atom()}). @@ -38,89 +30,27 @@ -type(hook() :: atom() | 'any'). -type(topic() :: binary()). +-type(bridge_channel_id() :: binary()). --type(resource_status() :: #{ alive := boolean() - , atom() => binary() | atom() | list(binary()|atom()) - }). +-type(rule_info() :: + #{ from := list(topic()) + , to := list(bridge_channel_id() | fun()) + , sql := binary() + , is_foreach := boolean() + , fields := list() + , doeach := term() + , incase := list() + , conditions := tuple() + , enabled := boolean() + , description := binary() + }). -define(descr, #{en => <<>>, zh => <<>>}). --record(action, - { name :: action_name() - , category :: category() - , for :: hook() - , app :: atom() - , types = [] :: list(resource_type_name()) - , module :: module() - , on_create :: mf() - , on_destroy :: maybe(mf()) - , hidden = false :: boolean() - , params_spec :: #{atom() => term()} %% params specs - , title = ?descr :: descr() - , description = ?descr :: descr() - }). - --record(action_instance, - { id :: action_instance_id() - , name :: action_name() - , fallbacks :: list(#action_instance{}) - , args :: #{binary() => term()} %% the args got from API for initializing action_instance - }). - -record(rule, { id :: rule_id() - , for :: list(topic()) - , rawsql :: binary() - , is_foreach :: boolean() - , fields :: list() - , doeach :: term() - , incase :: list() - , conditions :: tuple() - , on_action_failed :: continue | stop - , actions :: list(#action_instance{}) - , enabled :: boolean() , created_at :: integer() %% epoch in millisecond precision - , description :: binary() - , state = normal :: atom() - }). - --record(resource, - { id :: resource_id() - , type :: resource_type_name() - , config :: #{} %% the configs got from API for initializing resource - , created_at :: integer() | undefined %% epoch in millisecond precision - , description :: binary() - }). - --record(resource_type, - { name :: resource_type_name() - , provider :: atom() - , params_spec :: #{atom() => term()} %% params specs - , on_create :: mf() - , on_status :: mf() - , on_destroy :: mf() - , title = ?descr :: descr() - , description = ?descr :: descr() - }). - --record(rule_hooks, - { hook :: atom() - , rule_id :: rule_id() - }). - --record(resource_params, - { id :: resource_id() - , params :: #{} %% the params got after initializing the resource - , status = #{is_alive => false} :: #{is_alive := boolean(), atom() => term()} - }). - --record(action_instance_params, - { id :: action_instance_id() - %% the params got after initializing the action - , params :: #{} - %% the Func/Bindings got after initializing the action - , apply :: fun((Data::map(), Envs::map()) -> any()) - | #{mod := module(), bindings := #{atom() => term()}} + , info :: rule_info() }). %% Arithmetic operators @@ -157,9 +87,3 @@ %% Tables -define(RULE_TAB, emqx_rule). --define(ACTION_TAB, emqx_rule_action). --define(ACTION_INST_PARAMS_TAB, emqx_action_instance_params). --define(RES_TAB, emqx_resource). --define(RES_PARAMS_TAB, emqx_resource_params). --define(RULE_HOOKS, emqx_rule_hooks). --define(RES_TYPE_TAB, emqx_resource_type). diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl deleted file mode 100644 index 7ac45633c..000000000 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ /dev/null @@ -1,208 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% Define the default actions. --module(emqx_rule_actions). - --include("rule_engine.hrl"). --include("rule_actions.hrl"). --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). - --define(REPUBLISH_PARAMS_SPEC, #{ - target_topic => #{ - order => 1, - type => string, - required => true, - default => <<"repub/to/${clientid}">>, - title => #{en => <<"Target Topic">>, - zh => <<"目的主题"/utf8>>}, - description => #{en => <<"To which topic the message will be republished">>, - zh => <<"重新发布消息到哪个主题"/utf8>>} - }, - target_qos => #{ - order => 2, - type => number, - enum => [-1, 0, 1, 2], - required => true, - default => 0, - title => #{en => <<"Target QoS">>, - zh => <<"目的 QoS"/utf8>>}, - description => #{en => <<"The QoS Level to be uses when republishing the message. Set to -1 to use the original QoS">>, - zh => <<"重新发布消息时用的 QoS 级别, 设置为 -1 以使用原消息中的 QoS"/utf8>>} - }, - payload_tmpl => #{ - order => 3, - type => string, - input => textarea, - required => false, - default => <<"${payload}">>, - title => #{en => <<"Payload Template">>, - zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported">>, - zh => <<"消息内容模板,支持变量"/utf8>>} - } - }). - --rule_action(#{name => inspect, - category => debug, - for => '$any', - types => [], - create => on_action_create_inspect, - params => #{}, - title => #{en => <<"Inspect (debug)">>, - zh => <<"检查 (调试)"/utf8>>}, - description => #{en => <<"Inspect the details of action params for debug purpose">>, - zh => <<"检查动作参数 (用以调试)"/utf8>>} - }). - --rule_action(#{name => republish, - category => data_forward, - for => '$any', - types => [], - create => on_action_create_republish, - params => ?REPUBLISH_PARAMS_SPEC, - title => #{en => <<"Republish">>, - zh => <<"消息重新发布"/utf8>>}, - description => #{en => <<"Republish a MQTT message to another topic">>, - zh => <<"重新发布消息到另一个主题"/utf8>>} - }). - --rule_action(#{name => do_nothing, - category => debug, - for => '$any', - types => [], - create => on_action_create_do_nothing, - params => #{}, - title => #{en => <<"Do Nothing (debug)">>, - zh => <<"空动作 (调试)"/utf8>>}, - description => #{en => <<"This action does nothing and never fails. It's for debug purpose">>, - zh => <<"此动作什么都不做,并且不会失败 (用以调试)"/utf8>>} - }). - --export([on_resource_create/2]). - -%% callbacks for rule engine --export([ on_action_create_inspect/2 - , on_action_create_republish/2 - , on_action_create_do_nothing/2 - ]). - --export([ on_action_inspect/2 - , on_action_republish/2 - , on_action_do_nothing/2 - ]). - --spec(on_resource_create(binary(), map()) -> map()). -on_resource_create(_Name, Conf) -> - Conf. - -%%------------------------------------------------------------------------------ -%% Action 'inspect' -%%------------------------------------------------------------------------------ --spec on_action_create_inspect(Id :: action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. -on_action_create_inspect(Id, Params) -> - Params. - --spec on_action_inspect(selected_data(), env_vars()) -> any(). -on_action_inspect(Selected, Envs) -> - ?ULOG("[inspect]~n" - "\tSelected Data: ~p~n" - "\tEnvs: ~p~n" - "\tAction Init Params: ~p~n", [Selected, Envs, ?bound_v('Params', Envs)]), - emqx_rule_metrics:inc_actions_success(?bound_v('Id', Envs)). - - -%%------------------------------------------------------------------------------ -%% Action 'republish' -%%------------------------------------------------------------------------------ --spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. -on_action_create_republish(Id, Params = #{ - <<"target_topic">> := TargetTopic, - <<"target_qos">> := TargetQoS, - <<"payload_tmpl">> := PayloadTmpl - }) -> - TopicTks = emqx_plugin_libs_rule:preproc_tmpl(TargetTopic), - PayloadTks = emqx_plugin_libs_rule:preproc_tmpl(PayloadTmpl), - Params. - --spec on_action_republish(selected_data(), env_vars()) -> any(). -on_action_republish(_Selected, Envs = #{ - topic := Topic, - headers := #{republish_by := ActId}, - ?BINDING_KEYS := #{'Id' := ActId} - }) -> - ?LOG(error, "[republish] recursively republish detected, msg topic: ~p, target topic: ~p", - [Topic, ?bound_v('TargetTopic', Envs)]), - emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)); - -on_action_republish(Selected, _Envs = #{ - qos := QoS, flags := Flags, timestamp := Timestamp, - ?BINDING_KEYS := #{ - 'Id' := ActId, - 'TargetTopic' := TargetTopic, - 'TargetQoS' := TargetQoS, - 'TopicTks' := TopicTks, - 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> QoS; true -> TargetQoS end, - from = ActId, - flags = Flags, - headers = #{republish_by => ActId}, - topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), - payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), - timestamp = Timestamp - }); - -%% in case this is not a "message.publish" request -on_action_republish(Selected, _Envs = #{ - ?BINDING_KEYS := #{ - 'Id' := ActId, - 'TargetTopic' := TargetTopic, - 'TargetQoS' := TargetQoS, - 'TopicTks' := TopicTks, - 'PayloadTks' := PayloadTks - }}) -> - ?LOG(debug, "[republish] republish to: ~p, Payload: ~p", - [TargetTopic, Selected]), - increase_and_publish(ActId, - #message{ - id = emqx_guid:gen(), - qos = if TargetQoS =:= -1 -> 0; true -> TargetQoS end, - from = ActId, - flags = #{dup => false, retain => false}, - headers = #{republish_by => ActId}, - topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected), - payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTks, Selected), - timestamp = erlang:system_time(millisecond) - }). - -increase_and_publish(ActId, Msg) -> - _ = emqx_broker:safe_publish(Msg), - emqx_rule_metrics:inc_actions_success(ActId), - emqx_metrics:inc_msg(Msg). - --spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. -on_action_create_do_nothing(ActId, Params) when is_binary(ActId) -> - Params. - -on_action_do_nothing(Selected, Envs) when is_map(Selected) -> - emqx_rule_metrics:inc_actions_success(?bound_v('ActId', Envs)). diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl new file mode 100644 index 000000000..051624b4a --- /dev/null +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -0,0 +1,148 @@ +-module(emqx_rule_api_schema). + +-behaviour(hocon_schema). + +-include_lib("typerefl/include/types.hrl"). + +-export([ check_params/2 + ]). + +-export([roots/0, fields/1]). + +-type tag() :: rule_creation | rule_test. + +-spec check_params(map(), tag()) -> {ok, map()} | {error, term()}. +check_params(Params, Tag) -> + BTag = atom_to_binary(Tag), + try hocon_schema:check_plain(?MODULE, #{BTag => Params}, + #{atom_key => true, nullable => true}, [BTag]) of + #{Tag := Checked} -> {ok, Checked} + catch + Error:Reason:ST -> + logger:error("check rule params failed: ~p", [{Error, Reason, ST}]), + {error, {Reason, ST}} + end. + +%%====================================================================================== +%% Hocon Schema Definitions + +roots() -> + [ {"rule_creation", sc(ref("rule_creation"), #{})} + , {"rule_test", sc(ref("rule_test"), #{})} + ]. + +fields("rule_creation") -> + [ {"id", sc(binary(), #{desc => "The Id of the rule", nullable => false})} + , {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})} + , {"outputs", sc(hoconsc:array(binary()), + #{desc => "The outputs of the rule", default => [<<"console">>]})} + , {"enable", sc(boolean(), #{desc => "Enable or disable the rule", default => true})} + , {"description", sc(binary(), #{desc => "The description of the rule", default => <<>>})} + ]; + +fields("rule_test") -> + [ {"context", sc(hoconsc:union([ ref("ctx_pub") + , ref("ctx_sub") + , ref("ctx_delivered") + , ref("ctx_acked") + , ref("ctx_dropped") + , ref("ctx_connected") + , ref("ctx_disconnected") + ]), + #{desc => "The context of the event for testing", + default => #{}})} + , {"sql", sc(binary(), #{desc => "The SQL of the rule for testing", nullable => false})} + ]; + +fields("ctx_pub") -> + [ {"event_type", sc(message_publish, #{desc => "Event Type", nullable => false})} + , {"id", sc(binary(), #{desc => "Message ID"})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"payload", sc(binary(), #{desc => "The Message Payload"})} + , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})} + , {"topic", sc(binary(), #{desc => "Message Topic"})} + , {"publish_received_at", sc(integer(), #{ + desc => "The Time that this Message is Received"})} + ] ++ [qos()]; + +fields("ctx_sub") -> + [ {"event_type", sc(session_subscribed, #{desc => "Event Type", nullable => false})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"payload", sc(binary(), #{desc => "The Message Payload"})} + , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})} + , {"topic", sc(binary(), #{desc => "Message Topic"})} + , {"publish_received_at", sc(integer(), #{ + desc => "The Time that this Message is Received"})} + ] ++ [qos()]; + +fields("ctx_unsub") -> + [{"event_type", sc(session_unsubscribed, #{desc => "Event Type", nullable => false})}] ++ + proplists:delete("event_type", fields("ctx_sub")); + +fields("ctx_delivered") -> + [ {"event_type", sc(message_delivered, #{desc => "Event Type", nullable => false})} + , {"id", sc(binary(), #{desc => "Message ID"})} + , {"from_clientid", sc(binary(), #{desc => "The Client ID"})} + , {"from_username", sc(binary(), #{desc => "The User Name"})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"payload", sc(binary(), #{desc => "The Message Payload"})} + , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})} + , {"topic", sc(binary(), #{desc => "Message Topic"})} + , {"publish_received_at", sc(integer(), #{ + desc => "The Time that this Message is Received"})} + ] ++ [qos()]; + +fields("ctx_acked") -> + [{"event_type", sc(message_acked, #{desc => "Event Type", nullable => false})}] ++ + proplists:delete("event_type", fields("ctx_delivered")); + +fields("ctx_dropped") -> + [ {"event_type", sc(message_dropped, #{desc => "Event Type", nullable => false})} + , {"id", sc(binary(), #{desc => "Message ID"})} + , {"reason", sc(binary(), #{desc => "The Reason for Dropping"})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"payload", sc(binary(), #{desc => "The Message Payload"})} + , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})} + , {"topic", sc(binary(), #{desc => "Message Topic"})} + , {"publish_received_at", sc(integer(), #{ + desc => "The Time that this Message is Received"})} + ] ++ [qos()]; + +fields("ctx_connected") -> + [ {"event_type", sc(client_connected, #{desc => "Event Type", nullable => false})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"mountpoint", sc(binary(), #{desc => "The Mountpoint"})} + , {"peername", sc(binary(), #{desc => "The IP Address and Port of the Peer Client"})} + , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})} + , {"proto_name", sc(binary(), #{desc => "Protocol Name"})} + , {"proto_ver", sc(binary(), #{desc => "Protocol Version"})} + , {"keepalive", sc(integer(), #{desc => "KeepAlive"})} + , {"clean_start", sc(boolean(), #{desc => "Clean Start", default => true})} + , {"expiry_interval", sc(integer(), #{desc => "Expiry Interval"})} + , {"is_bridge", sc(boolean(), #{desc => "Is Bridge", default => false})} + , {"connected_at", sc(integer(), #{ + desc => "The Time that this Client is Connected"})} + ]; + +fields("ctx_disconnected") -> + [ {"event_type", sc(client_disconnected, #{desc => "Event Type", nullable => false})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"reason", sc(binary(), #{desc => "The Reason for Disconnect"})} + , {"peername", sc(binary(), #{desc => "The IP Address and Port of the Peer Client"})} + , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})} + , {"disconnected_at", sc(integer(), #{ + desc => "The Time that this Client is Disconnected"})} + ]. + +qos() -> + {"qos", sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]), + #{desc => "The Message QoS"})}. + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). +ref(Field) -> hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index bf0eb06e8..3775b5e4d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -19,627 +19,72 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). --export([ load_providers/0 - , unload_providers/0 - , refresh_resources/0 - , refresh_resource/1 - , refresh_rule/1 - , refresh_rules/0 - , refresh_actions/1 - , refresh_actions/2 - , refresh_resource_status/0 - ]). - -export([ create_rule/1 , update_rule/1 , delete_rule/1 - , create_resource/1 - , test_resource/1 - , start_resource/1 - , get_resource_status/1 - , get_resource_params/1 - , delete_resource/1 - , update_resource/2 ]). --export([ init_resource/4 - , init_action/4 - , clear_resource/3 - , clear_rule/1 - , clear_actions/1 - , clear_action/3 - ]). +-export_type([rule/0]). -type(rule() :: #rule{}). --type(action() :: #action{}). --type(resource() :: #resource{}). --type(resource_type() :: #resource_type{}). --type(resource_params() :: #resource_params{}). --type(action_instance_params() :: #action_instance_params{}). - --export_type([ rule/0 - , action/0 - , resource/0 - , resource_type/0 - , resource_params/0 - , action_instance_params/0 - ]). -define(T_RETRY, 60000). -%%------------------------------------------------------------------------------ -%% Load resource/action providers from all available applications -%%------------------------------------------------------------------------------ - -%% Load all providers . --spec(load_providers() -> ok). -load_providers() -> - lists:foreach(fun(App) -> - load_provider(App) - end, ignore_lib_apps(application:loaded_applications())). - --spec(load_provider(App :: atom()) -> ok). -load_provider(App) when is_atom(App) -> - ok = load_actions(App), - ok = load_resource_types(App). - -%%------------------------------------------------------------------------------ -%% Unload providers -%%------------------------------------------------------------------------------ -%% Load all providers . --spec(unload_providers() -> ok). -unload_providers() -> - lists:foreach(fun(App) -> - unload_provider(App) - end, ignore_lib_apps(application:loaded_applications())). - -%% @doc Unload a provider. --spec(unload_provider(App :: atom()) -> ok). -unload_provider(App) -> - ok = emqx_rule_registry:remove_actions_of(App), - ok = emqx_rule_registry:unregister_resource_types_of(App). - -load_actions(App) -> - Actions = find_actions(App), - emqx_rule_registry:add_actions(Actions). - -load_resource_types(App) -> - ResourceTypes = find_resource_types(App), - emqx_rule_registry:register_resource_types(ResourceTypes). - --spec(find_actions(App :: atom()) -> list(action())). -find_actions(App) -> - lists:map(fun new_action/1, find_attrs(App, rule_action)). - --spec(find_resource_types(App :: atom()) -> list(resource_type())). -find_resource_types(App) -> - lists:map(fun new_resource_type/1, find_attrs(App, resource_type)). - -new_action({App, Mod, #{name := Name, - for := Hook, - types := Types, - create := Create, - params := ParamsSpec} = Params}) -> - ok = emqx_rule_validator:validate_spec(ParamsSpec), - #action{name = Name, for = Hook, app = App, types = Types, - category = maps:get(category, Params, other), - module = Mod, on_create = Create, - hidden = maps:get(hidden, Params, false), - on_destroy = maps:get(destroy, Params, undefined), - params_spec = ParamsSpec, - title = maps:get(title, Params, ?descr), - description = maps:get(description, Params, ?descr)}. - -new_resource_type({App, Mod, #{name := Name, - params := ParamsSpec, - create := Create} = Params}) -> - ok = emqx_rule_validator:validate_spec(ParamsSpec), - #resource_type{name = Name, provider = App, - params_spec = ParamsSpec, - on_create = {Mod, Create}, - on_status = {Mod, maps:get(status, Params, undefined)}, - on_destroy = {Mod, maps:get(destroy, Params, undefined)}, - title = maps:get(title, Params, ?descr), - description = maps:get(description, Params, ?descr)}. - -find_attrs(App, Def) -> - [{App, Mod, Attr} || {ok, Modules} <- [application:get_key(App, modules)], - Mod <- Modules, - {Name, Attrs} <- module_attributes(Mod), Name =:= Def, - Attr <- Attrs]. - -module_attributes(Module) -> - try Module:module_info(attributes) - catch - error:undef -> [] - end. - %%------------------------------------------------------------------------------ %% APIs for rules and resources %%------------------------------------------------------------------------------ --dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]). -spec create_rule(map()) -> {ok, rule()} | {error, term()}. -create_rule(Params = #{rawsql := Sql, actions := ActArgs}) -> - case emqx_rule_sqlparser:parse_select(Sql) of - {ok, Select} -> - RuleId = maps:get(id, Params, rule_id()), - Enabled = maps:get(enabled, Params, true), - try prepare_actions(ActArgs, Enabled) of - Actions -> - Rule = #rule{ - id = RuleId, - rawsql = Sql, - for = emqx_rule_sqlparser:select_from(Select), - is_foreach = emqx_rule_sqlparser:select_is_foreach(Select), - fields = emqx_rule_sqlparser:select_fields(Select), - doeach = emqx_rule_sqlparser:select_doeach(Select), - incase = emqx_rule_sqlparser:select_incase(Select), - conditions = emqx_rule_sqlparser:select_where(Select), - on_action_failed = maps:get(on_action_failed, Params, continue), - actions = Actions, - enabled = Enabled, - created_at = erlang:system_time(millisecond), - description = maps:get(description, Params, ""), - state = normal - }, - ok = emqx_rule_registry:add_rule(Rule), - ok = emqx_rule_metrics:create_rule_metrics(RuleId), - {ok, Rule} - catch - throw:{action_not_found, ActionName} -> - {error, {action_not_found, ActionName}}; - throw:Reason -> - {error, Reason} - end; - Reason -> {error, Reason} - end. - --spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}). -update_rule(Params = #{id := RuleId}) -> +create_rule(Params = #{id := RuleId}) -> case emqx_rule_registry:get_rule(RuleId) of - {ok, Rule0} -> - try may_update_rule_params(Rule0, Params) of - Rule -> - ok = emqx_rule_registry:add_rule(Rule), - {ok, Rule} - catch - throw:Reason -> - {error, Reason} - end; - not_found -> - {error, {not_found, RuleId}} + not_found -> do_create_rule(Params); + {ok, _} -> {error, {already_exists, RuleId}} end. --spec(delete_rule(RuleId :: rule_id()) -> ok). +-spec update_rule(map()) -> {ok, rule()} | {error, term()}. +update_rule(Params = #{id := RuleId}) -> + case delete_rule(RuleId) of + ok -> do_create_rule(Params); + Error -> Error + end. + +-spec(delete_rule(RuleId :: rule_id()) -> ok | {error, term()}). delete_rule(RuleId) -> case emqx_rule_registry:get_rule(RuleId) of - {ok, Rule = #rule{actions = Actions}} -> - try - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_rule, [Rule]), - ok = emqx_rule_registry:remove_rule(Rule) - catch - Error:Reason:ST -> - ?LOG(error, "clear_rule ~p failed: ~p", [RuleId, {Error, Reason, ST}]), - refresh_actions(Actions) - end; - not_found -> - ok - end. - --spec(create_resource(#{type := _, config := _, _ => _}) -> {ok, resource()} | {error, Reason :: term()}). -create_resource(#{type := Type, config := Config0} = Params) -> - case emqx_rule_registry:find_resource_type(Type) of - {ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} -> - Config = emqx_rule_validator:validate_params(Config0, ParamSpec), - ResId = maps:get(id, Params, resource_id()), - Resource = #resource{id = ResId, - type = Type, - config = Config, - description = iolist_to_binary(maps:get(description, Params, "")), - created_at = erlang:system_time(millisecond) - }, - ok = emqx_rule_registry:add_resource(Resource), - %% Note that we will return OK in case of resource creation failure, - %% A timer is started to re-start the resource later. - catch _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [M, F, ResId, Config]), - {ok, Resource}; - not_found -> - {error, {resource_type_not_found, Type}} - end. - --spec(update_resource(resource_id(), map()) -> ok | {error, Reason :: term()}). -update_resource(ResId, NewParams) -> - case emqx_rule_registry:find_enabled_rules_depends_on_resource(ResId) of - [] -> check_and_update_resource(ResId, NewParams); - Rules -> - {error, {dependent_rules_exists, [Id || #rule{id = Id} <- Rules]}} - end. - -check_and_update_resource(Id, NewParams) -> - case emqx_rule_registry:find_resource(Id) of - {ok, #resource{id = Id, type = Type, config = OldConfig, description = OldDescr}} -> - try - Conifg = maps:get(<<"config">>, NewParams, OldConfig), - Descr = maps:get(<<"description">>, NewParams, OldDescr), - do_check_and_update_resource(#{id => Id, config => Conifg, type => Type, - description => Descr}) - catch Error:Reason:ST -> - ?LOG(error, "check_and_update_resource failed: ~0p", [{Error, Reason, ST}]), - {error, Reason} - end; - _Other -> - {error, not_found} - end. - -do_check_and_update_resource(#{id := Id, type := Type, description := NewDescription, - config := NewConfig}) -> - case emqx_rule_registry:find_resource_type(Type) of - {ok, #resource_type{on_create = {Module, Create}, - params_spec = ParamSpec}} -> - Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), - case test_resource(#{type => Type, config => NewConfig}) of - ok -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [Module, Create, Id, Config]), - emqx_rule_registry:add_resource(#resource{ - id = Id, - type = Type, - config = Config, - description = NewDescription, - created_at = erlang:system_time(millisecond) - }), - ok; - {error, Reason} -> - error({error, Reason}) - end - end. - --spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}). -start_resource(ResId) -> - case emqx_rule_registry:find_resource(ResId) of - {ok, #resource{type = ResType, config = Config}} -> - {ok, #resource_type{on_create = {Mod, Create}}} - = emqx_rule_registry:find_resource_type(ResType), - try - init_resource(Mod, Create, ResId, Config), - refresh_actions_of_a_resource(ResId) - catch - throw:Reason -> {error, Reason} - end; - not_found -> - {error, {resource_not_found, ResId}} - end. - --spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). -test_resource(#{type := Type, config := Config0}) -> - case emqx_rule_registry:find_resource_type(Type) of - {ok, #resource_type{on_create = {ModC, Create}, - on_destroy = {ModD, Destroy}, - params_spec = ParamSpec}} -> - Config = emqx_rule_validator:validate_params(Config0, ParamSpec), - ResId = resource_id(), - try - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_resource, [ModC, Create, ResId, Config]), - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), - ok - catch - throw:Reason -> {error, Reason} - end; - not_found -> - {error, {resource_type_not_found, Type}} - end. - --spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). -get_resource_status(ResId) -> - case emqx_rule_registry:find_resource(ResId) of - {ok, #resource{type = ResType}} -> - {ok, #resource_type{on_status = {Mod, OnStatus}}} - = emqx_rule_registry:find_resource_type(ResType), - Status = fetch_resource_status(Mod, OnStatus, ResId), - {ok, Status}; - not_found -> - {error, {resource_not_found, ResId}} - end. - --spec(get_resource_params(resource_id()) -> {ok, map()} | {error, Reason :: term()}). -get_resource_params(ResId) -> - case emqx_rule_registry:find_resource_params(ResId) of - {ok, #resource_params{params = Params}} -> - {ok, Params}; - not_found -> - {error, resource_not_initialized} - end. - --spec(delete_resource(resource_id()) -> ok | {error, Reason :: term()}). -delete_resource(ResId) -> - case emqx_rule_registry:find_resource(ResId) of - {ok, #resource{type = ResType}} -> - {ok, #resource_type{on_destroy = {ModD, Destroy}}} - = emqx_rule_registry:find_resource_type(ResType), - try - case emqx_rule_registry:remove_resource(ResId) of - ok -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_resource, [ModD, Destroy, ResId]), - ok; - {error, _} = R -> R - end - catch - throw:Reason -> {error, Reason} - end; + {ok, Rule} -> + ok = emqx_rule_registry:remove_rule(Rule), + _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, clear_rule_metrics, [RuleId]), + ok; not_found -> {error, not_found} end. -%%------------------------------------------------------------------------------ -%% Re-establish resources -%%------------------------------------------------------------------------------ - --spec(refresh_resources() -> ok). -refresh_resources() -> - lists:foreach(fun refresh_resource/1, - emqx_rule_registry:get_resources()). - -refresh_resource(Type) when is_atom(Type) -> - lists:foreach(fun refresh_resource/1, - emqx_rule_registry:get_resources_by_type(Type)); - -refresh_resource(#resource{id = ResId}) -> - emqx_rule_monitor:ensure_resource_retrier(ResId, ?T_RETRY). - --spec(refresh_rules() -> ok). -refresh_rules() -> - lists:foreach(fun - (#rule{enabled = true} = Rule) -> - try refresh_rule(Rule) - catch _:_ -> - emqx_rule_registry:add_rule(Rule#rule{enabled = false, state = refresh_failed_at_bootup}) - end; - (_) -> ok - end, emqx_rule_registry:get_rules()). - -refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) -> - ok = emqx_rule_metrics:create_rule_metrics(RuleId), - lists:foreach(fun emqx_rule_events:load/1, Topics), - refresh_actions(Actions). - --spec(refresh_resource_status() -> ok). -refresh_resource_status() -> - lists:foreach( - fun(#resource{id = ResId, type = ResType}) -> - case emqx_rule_registry:find_resource_type(ResType) of - {ok, #resource_type{on_status = {Mod, OnStatus}}} -> - _ = fetch_resource_status(Mod, OnStatus, ResId); - _ -> ok - end - end, emqx_rule_registry:get_resources()). - %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ -prepare_actions(Actions, NeedInit) -> - [prepare_action(Action, NeedInit) || Action <- Actions]. - -prepare_action(#{name := Name, args := Args0} = Action, NeedInit) -> - case emqx_rule_registry:find_action(Name) of - {ok, #action{module = Mod, on_create = Create, params_spec = ParamSpec}} -> - Args = emqx_rule_validator:validate_params(Args0, ParamSpec), - ActionInstId = maps:get(id, Action, action_instance_id(Name)), - case NeedInit of - true -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, ActionInstId, - with_resource_params(Args)]), - ok; - false -> ok - end, - #action_instance{ - id = ActionInstId, name = Name, args = Args, - fallbacks = prepare_actions(maps:get(fallbacks, Action, []), NeedInit) - }; - not_found -> - throw({action_not_found, Name}) - end. - -with_resource_params(Args = #{<<"$resource">> := ResId}) -> - case emqx_rule_registry:find_resource_params(ResId) of - {ok, #resource_params{params = Params}} -> - maps:merge(Args, Params); - not_found -> - throw({resource_not_initialized, ResId}) - end; -with_resource_params(Args) -> Args. - --dialyzer([{nowarn_function, may_update_rule_params/2}]). -may_update_rule_params(Rule, Params = #{rawsql := SQL}) -> - case emqx_rule_sqlparser:parse_select(SQL) of +do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> + case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> - may_update_rule_params( - Rule#rule{ - rawsql = SQL, - for = emqx_rule_sqlparser:select_from(Select), - is_foreach = emqx_rule_sqlparser:select_is_foreach(Select), - fields = emqx_rule_sqlparser:select_fields(Select), - doeach = emqx_rule_sqlparser:select_doeach(Select), - incase = emqx_rule_sqlparser:select_incase(Select), - conditions = emqx_rule_sqlparser:select_where(Select) - }, - maps:remove(rawsql, Params)); - Reason -> throw(Reason) - end; -may_update_rule_params(Rule = #rule{enabled = OldEnb, actions = Actions, state = OldState}, - Params = #{enabled := NewEnb}) -> - State = case {OldEnb, NewEnb} of - {false, true} -> - refresh_rule(Rule), - force_changed; - {true, false} -> - clear_actions(Actions), - force_changed; - _NoChange -> OldState - end, - may_update_rule_params(Rule#rule{enabled = NewEnb, state = State}, maps:remove(enabled, Params)); -may_update_rule_params(Rule, Params = #{description := Descr}) -> - may_update_rule_params(Rule#rule{description = Descr}, maps:remove(description, Params)); -may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) -> - may_update_rule_params(Rule#rule{on_action_failed = OnFailed}, - maps:remove(on_action_failed, Params)); -may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) -> - %% prepare new actions before removing old ones - NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)), - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, clear_actions, [OldActions]), - may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params)); -may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params - Rule. - -ignore_lib_apps(Apps) -> - LibApps = [kernel, stdlib, sasl, appmon, eldap, erts, - syntax_tools, ssl, crypto, mnesia, os_mon, - inets, goldrush, gproc, runtime_tools, - snmp, otp_mibs, public_key, asn1, ssh, hipe, - common_test, observer, webtool, xmerl, tools, - test_server, compiler, debugger, eunit, et, - wx], - [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)]. - -resource_id() -> - gen_id("resource:", fun emqx_rule_registry:find_resource/1). - -rule_id() -> - gen_id("rule:", fun emqx_rule_registry:get_rule/1). - -gen_id(Prefix, TestFun) -> - Id = iolist_to_binary([Prefix, emqx_misc:gen_id()]), - case TestFun(Id) of - not_found -> Id; - _Res -> gen_id(Prefix, TestFun) + Rule = #rule{ + id = RuleId, + created_at = erlang:system_time(millisecond), + info = #{ + enabled => maps:get(enabled, Params, true), + sql => Sql, + from => emqx_rule_sqlparser:select_from(Select), + outputs => Outputs, + description => maps:get(description, Params, ""), + %% -- calculated fields: + is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), + fields => emqx_rule_sqlparser:select_fields(Select), + doeach => emqx_rule_sqlparser:select_doeach(Select), + incase => emqx_rule_sqlparser:select_incase(Select), + conditions => emqx_rule_sqlparser:select_where(Select) + %% -- calculated fields end + } + }, + ok = emqx_rule_registry:add_rule(Rule), + _ = emqx_plugin_libs_rule:cluster_call(emqx_rule_metrics, create_rule_metrics, [RuleId]), + {ok, Rule}; + Reason -> {error, Reason} end. - -action_instance_id(ActionName) -> - iolist_to_binary([atom_to_list(ActionName), "_", integer_to_list(erlang:system_time())]). - -init_resource(Module, OnCreate, ResId, Config) -> - Params = ?RAISE(Module:OnCreate(ResId, Config), - {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}), - ResParams = #resource_params{id = ResId, - params = Params, - status = #{is_alive => true}}, - emqx_rule_registry:add_resource_params(ResParams). - -init_action(Module, OnCreate, ActionInstId, Params) -> - ok = emqx_rule_metrics:create_metrics(ActionInstId), - case ?RAISE(Module:OnCreate(ActionInstId, Params), - {{init_action_failure, node()}, - {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}) of - {Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2 - ok = emqx_rule_registry:add_action_instance_params( - #action_instance_params{id = ActionInstId, params = NewParams, apply = Apply}); - {Bindings, NewParams} when is_list(Bindings) -> - ok = emqx_rule_registry:add_action_instance_params( - #action_instance_params{ - id = ActionInstId, params = NewParams, - apply = #{mod => Module, bindings => maps:from_list(Bindings)}}); - Apply when is_function(Apply) -> %% BACKW: =< e4.2.2 - ok = emqx_rule_registry:add_action_instance_params( - #action_instance_params{id = ActionInstId, params = Params, apply = Apply}) - end. - -clear_resource(_Module, undefined, ResId) -> - ok = emqx_rule_registry:remove_resource_params(ResId); -clear_resource(Module, Destroy, ResId) -> - case emqx_rule_registry:find_resource_params(ResId) of - {ok, #resource_params{params = Params}} -> - ?RAISE(Module:Destroy(ResId, Params), - {{destroy_resource_failure, node()}, {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}), - ok = emqx_rule_registry:remove_resource_params(ResId); - not_found -> - ok - end. - -clear_rule(#rule{id = RuleId, actions = Actions}) -> - clear_actions(Actions), - emqx_rule_metrics:clear_rule_metrics(RuleId), - ok. - -clear_actions(Actions) -> - lists:foreach( - fun(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks}) -> - {ok, #action{module = Mod, on_destroy = Destory}} = emqx_rule_registry:find_action(ActName), - clear_action(Mod, Destory, Id), - clear_actions(Fallbacks) - end, Actions). - -clear_action(_Module, undefined, ActionInstId) -> - emqx_rule_metrics:clear_metrics(ActionInstId), - ok = emqx_rule_registry:remove_action_instance_params(ActionInstId); -clear_action(Module, Destroy, ActionInstId) -> - case erlang:function_exported(Module, Destroy, 2) of - true -> - emqx_rule_metrics:clear_metrics(ActionInstId), - case emqx_rule_registry:get_action_instance_params(ActionInstId) of - {ok, #action_instance_params{params = Params}} -> - ?RAISE(Module:Destroy(ActionInstId, Params),{{destroy_action_failure, node()}, - {{Module, Destroy}, {_EXCLASS_,_EXCPTION_,_ST_}}}), - ok = emqx_rule_registry:remove_action_instance_params(ActionInstId); - not_found -> - ok - end; - false -> ok - end. - -fetch_resource_status(Module, OnStatus, ResId) -> - case emqx_rule_registry:find_resource_params(ResId) of - {ok, ResParams = #resource_params{params = Params, status = #{is_alive := LastIsAlive}}} -> - NewStatus = try - case Module:OnStatus(ResId, Params) of - #{is_alive := LastIsAlive} = Status -> Status; - #{is_alive := true} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:deactivate(Name), - Status; - #{is_alive := false} = Status -> - {ok, Type} = find_type(ResId), - Name = alarm_name_of_resource_down(Type, ResId), - emqx_alarm:activate(Name, #{id => ResId, type => Type}), - Status - end - catch _Error:Reason:STrace -> - ?LOG(error, "get resource status for ~p failed: ~0p", [ResId, {Reason, STrace}]), - #{is_alive => false} - end, - emqx_rule_registry:add_resource_params(ResParams#resource_params{status = NewStatus}), - NewStatus; - not_found -> - #{is_alive => false} - end. - -refresh_actions_of_a_resource(ResId) -> - R = fun (#action_instance{args = #{<<"$resource">> := ResId0}}) - when ResId0 =:= ResId -> true; - (_) -> false - end, - F = fun(#rule{actions = Actions}) -> refresh_actions(Actions, R) end, - lists:foreach(F, emqx_rule_registry:get_rules()). - -refresh_actions(Actions) -> - refresh_actions(Actions, fun(_) -> true end). -refresh_actions(Actions, Pred) -> - lists:foreach( - fun(#action_instance{args = Args, - id = Id, name = ActName, - fallbacks = Fallbacks} = ActionInst) -> - case Pred(ActionInst) of - true -> - {ok, #action{module = Mod, on_create = Create}} - = emqx_rule_registry:find_action(ActName), - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, init_action, [Mod, Create, Id, with_resource_params(Args)]), - refresh_actions(Fallbacks, Pred); - false -> ok - end - end, Actions). - -find_type(ResId) -> - {ok, #resource{type = Type}} = emqx_rule_registry:find_resource(ResId), - {ok, Type}. - -alarm_name_of_resource_down(Type, ResId) -> - list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 122ae7705..b1cb7b778 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -19,536 +19,293 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-behaviour(minirest_api). --rest_api(#{name => create_rule, - method => 'POST', - path => "/rules/", - func => create_rule, - descr => "Create a rule" - }). +-export([api_spec/0]). --rest_api(#{name => update_rule, - method => 'PUT', - path => "/rules/:bin:id", - func => update_rule, - descr => "Update a rule" - }). - --rest_api(#{name => list_rules, - method => 'GET', - path => "/rules/", - func => list_rules, - descr => "A list of all rules" - }). - --rest_api(#{name => show_rule, - method => 'GET', - path => "/rules/:bin:id", - func => show_rule, - descr => "Show a rule" - }). - --rest_api(#{name => delete_rule, - method => 'DELETE', - path => "/rules/:bin:id", - func => delete_rule, - descr => "Delete a rule" - }). - --rest_api(#{name => list_actions, - method => 'GET', - path => "/actions/", - func => list_actions, - descr => "A list of all actions" - }). - --rest_api(#{name => show_action, - method => 'GET', - path => "/actions/:atom:name", - func => show_action, - descr => "Show an action" - }). - --rest_api(#{name => list_resources, - method => 'GET', - path => "/resources/", - func => list_resources, - descr => "A list of all resources" - }). - --rest_api(#{name => create_resource, - method => 'POST', - path => "/resources/", - func => create_resource, - descr => "Create a resource" - }). - --rest_api(#{name => update_resource, - method => 'PUT', - path => "/resources/:bin:id", - func => update_resource, - descr => "Update a resource" - }). - --rest_api(#{name => show_resource, - method => 'GET', - path => "/resources/:bin:id", - func => show_resource, - descr => "Show a resource" - }). - --rest_api(#{name => get_resource_status, - method => 'GET', - path => "/resource_status/:bin:id", - func => get_resource_status, - descr => "Get status of a resource" - }). - --rest_api(#{name => start_resource, - method => 'POST', - path => "/resources/:bin:id", - func => start_resource, - descr => "Start a resource" - }). - --rest_api(#{name => delete_resource, - method => 'DELETE', - path => "/resources/:bin:id", - func => delete_resource, - descr => "Delete a resource" - }). - --rest_api(#{name => list_resource_types, - method => 'GET', - path => "/resource_types/", - func => list_resource_types, - descr => "List all resource types" - }). - --rest_api(#{name => show_resource_type, - method => 'GET', - path => "/resource_types/:atom:name", - func => show_resource_type, - descr => "Show a resource type" - }). - --rest_api(#{name => list_resources_by_type, - method => 'GET', - path => "/resource_types/:atom:type/resources", - func => list_resources_by_type, - descr => "List all resources of a resource type" - }). - --rest_api(#{name => list_events, - method => 'GET', - path => "/rule_events/", - func => list_events, - descr => "List all events with detailed info" - }). - --export([ create_rule/2 - , update_rule/2 - , list_rules/2 - , show_rule/2 - , delete_rule/2 +-export([ crud_rules/2 + , list_events/2 + , crud_rules_by_id/2 + , rule_test/2 ]). --export([ list_actions/2 - , show_action/2 - ]). - --export([ create_resource/2 - , list_resources/2 - , show_resource/2 - , get_resource_status/2 - , start_resource/2 - , delete_resource/2 - , update_resource/2 - ]). - --export([ list_resource_types/2 - , list_resources_by_type/2 - , show_resource_type/2 - ]). - --export([list_events/2]). - -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))). --define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))). --define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))). --define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))). --define(ERR_DEP_RULES_EXISTS(RULEIDS), list_to_binary(io_lib:format("Found rules ~0p depends on this resource, disable them first", [(RULEIDS)]))). -define(ERR_BADARGS(REASON), begin - R0 = list_to_binary(io_lib:format("~0p", [REASON])), + R0 = err_msg(REASON), <<"Bad Arguments: ", R0/binary>> end). +-define(CHECK_PARAMS(PARAMS, TAG, EXPR), + case emqx_rule_api_schema:check_params(PARAMS, TAG) of + {ok, CheckedParams} -> + EXPR; + {error, REASON} -> + {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(REASON)}} + end). --dialyzer({nowarn_function, [create_rule/2, - test_rule_sql/1, - do_create_rule/1, - update_rule/2 - ]}). +api_spec() -> + { + [ api_rules_list_create() + , api_rules_crud() + , api_rule_test() + , api_events_list() + ], + [] + }. + +api_rules_list_create() -> + Metadata = #{ + get => #{ + description => <<"List all rules">>, + responses => #{ + <<"200">> => + emqx_mgmt_util:array_schema(resp_schema(), <<"List rules successfully">>)}}, + post => #{ + description => <<"Create a new rule using given Id to all nodes in the cluster">>, + requestBody => emqx_mgmt_util:schema(post_req_schema(), <<"Rule parameters">>), + responses => #{ + <<"400">> => + emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), + <<"201">> => + emqx_mgmt_util:schema(resp_schema(), <<"Create rule successfully">>)}} + }, + {"/rules", Metadata, crud_rules}. + +api_events_list() -> + Metadata = #{ + get => #{ + description => <<"List all events can be used in rules">>, + responses => #{ + <<"200">> => + emqx_mgmt_util:array_schema(resp_schema(), <<"List events successfully">>)}} + }, + {"/rule_events", Metadata, list_events}. + +api_rules_crud() -> + Metadata = #{ + get => #{ + description => <<"Get a rule by given Id">>, + parameters => [param_path_id()], + responses => #{ + <<"404">> => + emqx_mgmt_util:error_schema(<<"Rule not found">>, ['NOT_FOUND']), + <<"200">> => + emqx_mgmt_util:schema(resp_schema(), <<"Get rule successfully">>)}}, + put => #{ + description => <<"Create or update a rule by given Id to all nodes in the cluster">>, + parameters => [param_path_id()], + requestBody => emqx_mgmt_util:schema(put_req_schema(), <<"Rule parameters">>), + responses => #{ + <<"400">> => + emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), + <<"200">> => + emqx_mgmt_util:schema(resp_schema(), <<"Create or update rule successfully">>)}}, + delete => #{ + description => <<"Delete a rule by given Id from all nodes in the cluster">>, + parameters => [param_path_id()], + responses => #{ + <<"200">> => + emqx_mgmt_util:schema(<<"Delete rule successfully">>)}} + }, + {"/rules/:id", Metadata, crud_rules_by_id}. + +api_rule_test() -> + Metadata = #{ + post => #{ + description => <<"Test a rule">>, + requestBody => emqx_mgmt_util:schema(rule_test_req_schema(), <<"Rule parameters">>), + responses => #{ + <<"400">> => + emqx_mgmt_util:error_schema(<<"Invalid Parameters">>, ['BAD_ARGS']), + <<"412">> => + emqx_mgmt_util:error_schema(<<"SQL Not Match">>, ['NOT_MATCH']), + <<"200">> => + emqx_mgmt_util:schema(rule_test_resp_schema(), <<"Rule Test Pass">>)}} + }, + {"/rule_test", Metadata, rule_test}. + +put_req_schema() -> + #{type => object, + properties => #{ + sql => #{ + description => <<"The SQL">>, + type => string, + example => <<"SELECT * from \"t/1\"">> + }, + enable => #{ + description => <<"Enable or disable the rule">>, + type => boolean, + example => true + }, + outputs => #{ + description => <<"The outputs of the rule">>, + type => array, + items => #{ + type => string, + example => <<"console">> + } + }, + description => #{ + description => <<"The description for the rule">>, + type => string, + example => <<"A simple rule that handles MQTT messages from topic \"t/1\"">> + } + } + }. + +post_req_schema() -> + Req = #{properties := Prop} = put_req_schema(), + Req#{properties => Prop#{ + id => #{ + description => <<"The Id for the rule">>, + example => <<"my_rule">>, + type => string + } + }}. + +resp_schema() -> + Req = #{properties := Prop} = put_req_schema(), + Req#{properties => Prop#{ + id => #{ + description => <<"The Id for the rule">>, + type => string + }, + created_at => #{ + description => <<"The time that this rule was created, in rfc3339 format">>, + type => string, + example => <<"2021-09-18T13:57:29+08:00">> + } + }}. + +rule_test_req_schema() -> + #{type => object, properties => #{ + sql => #{ + description => <<"The SQL">>, + type => string, + example => <<"SELECT * from \"t/1\"">> + }, + context => #{ + type => object, + properties => #{ + event_type => #{ + description => <<"Event Type">>, + type => string, + enum => ["message_publish", "message_acked", "message_delivered", + "message_dropped", "session_subscribed", "session_unsubscribed", + "client_connected", "client_disconnected"], + example => <<"message_publish">> + }, + clientid => #{ + description => <<"The Client ID">>, + type => string, + example => <<"\"c_emqx\"">> + }, + topic => #{ + description => <<"The Topic">>, + type => string, + example => <<"t/1">> + } + } + } + }}. + +rule_test_resp_schema() -> + #{type => object}. + +param_path_id() -> + #{ + name => id, + in => path, + schema => #{type => string}, + required => true + }. %%------------------------------------------------------------------------------ %% Rules API %%------------------------------------------------------------------------------ -create_rule(_Bindings, Params) -> - if_test(fun() -> test_rule_sql(Params) end, - fun() -> do_create_rule(Params) end, - Params). - -test_rule_sql(Params) -> - case emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of - {ok, Result} -> return({ok, Result}); - {error, nomatch} -> return({error, 404, <<"SQL Not Match">>}); - {error, Reason} -> - ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -do_create_rule(Params) -> - case emqx_rule_engine:create_rule(parse_rule_params(Params)) of - {ok, Rule} -> return({ok, record_to_map(Rule)}); - {error, {action_not_found, ActionName}} -> - return({error, 400, ?ERR_NO_ACTION(ActionName)}); - {error, Reason} -> - ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -update_rule(#{id := Id}, Params) -> - case emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of - {ok, Rule} -> return({ok, record_to_map(Rule)}); - {error, {not_found, RuleId}} -> - return({error, 400, ?ERR_NO_RULE(RuleId)}); - {error, Reason} -> - ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -list_rules(_Bindings, _Params) -> - return_all(emqx_rule_registry:get_rules_ordered_by_ts()). - -show_rule(#{id := Id}, _Params) -> - reply_with(fun emqx_rule_registry:get_rule/1, Id). - -delete_rule(#{id := Id}, _Params) -> - ok = emqx_rule_engine:delete_rule(Id), - return(ok). - -%%------------------------------------------------------------------------------ -%% Actions API -%%------------------------------------------------------------------------------ - -list_actions(#{}, _Params) -> - return_all( - sort_by_title(action, - emqx_rule_registry:get_actions())). - -show_action(#{name := Name}, _Params) -> - reply_with(fun emqx_rule_registry:find_action/1, Name). - -%%------------------------------------------------------------------------------ -%% Resources API -%%------------------------------------------------------------------------------ -create_resource(#{}, Params) -> - case parse_resource_params(Params) of - {ok, ParsedParams} -> - if_test(fun() -> do_create_resource(test_resource, ParsedParams) end, - fun() -> do_create_resource(create_resource, ParsedParams) end, - Params); - {error, Reason} -> - ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -do_create_resource(Create, ParsedParams) -> - case emqx_rule_engine:Create(ParsedParams) of - ok -> - return(ok); - {ok, Resource} -> - return({ok, record_to_map(Resource)}); - {error, {resource_type_not_found, Type}} -> - return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)}); - {error, {init_resource, _}} -> - return({error, 500, <<"Init resource failure!">>}); - {error, Reason} -> - ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -list_resources(#{}, _Params) -> - Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), - Data = lists:map(fun(Res = #{id := Id}) -> - Status = lists:all(fun(Node) -> - case rpc:call(Node, emqx_rule_registry, find_resource_params, [Id]) of - {ok, #resource_params{status = #{is_alive := true}}} -> true; - _ -> false - end - end, ekka_mnesia:running_nodes()), - maps:put(status, Status, Res) - end, Data0), - return({ok, Data}). - -list_resources_by_type(#{type := Type}, _Params) -> - return_all(emqx_rule_registry:get_resources_by_type(Type)). - -show_resource(#{id := Id}, _Params) -> - case emqx_rule_registry:find_resource(Id) of - {ok, R} -> - Status = - [begin - {ok, St} = rpc:call(Node, emqx_rule_engine, get_resource_status, [Id]), - maps:put(node, Node, St) - end || Node <- ekka_mnesia:running_nodes()], - return({ok, maps:put(status, Status, record_to_map(R))}); - not_found -> - return({error, 404, <<"Not Found">>}) - end. - -get_resource_status(#{id := Id}, _Params) -> - case emqx_rule_engine:get_resource_status(Id) of - {ok, Status} -> - return({ok, Status}); - {error, {resource_not_found, ResId}} -> - return({error, 400, ?ERR_NO_RESOURCE(ResId)}) - end. - -start_resource(#{id := Id}, _Params) -> - case emqx_rule_engine:start_resource(Id) of - ok -> - return(ok); - {error, {resource_not_found, ResId}} -> - return({error, 400, ?ERR_NO_RESOURCE(ResId)}); - {error, Reason} -> - ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -update_resource(#{id := Id}, NewParams) -> - P1 = case proplists:get_value(<<"description">>, NewParams) of - undefined -> #{}; - Value -> #{<<"description">> => Value} - end, - P2 = case proplists:get_value(<<"config">>, NewParams) of - undefined -> #{}; - [{}] -> #{}; - Config -> #{<<"config">> => ?RAISE(json_term_to_map(Config), {invalid_config, Config})} - end, - case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of - ok -> - return(ok); - {error, not_found} -> - return({error, 400, <<"Resource not found:", Id/binary>>}); - {error, {init_resource, _}} -> - return({error, 500, <<"Init resource failure:", Id/binary>>}); - {error, {dependent_rules_exists, RuleIds}} -> - return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)}); - {error, Reason} -> - ?LOG(error, "Resource update failed: ~0p", [Reason]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -delete_resource(#{id := Id}, _Params) -> - case emqx_rule_engine:delete_resource(Id) of - ok -> return(ok); - {error, not_found} -> return(ok); - {error, {dependent_rules_exists, RuleIds}} -> - return({error, 400, ?ERR_DEP_RULES_EXISTS(RuleIds)}); - {error, Reason} -> - return({error, 400, ?ERR_BADARGS(Reason)}) - end. - -%%------------------------------------------------------------------------------ -%% Resource Types API -%%------------------------------------------------------------------------------ - -list_resource_types(#{}, _Params) -> - return_all( - sort_by_title(resource_type, - emqx_rule_registry:get_resource_types())). - -show_resource_type(#{name := Name}, _Params) -> - reply_with(fun emqx_rule_registry:find_resource_type/1, Name). - - -%%------------------------------------------------------------------------------ -%% Events API -%%------------------------------------------------------------------------------ list_events(#{}, _Params) -> - return({ok, emqx_rule_events:event_info()}). + {200, emqx_rule_events:event_info()}. + +crud_rules(get, _Params) -> + Records = emqx_rule_registry:get_rules_ordered_by_ts(), + {200, format_rule_resp(Records)}; + +crud_rules(post, #{body := Params}) -> + ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:create_rule(CheckedParams) of + {ok, Rule} -> {201, format_rule_resp(Rule)}; + {error, Reason} -> + ?LOG(error, "create rule failed: ~0p", [Reason]), + {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + end). + +rule_test(post, #{body := Params}) -> + ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of + {ok, Result} -> {200, Result}; + {error, nomatch} -> {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}}; + {error, Reason} -> + ?LOG(error, "rule test failed: ~0p", [Reason]), + {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + end). + +crud_rules_by_id(get, #{bindings := #{id := Id}}) -> + case emqx_rule_registry:get_rule(Id) of + {ok, Rule} -> + {200, format_rule_resp(Rule)}; + not_found -> + {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}} + end; + +crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params0}) -> + Params = maps:merge(Params0, #{id => Id}), + ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:update_rule(CheckedParams) of + {ok, Rule} -> {200, format_rule_resp(Rule)}; + {error, not_found} -> + {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}}; + {error, Reason} -> + ?LOG(error, "update rule failed: ~0p", [Reason]), + {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + end); + +crud_rules_by_id(delete, #{bindings := #{id := Id}}) -> + case emqx_rule_engine:delete_rule(Id) of + ok -> {200}; + {error, not_found} -> {200}; + {error, Reason} -> + ?LOG(error, "delete rule failed: ~0p", [Reason]), + {500, #{code => 'UNKNOW_ERROR', message => err_msg(Reason)}} + end. %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ +err_msg(Msg) -> + list_to_binary(io_lib:format("~0p", [Msg])). -if_test(True, False, Params) -> - case proplists:get_value(<<"test">>, Params) of - Test when Test =:= true; Test =:= <<"true">> -> - True(); - _ -> - False() - end. -return_all(Records) -> - Data = lists:foldr(fun maybe_record_to_map/2, [], Records), - return({ok, Data}). +format_rule_resp(Rules) when is_list(Rules) -> + [format_rule_resp(R) || R <- Rules]; -maybe_record_to_map(Rec, Acc) -> - case record_to_map(Rec) of - ignore -> Acc; - Map -> [Map | Acc] - end. - -reply_with(Find, Key) -> - case Find(Key) of - {ok, R} -> - return({ok, record_to_map(R)}); - not_found -> - return({error, 404, <<"Not Found">>}) - end. - -record_to_map(#rule{id = Id, - for = Hook, - rawsql = RawSQL, - actions = Actions, - on_action_failed = OnFailed, - enabled = Enabled, - description = Descr}) -> +format_rule_resp(#rule{id = Id, created_at = CreatedAt, + info = #{ + from := Topics, + outputs := Output, + sql := SQL, + enabled := Enabled, + description := Descr}}) -> #{id => Id, - for => Hook, - rawsql => RawSQL, - actions => printable_actions(Actions), - on_action_failed => OnFailed, + from => Topics, + outputs => Output, + sql => SQL, metrics => get_rule_metrics(Id), enabled => Enabled, - description => Descr - }; - -record_to_map(#action{hidden = true}) -> - ignore; -record_to_map(#action{name = Name, - category = Category, - app = App, - for = Hook, - types = Types, - params_spec = Params, - title = Title, - description = Descr}) -> - #{name => Name, - category => Category, - app => App, - for => Hook, - types => Types, - params => Params, - title => Title, - description => Descr - }; - -record_to_map(#resource{id = Id, - type = Type, - config = Config, - description = Descr}) -> - #{id => Id, - type => Type, - config => Config, - description => Descr - }; - -record_to_map(#resource_type{name = Name, - provider = Provider, - params_spec = Params, - title = Title, - description = Descr}) -> - #{name => Name, - provider => Provider, - params => Params, - title => Title, + created_at => format_datetime(CreatedAt, millisecond), description => Descr }. -printable_actions(Actions) -> - [#{id => Id, name => Name, params => Args, - metrics => get_action_metrics(Id), - fallbacks => printable_actions(Fallbacks)} - || #action_instance{id = Id, name = Name, args = Args, fallbacks = Fallbacks} <- Actions]. - -parse_rule_params(Params) -> - parse_rule_params(Params, #{description => <<"">>}). -parse_rule_params([], Rule) -> - Rule; -parse_rule_params([{<<"id">>, Id} | Params], Rule) -> - parse_rule_params(Params, Rule#{id => Id}); -parse_rule_params([{<<"rawsql">>, RawSQL} | Params], Rule) -> - parse_rule_params(Params, Rule#{rawsql => RawSQL}); -parse_rule_params([{<<"enabled">>, Enabled} | Params], Rule) -> - parse_rule_params(Params, Rule#{enabled => enabled(Enabled)}); -parse_rule_params([{<<"on_action_failed">>, OnFailed} | Params], Rule) -> - parse_rule_params(Params, Rule#{on_action_failed => on_failed(OnFailed)}); -parse_rule_params([{<<"actions">>, Actions} | Params], Rule) -> - parse_rule_params(Params, Rule#{actions => parse_actions(Actions)}); -parse_rule_params([{<<"description">>, Descr} | Params], Rule) -> - parse_rule_params(Params, Rule#{description => Descr}); -parse_rule_params([_ | Params], Rule) -> - parse_rule_params(Params, Rule). - -on_failed(<<"continue">>) -> continue; -on_failed(<<"stop">>) -> stop; -on_failed(OnFailed) -> error({invalid_on_failed, OnFailed}). - -enabled(Enabled) when is_boolean(Enabled) -> Enabled; -enabled(Enabled) -> error({invalid_enabled, Enabled}). - -parse_actions(Actions) -> - [parse_action(json_term_to_map(A)) || A <- Actions]. - -parse_action(Action) -> - #{name => binary_to_existing_atom(maps:get(<<"name">>, Action), utf8), - args => maps:get(<<"params">>, Action, #{}), - fallbacks => parse_actions(maps:get(<<"fallbacks">>, Action, []))}. - -parse_resource_params(Params) -> - parse_resource_params(Params, #{config => #{}, description => <<"">>}). -parse_resource_params([], Res) -> - {ok, Res}; -parse_resource_params([{<<"id">>, Id} | Params], Res) -> - parse_resource_params(Params, Res#{id => Id}); -parse_resource_params([{<<"type">>, ResourceType} | Params], Res) -> - try parse_resource_params(Params, Res#{type => binary_to_existing_atom(ResourceType, utf8)}) - catch error:badarg -> - {error, {resource_type_not_found, ResourceType}} - end; -parse_resource_params([{<<"config">>, Config} | Params], Res) -> - parse_resource_params(Params, Res#{config => json_term_to_map(Config)}); -parse_resource_params([{<<"description">>, Descr} | Params], Res) -> - parse_resource_params(Params, Res#{description => Descr}); -parse_resource_params([_ | Params], Res) -> - parse_resource_params(Params, Res). - -json_term_to_map(List) -> - emqx_json:decode(emqx_json:encode(List), [return_maps]). - -sort_by_title(action, Actions) -> - sort_by(#action.title, Actions); -sort_by_title(resource_type, ResourceTypes) -> - sort_by(#resource_type.title, ResourceTypes). - -sort_by(Pos, TplList) -> - lists:sort( - fun(RecA, RecB) -> - maps:get(en, element(Pos, RecA), 0) - =< maps:get(en, element(Pos, RecB), 0) - end, TplList). +format_datetime(Timestamp, Unit) -> + list_to_binary(calendar:system_time_to_rfc3339(Timestamp, [{unit, Unit}])). get_rule_metrics(Id) -> [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) || Node <- ekka_mnesia:running_nodes()]. - -get_action_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id])) - || Node <- ekka_mnesia:running_nodes()]. - -%% TODO: V5 API -return(_) -> ok. \ No newline at end of file diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 5893f9827..052f916b3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -23,13 +23,7 @@ -export([stop/1]). start(_Type, _Args) -> - {ok, Sup} = emqx_rule_engine_sup:start_link(), - _ = emqx_rule_engine_sup:start_locker(), - ok = emqx_rule_engine:load_providers(), - ok = emqx_rule_engine:refresh_resources(), - ok = emqx_rule_engine:refresh_rules(), - ok = emqx_rule_engine_cli:load(), - {ok, Sup}. + emqx_rule_engine_sup:start_link(). stop(_State) -> ok = emqx_rule_events:unload(), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 9ff5ce741..4fad54a4b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -22,17 +22,12 @@ -export([start_link/0]). --export([start_locker/0]). - -export([init/1]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Opts = [public, named_table, set, {read_concurrency, true}], - _ = ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]), - _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), Registry = #{id => emqx_rule_registry, start => {emqx_rule_registry, start_link, []}, restart => permanent, @@ -45,19 +40,4 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_metrics]}, - Monitor = #{id => emqx_rule_monitor, - start => {emqx_rule_monitor, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_rule_monitor]}, - {ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}. - -start_locker() -> - Locker = #{id => emqx_rule_locker, - start => {emqx_rule_locker, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_rule_locker]}, - supervisor:start_child(?MODULE, Locker). + {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index a0960df25..151af84f0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -79,10 +79,9 @@ unload(Topic) -> %%-------------------------------------------------------------------- on_message_publish(Message = #message{topic = Topic}, _Env) -> case ignore_sys_message(Message) of - true -> - ok; + true -> ok; false -> - case emqx_rule_registry:get_rules_for(Topic) of + case emqx_rule_registry:get_rules_for_topic(Topic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message)) end @@ -297,7 +296,7 @@ with_basic_columns(EventName, Data) when is_map(Data) -> %%-------------------------------------------------------------------- apply_event(EventName, GenEventMsg, _Env) -> EventTopic = event_topic(EventName), - case emqx_rule_registry:get_rules_for(EventTopic) of + case emqx_rule_registry:get_rules_for_topic(EventTopic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index f73858f32..3f60d97c9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -17,6 +17,8 @@ -module(emqx_rule_funcs). -include("rule_engine.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). %% IoT Funcs -export([ msgid/0 @@ -36,6 +38,8 @@ , contains_topic_match/2 , contains_topic_match/3 , null/0 + , republish/3 + , republish/4 ]). %% Arithmetic Funcs @@ -305,6 +309,22 @@ find_topic_filter(Filter, TopicFilters, Func) -> null() -> undefined. +republish(Topic, Payload, Qos) -> + republish(Topic, Payload, Qos, false). + +republish(Topic, Payload, Qos, Retain) -> + Msg = #message{ + id = emqx_guid:gen(), + qos = Qos, + from = republish_function, + flags = #{retain => Retain}, + headers = #{}, + topic = Topic, + payload = Payload, + timestamp = erlang:system_time(millisecond) + }, + emqx_broker:safe_publish(Msg). + %%------------------------------------------------------------------------------ %% Arithmetic Funcs %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index 8db444a7c..874514a03 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -26,42 +26,21 @@ ]). -export([ get_rules_matched/1 - , get_actions_taken/1 - , get_actions_success/1 - , get_actions_error/1 - , get_actions_exception/1 - , get_actions_retry/1 ]). -export([ inc_rules_matched/1 , inc_rules_matched/2 - , inc_actions_taken/1 - , inc_actions_taken/2 - , inc_actions_success/1 - , inc_actions_success/2 - , inc_actions_error/1 - , inc_actions_error/2 - , inc_actions_exception/1 - , inc_actions_exception/2 - , inc_actions_retry/1 - , inc_actions_retry/2 ]). -export([ inc/2 , inc/3 , get/2 - , get_overall/1 , get_rule_speed/1 - , get_overall_rule_speed/0 , create_rule_metrics/1 - , create_metrics/1 , clear_rule_metrics/1 - , clear_metrics/1 - , overall_metrics/0 ]). -export([ get_rule_metrics/1 - , get_action_metrics/1 ]). %% gen_server callbacks @@ -82,7 +61,7 @@ -define(SAMPLING, 1). -endif. --define(CRefID(ID), {?MODULE, ID}). +-define(CntrRef, ?MODULE). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). -record(rule_speed, { @@ -99,48 +78,32 @@ -record(state, { metric_ids = sets:new(), - rule_speeds :: undefined | #{rule_id() => #rule_speed{}}, - overall_rule_speed :: #rule_speed{} + rule_speeds :: undefined | #{rule_id() => #rule_speed{}} }). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ + -spec(create_rule_metrics(rule_id()) -> ok). create_rule_metrics(Id) -> gen_server:call(?MODULE, {create_rule_metrics, Id}). --spec(create_metrics(rule_id()) -> ok). -create_metrics(Id) -> - gen_server:call(?MODULE, {create_metrics, Id}). - -spec(clear_rule_metrics(rule_id()) -> ok). clear_rule_metrics(Id) -> gen_server:call(?MODULE, {delete_rule_metrics, Id}). --spec(clear_metrics(rule_id()) -> ok). -clear_metrics(Id) -> - gen_server:call(?MODULE, {delete_metrics, Id}). - -spec(get(rule_id(), atom()) -> number()). get(Id, Metric) -> - case couters_ref(Id) of + case get_couters_ref(Id) of not_found -> 0; Ref -> counters:get(Ref, metrics_idx(Metric)) end. --spec(get_overall(atom()) -> number()). -get_overall(Metric) -> - emqx_metrics:val(Metric). - -spec(get_rule_speed(rule_id()) -> map()). get_rule_speed(Id) -> gen_server:call(?MODULE, {get_rule_speed, Id}). --spec(get_overall_rule_speed() -> map()). -get_overall_rule_speed() -> - gen_server:call(?MODULE, get_overall_rule_speed). - -spec(get_rule_metrics(rule_id()) -> map()). get_rule_metrics(Id) -> #{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id), @@ -150,95 +113,39 @@ get_rule_metrics(Id) -> speed_last5m => Last5M }. --spec(get_action_metrics(action_instance_id()) -> map()). -get_action_metrics(Id) -> - #{success => get_actions_success(Id), - failed => get_actions_error(Id) + get_actions_exception(Id), - taken => get_actions_taken(Id) - }. - -spec inc(rule_id(), atom()) -> ok. inc(Id, Metric) -> inc(Id, Metric, 1). -spec inc(rule_id(), atom(), pos_integer()) -> ok. inc(Id, Metric, Val) -> - case couters_ref(Id) of + case get_couters_ref(Id) of not_found -> %% this may occur when increasing a counter for %% a rule that was created from a remove node. - case atom_to_list(Metric) of - "rules." ++ _ -> create_rule_metrics(Id); - _ -> create_metrics(Id) - end, - counters:add(couters_ref(Id), metrics_idx(Metric), Val); + create_rule_metrics(Id), + counters:add(get_couters_ref(Id), metrics_idx(Metric), Val); Ref -> counters:add(Ref, metrics_idx(Metric), Val) - end, - inc_overall(Metric, Val). - --spec(inc_overall(atom(), pos_integer()) -> ok). -inc_overall(Metric, Val) -> - emqx_metrics:inc(Metric, Val). + end. inc_rules_matched(Id) -> inc_rules_matched(Id, 1). inc_rules_matched(Id, Val) -> inc(Id, 'rules.matched', Val). -inc_actions_taken(Id) -> - inc_actions_taken(Id, 1). -inc_actions_taken(Id, Val) -> - inc(Id, 'actions.taken', Val). - -inc_actions_success(Id) -> - inc_actions_success(Id, 1). -inc_actions_success(Id, Val) -> - inc(Id, 'actions.success', Val). - -inc_actions_error(Id) -> - inc_actions_error(Id, 1). -inc_actions_error(Id, Val) -> - inc(Id, 'actions.error', Val). - -inc_actions_exception(Id) -> - inc_actions_exception(Id, 1). -inc_actions_exception(Id, Val) -> - inc(Id, 'actions.exception', Val). - -inc_actions_retry(Id) -> - inc_actions_retry(Id, 1). -inc_actions_retry(Id, Val) -> - inc(Id, 'actions.retry', Val). - get_rules_matched(Id) -> get(Id, 'rules.matched'). -get_actions_taken(Id) -> - get(Id, 'actions.taken'). - -get_actions_success(Id) -> - get(Id, 'actions.success'). - -get_actions_error(Id) -> - get(Id, 'actions.error'). - -get_actions_exception(Id) -> - get(Id, 'actions.exception'). - -get_actions_retry(Id) -> - get(Id, 'actions.retry'). - start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> erlang:process_flag(trap_exit, true), - %% the overall counters - [ok = emqx_metrics:ensure(Metric)|| Metric <- overall_metrics()], %% the speed metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), - {ok, #state{overall_rule_speed = #rule_speed{}}}. + persistent_term:put(?CntrRef, #{}), + {ok, #state{}}. handle_call({get_rule_speed, _Id}, _From, State = #state{rule_speeds = undefined}) -> {reply, format_rule_speed(#rule_speed{}), State}; @@ -248,12 +155,6 @@ handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds Speed -> format_rule_speed(Speed) end, State}; -handle_call(get_overall_rule_speed, _From, State = #state{overall_rule_speed = RuleSpeed}) -> - {reply, format_rule_speed(RuleSpeed), State}; - -handle_call({create_metrics, Id}, _From, State = #state{metric_ids = MIDs}) -> - {reply, create_counters(Id), State#state{metric_ids = sets:add_element(Id, MIDs)}}; - handle_call({create_rule_metrics, Id}, _From, State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) -> {reply, create_counters(Id), @@ -263,10 +164,6 @@ handle_call({create_rule_metrics, Id}, _From, _ -> RuleSpeeds#{Id => #rule_speed{}} end}}; -handle_call({delete_metrics, Id}, _From, - State = #state{metric_ids = MIDs, rule_speeds = undefined}) -> - {reply, delete_counters(Id), State#state{metric_ids = sets:del_element(Id, MIDs)}}; - handle_call({delete_rule_metrics, Id}, _From, State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) -> {reply, delete_counters(Id), @@ -283,21 +180,16 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info(ticking, State = #state{rule_speeds = undefined}) -> - async_refresh_resource_status(), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State}; -handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0, - overall_rule_speed = OverallRuleSpeed0}) -> +handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) -> RuleSpeeds = maps:map( fun(Id, RuleSpeed) -> calculate_speed(get_rules_matched(Id), RuleSpeed) end, RuleSpeeds0), - OverallRuleSpeed = calculate_speed(get_overall('rules.matched'), OverallRuleSpeed0), - async_refresh_resource_status(), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), - {noreply, State#state{rule_speeds = RuleSpeeds, - overall_rule_speed = OverallRuleSpeed}}; + {noreply, State#state{rule_speeds = RuleSpeeds}}; handle_info(_Info, State) -> {noreply, State}. @@ -307,7 +199,7 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, #state{metric_ids = MIDs}) -> [delete_counters(Id) || Id <- sets:to_list(MIDs)], - persistent_term:erase(?MODULE). + persistent_term:erase(?CntrRef). stop() -> gen_server:stop(?MODULE). @@ -316,26 +208,22 @@ stop() -> %% Internal Functions %%------------------------------------------------------------------------------ -async_refresh_resource_status() -> - spawn(emqx_rule_engine, refresh_resource_status, []). - create_counters(Id) -> - case couters_ref(Id) of + case get_couters_ref(Id) of not_found -> - ok = persistent_term:put(?CRefID(Id), - counters:new(max_counters_size(), [write_concurrency])); + CntrRef = counters:new(max_counters_size(), [write_concurrency]), + persistent_term:put(?CntrRef, #{Id => CntrRef}); _Ref -> ok end. delete_counters(Id) -> - persistent_term:erase(?CRefID(Id)), - ok. + persistent_term:put(?CntrRef, maps:remove(Id, get_all_counters())). -couters_ref(Id) -> - try persistent_term:get(?CRefID(Id)) - catch - error:badarg -> not_found - end. +get_couters_ref(Id) -> + maps:get(Id, get_all_counters(), not_found). + +get_all_counters() -> + persistent_term:get(?CntrRef, #{}). calculate_speed(_CurrVal, undefined) -> undefined; @@ -379,21 +267,7 @@ precision(Float, N) -> %% Metrics Definitions %%------------------------------------------------------------------------------ -max_counters_size() -> 7. - +max_counters_size() -> 2. metrics_idx('rules.matched') -> 1; -metrics_idx('actions.success') -> 2; -metrics_idx('actions.error') -> 3; -metrics_idx('actions.taken') -> 4; -metrics_idx('actions.exception') -> 5; -metrics_idx('actions.retry') -> 6; -metrics_idx(_) -> 7. +metrics_idx(_) -> 2. -overall_metrics() -> - [ 'rules.matched' - , 'actions.success' - , 'actions.error' - , 'actions.taken' - , 'actions.exception' - , 'actions.retry' - ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl b/apps/emqx_rule_engine/src/emqx_rule_monitor.erl deleted file mode 100644 index dd2f6237c..000000000 --- a/apps/emqx_rule_engine/src/emqx_rule_monitor.erl +++ /dev/null @@ -1,126 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rule_monitor). - --behavior(gen_server). - --include("rule_engine.hrl"). --include_lib("emqx/include/logger.hrl"). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --export([ start_link/0 - , stop/0 - , ensure_resource_retrier/2 - , retry_loop/3 - ]). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -stop() -> - gen_server:stop(?MODULE). - -init([]) -> - _ = erlang:process_flag(trap_exit, true), - {ok, #{retryers => #{}}}. - -ensure_resource_retrier(ResId, Interval) -> - gen_server:cast(?MODULE, {create_restart_handler, resource, ResId, Interval}). - -handle_call(_Msg, _From, State) -> - {reply, ok, State}. - -handle_cast({create_restart_handler, Tag, Obj, Interval}, State) -> - Objects = maps:get(Tag, State, #{}), - NewState = case maps:find(Obj, Objects) of - error -> - update_object(Tag, Obj, - create_restart_handler(Tag, Obj, Interval), State); - {ok, _Pid} -> - State - end, - {noreply, NewState}; - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'EXIT', Pid, Reason}, State = #{retryers := Retryers}) -> - case maps:take(Pid, Retryers) of - {{Tag, Obj}, Retryers2} -> - Objects = maps:get(Tag, State, #{}), - {noreply, State#{Tag => maps:remove(Obj, Objects), - retryers => Retryers2}}; - error -> - ?LOG(error, "got unexpected proc down: ~p ~p", [Pid, Reason]), - {noreply, State} - end; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -update_object(Tag, Obj, Retryer, State) -> - Objects = maps:get(Tag, State, #{}), - Retryers = maps:get(retryers, State, #{}), - State#{ - Tag => Objects#{Obj => Retryer}, - retryers => Retryers#{Retryer => {Tag, Obj}} - }. - -create_restart_handler(Tag, Obj, Interval) -> - ?LOG(info, "keep restarting ~p ~p, interval: ~p", [Tag, Obj, Interval]), - %% spawn a dedicated process to handle the restarting asynchronously - spawn_link(?MODULE, retry_loop, [Tag, Obj, Interval]). - -retry_loop(resource, ResId, Interval) -> - case emqx_rule_registry:find_resource(ResId) of - {ok, #resource{type = Type, config = Config}} -> - try - {ok, #resource_type{on_create = {M, F}}} = - emqx_rule_registry:find_resource_type(Type), - ok = emqx_rule_engine:init_resource(M, F, ResId, Config), - refresh_and_enable_rules_of_resource(ResId) - catch - Err:Reason:ST -> - ?LOG(warning, "init_resource failed: ~p, ~0p", - [{Err, Reason}, ST]), - timer:sleep(Interval), - ?MODULE:retry_loop(resource, ResId, Interval) - end; - not_found -> - ok - end. - -refresh_and_enable_rules_of_resource(ResId) -> - lists:foreach( - fun (#rule{id = Id, enabled = false, state = refresh_failed_at_bootup} = Rule) -> - emqx_rule_engine:refresh_rule(Rule), - emqx_rule_registry:add_rule(Rule#rule{enabled = true, state = normal}), - ?LOG(info, "rule ~s is refreshed and re-enabled", [Id]); - (_) -> ok - end, emqx_rule_registry:find_rules_depends_on_resource(ResId)). diff --git a/apps/emqx_rule_engine/src/emqx_rule_locker.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl similarity index 65% rename from apps/emqx_rule_engine/src/emqx_rule_locker.erl rename to apps/emqx_rule_engine/src/emqx_rule_outputs.erl index 9e45b8c09..6f8e3908e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_locker.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -14,21 +14,19 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_rule_locker). +%% Define the default actions. +-module(emqx_rule_outputs). +-include_lib("emqx/include/logger.hrl"). --export([start_link/0]). - --export([ lock/1 - , unlock/1 +-export([ console/2 + , get_selected_data/2 ]). -start_link() -> - ekka_locker:start_link(?MODULE). +-spec console(map(), map()) -> any(). +console(Selected, #{metadata := #{rule_id := RuleId}} = Envs) -> + ?ULOG("[rule output] ~s~n" + "\tOutput Data: ~p~n" + "\tEnvs: ~p~n", [RuleId, Selected, Envs]). --spec(lock(binary()) -> ekka_locker:lock_result()). -lock(Id) -> - ekka_locker:acquire(?MODULE, Id, local). - --spec(unlock(binary()) -> {boolean(), [node()]}). -unlock(Id) -> - ekka_locker:release(?MODULE, Id, local). +get_selected_data(Selected, _Envs) -> + Selected. diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index a0b8b48d5..8261149a7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -27,7 +27,7 @@ %% Rule Management -export([ get_rules/0 - , get_rules_for/1 + , get_rules_for_topic/1 , get_rules_with_same_event/1 , get_rules_ordered_by_ts/0 , get_rule/1 @@ -37,39 +37,6 @@ , remove_rules/1 ]). -%% Action Management --export([ add_action/1 - , add_actions/1 - , get_actions/0 - , find_action/1 - , remove_action/1 - , remove_actions/1 - , remove_actions_of/1 - , add_action_instance_params/1 - , get_action_instance_params/1 - , remove_action_instance_params/1 - ]). - -%% Resource Management --export([ get_resources/0 - , add_resource/1 - , add_resource_params/1 - , find_resource/1 - , find_resource_params/1 - , get_resources_by_type/1 - , remove_resource/1 - , remove_resource_params/1 - ]). - -%% Resource Types --export([ get_resource_types/0 - , find_resource_type/1 - , find_rules_depends_on_resource/1 - , find_enabled_rules_depends_on_resource/1 - , register_resource_types/1 - , unregister_resource_types_of/1 - ]). - -export([ load_hooks_for_rule/1 , unload_hooks_for_rule/1 ]). @@ -110,53 +77,15 @@ mnesia(boot) -> {rlog_shard, ?RULE_ENGINE_SHARD}, {disc_copies, [node()]}, {record_name, rule}, - {index, [#rule.for]}, {attributes, record_info(fields, rule)}, - {storage_properties, StoreProps}]), - %% Rule action table - ok = ekka_mnesia:create_table(?ACTION_TAB, [ - {rlog_shard, ?RULE_ENGINE_SHARD}, - {ram_copies, [node()]}, - {record_name, action}, - {index, [#action.for, #action.app]}, - {attributes, record_info(fields, action)}, - {storage_properties, StoreProps}]), - %% Resource table - ok = ekka_mnesia:create_table(?RES_TAB, [ - {rlog_shard, ?RULE_ENGINE_SHARD}, - {disc_copies, [node()]}, - {record_name, resource}, - {index, [#resource.type]}, - {attributes, record_info(fields, resource)}, - {storage_properties, StoreProps}]), - %% Resource type table - ok = ekka_mnesia:create_table(?RES_TYPE_TAB, [ - {rlog_shard, ?RULE_ENGINE_SHARD}, - {ram_copies, [node()]}, - {record_name, resource_type}, - {index, [#resource_type.provider]}, - {attributes, record_info(fields, resource_type)}, {storage_properties, StoreProps}]); mnesia(copy) -> %% Copy rule table - ok = ekka_mnesia:copy_table(?RULE_TAB, disc_copies), - %% Copy rule action table - ok = ekka_mnesia:copy_table(?ACTION_TAB, ram_copies), - %% Copy resource table - ok = ekka_mnesia:copy_table(?RES_TAB, disc_copies), - %% Copy resource type table - ok = ekka_mnesia:copy_table(?RES_TYPE_TAB, ram_copies). + ok = ekka_mnesia:copy_table(?RULE_TAB, disc_copies). dump() -> - ?ULOG("Rules: ~p~n" - "ActionInstParams: ~p~n" - "Resources: ~p~n" - "ResourceParams: ~p~n", - [ets:tab2list(?RULE_TAB), - ets:tab2list(?ACTION_INST_PARAMS_TAB), - ets:tab2list(?RES_TAB), - ets:tab2list(?RES_PARAMS_TAB)]). + ?ULOG("Rules: ~p~n", [ets:tab2list(?RULE_TAB)]). %%------------------------------------------------------------------------------ %% Start the registry @@ -182,16 +111,16 @@ get_rules_ordered_by_ts() -> {atomic, List} = ekka_mnesia:transaction(?RULE_ENGINE_SHARD, F), List. --spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). -get_rules_for(Topic) -> - [Rule || Rule = #rule{for = For} <- get_rules(), - emqx_plugin_libs_rule:can_topic_match_oneof(Topic, For)]. +-spec(get_rules_for_topic(Topic :: binary()) -> list(emqx_rule_engine:rule())). +get_rules_for_topic(Topic) -> + [Rule || Rule = #rule{info = #{from := From}} <- get_rules(), + emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)]. -spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_with_same_event(Topic) -> EventName = emqx_rule_events:event_name(Topic), - [Rule || Rule = #rule{for = For} <- get_rules(), - lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)]. + [Rule || Rule = #rule{info = #{from := From}} <- get_rules(), + lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)]. is_of_event_name(EventName, Topic) -> EventName =:= emqx_rule_events:event_name(Topic). @@ -223,7 +152,7 @@ remove_rules(Rules) -> insert_rules([]) -> ok; insert_rules(Rules) -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rules]), + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, load_hooks_for_rule, [Rules]), [mnesia:write(?RULE_TAB, Rule, write) ||Rule <- Rules]. %% @private @@ -235,7 +164,7 @@ delete_rules(Rules = [R|_]) when is_binary(R) -> {ok, Rule} -> [Rule|Acc]; not_found -> Acc end - end, [], Rules), + end, [], Rules), delete_rules_unload_hooks(RuleRecs); delete_rules(Rules = [Rule|_]) when is_record(Rule, rule) -> delete_rules_unload_hooks(Rules). @@ -245,209 +174,20 @@ delete_rules_unload_hooks(Rules) -> [mnesia:delete_object(?RULE_TAB, Rule, write) ||Rule <- Rules]. load_hooks_for_rule(Rules) -> - lists:foreach(fun(#rule{for = Topics}) -> - lists:foreach(fun emqx_rule_events:load/1, Topics) - end, Rules). + lists:foreach(fun(#rule{info = #{from := Topics}}) -> + lists:foreach(fun emqx_rule_events:load/1, Topics) + end, Rules). unload_hooks_for_rule(Rules) -> - lists:foreach(fun(#rule{id = Id, for = Topics}) -> + lists:foreach(fun(#rule{id = Id, info = #{from := Topics}}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of - [#rule{id = Id}] -> %% we are now deleting the last rule + [#rule{id = Id0}] when Id0 == Id -> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end - end, Topics) - end, Rules). - -%%------------------------------------------------------------------------------ -%% Action Management -%%------------------------------------------------------------------------------ - -%% @doc Get all actions. --spec(get_actions() -> list(emqx_rule_engine:action())). -get_actions() -> - get_all_records(?ACTION_TAB). - -%% @doc Find an action by name. --spec(find_action(Name :: action_name()) -> {ok, emqx_rule_engine:action()} | not_found). -find_action(Name) -> - case mnesia:dirty_read(?ACTION_TAB, Name) of - [Action] -> {ok, Action}; - [] -> not_found - end. - -%% @doc Add an action. --spec(add_action(emqx_rule_engine:action()) -> ok). -add_action(Action) when is_record(Action, action) -> - trans(fun insert_action/1, [Action]). - -%% @doc Add actions. --spec(add_actions(list(emqx_rule_engine:action())) -> ok). -add_actions(Actions) when is_list(Actions) -> - trans(fun lists:foreach/2, [fun insert_action/1, Actions]). - -%% @doc Remove an action. --spec(remove_action(emqx_rule_engine:action() | atom()) -> ok). -remove_action(Action) when is_record(Action, action) -> - trans(fun delete_action/1, [Action]); - -remove_action(Name) -> - trans(fun mnesia:delete/1, [{?ACTION_TAB, Name}]). - -%% @doc Remove actions. --spec(remove_actions(list(emqx_rule_engine:action())) -> ok). -remove_actions(Actions) -> - trans(fun lists:foreach/2, [fun delete_action/1, Actions]). - -%% @doc Remove actions of the App. --spec(remove_actions_of(App :: atom()) -> ok). -remove_actions_of(App) -> - trans(fun() -> - lists:foreach(fun delete_action/1, mnesia:index_read(?ACTION_TAB, App, #action.app)) - end). - -%% @private -insert_action(Action) -> - mnesia:write(?ACTION_TAB, Action, write). - -%% @private -delete_action(Action) when is_record(Action, action) -> - mnesia:delete_object(?ACTION_TAB, Action, write); -delete_action(Name) when is_atom(Name) -> - mnesia:delete(?ACTION_TAB, Name, write). - -%% @doc Add an action instance params. --spec(add_action_instance_params(emqx_rule_engine:action_instance_params()) -> ok). -add_action_instance_params(ActionInstParams) when is_record(ActionInstParams, action_instance_params) -> - ets:insert(?ACTION_INST_PARAMS_TAB, ActionInstParams), - ok. - --spec(get_action_instance_params(action_instance_id()) -> {ok, emqx_rule_engine:action_instance_params()} | not_found). -get_action_instance_params(ActionInstId) -> - case ets:lookup(?ACTION_INST_PARAMS_TAB, ActionInstId) of - [ActionInstParams] -> {ok, ActionInstParams}; - [] -> not_found - end. - -%% @doc Delete an action instance params. --spec(remove_action_instance_params(action_instance_id()) -> ok). -remove_action_instance_params(ActionInstId) -> - ets:delete(?ACTION_INST_PARAMS_TAB, ActionInstId), - ok. - -%%------------------------------------------------------------------------------ -%% Resource Management -%%------------------------------------------------------------------------------ - --spec(get_resources() -> list(emqx_rule_engine:resource())). -get_resources() -> - get_all_records(?RES_TAB). - --spec(add_resource(emqx_rule_engine:resource()) -> ok). -add_resource(Resource) when is_record(Resource, resource) -> - trans(fun insert_resource/1, [Resource]). - --spec(add_resource_params(emqx_rule_engine:resource_params()) -> ok). -add_resource_params(ResParams) when is_record(ResParams, resource_params) -> - ets:insert(?RES_PARAMS_TAB, ResParams), - ok. - --spec(find_resource(Id :: resource_id()) -> {ok, emqx_rule_engine:resource()} | not_found). -find_resource(Id) -> - case mnesia:dirty_read(?RES_TAB, Id) of - [Res] -> {ok, Res}; - [] -> not_found - end. - --spec(find_resource_params(Id :: resource_id()) - -> {ok, emqx_rule_engine:resource_params()} | not_found). -find_resource_params(Id) -> - case ets:lookup(?RES_PARAMS_TAB, Id) of - [ResParams] -> {ok, ResParams}; - [] -> not_found - end. - --spec(remove_resource(emqx_rule_engine:resource() | emqx_rule_engine:resource_id()) -> ok | {error, term()}). -remove_resource(Resource) when is_record(Resource, resource) -> - trans(fun delete_resource/1, [Resource#resource.id]); - -remove_resource(ResId) when is_binary(ResId) -> - trans(fun delete_resource/1, [ResId]). - --spec(remove_resource_params(emqx_rule_engine:resource_id()) -> ok). -remove_resource_params(ResId) -> - ets:delete(?RES_PARAMS_TAB, ResId), - ok. - -%% @private -delete_resource(ResId) -> - case find_enabled_rules_depends_on_resource(ResId) of - [] -> mnesia:delete(?RES_TAB, ResId, write); - Rules -> - {error, {dependent_rules_exists, [Id || #rule{id = Id} <- Rules]}} - end. - -%% @private -insert_resource(Resource) -> - mnesia:write(?RES_TAB, Resource, write). - -find_enabled_rules_depends_on_resource(ResId) -> - [R || #rule{enabled = true} = R <- find_rules_depends_on_resource(ResId)]. - -find_rules_depends_on_resource(ResId) -> - lists:foldl(fun(#rule{actions = Actions} = R, Rules) -> - case search_action_despends_on_resource(ResId, Actions) of - false -> Rules; - {value, _} -> [R | Rules] - end - end, [], get_rules()). - -search_action_despends_on_resource(ResId, Actions) -> - lists:search(fun - (#action_instance{args = #{<<"$resource">> := ResId0}}) -> - ResId0 =:= ResId; - (_) -> - false - end, Actions). - -%%------------------------------------------------------------------------------ -%% Resource Type Management -%%------------------------------------------------------------------------------ - --spec(get_resource_types() -> list(emqx_rule_engine:resource_type())). -get_resource_types() -> - get_all_records(?RES_TYPE_TAB). - --spec(find_resource_type(Name :: resource_type_name()) -> {ok, emqx_rule_engine:resource_type()} | not_found). -find_resource_type(Name) -> - case mnesia:dirty_read(?RES_TYPE_TAB, Name) of - [ResType] -> {ok, ResType}; - [] -> not_found - end. - --spec(get_resources_by_type(Type :: resource_type_name()) -> list(emqx_rule_engine:resource())). -get_resources_by_type(Type) -> - mnesia:dirty_index_read(?RES_TAB, Type, #resource.type). - --spec(register_resource_types(list(emqx_rule_engine:resource_type())) -> ok). -register_resource_types(Types) -> - trans(fun lists:foreach/2, [fun insert_resource_type/1, Types]). - -%% @doc Unregister resource types of the App. --spec(unregister_resource_types_of(App :: atom()) -> ok). -unregister_resource_types_of(App) -> - trans(fun() -> - lists:foreach(fun delete_resource_type/1, mnesia:index_read(?RES_TYPE_TAB, App, #resource_type.provider)) - end). - -%% @private -insert_resource_type(Type) -> - mnesia:write(?RES_TYPE_TAB, Type, write). - -%% @private -delete_resource_type(Type) -> - mnesia:delete_object(?RES_TYPE_TAB, Type, write). + end, Topics) + end, Rules). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -500,7 +240,6 @@ get_all_records(Tab) -> end), Ret. -trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> case ekka_mnesia:transaction(?RULE_ENGINE_SHARD, Fun, Args) of {atomic, Result} -> Result; diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f9e210ab3..5a3dd2ed4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -48,7 +48,7 @@ -spec(apply_rules(list(emqx_rule_engine:rule()), input()) -> ok). apply_rules([], _Input) -> ok; -apply_rules([#rule{enabled = false}|More], Input) -> +apply_rules([#rule{info = #{enabled := false}}|More], Input) -> apply_rules(More, Input); apply_rules([Rule = #rule{id = RuleID}|More], Input) -> try apply_rule_discard_result(Rule, Input) @@ -80,14 +80,14 @@ apply_rule(Rule = #rule{id = RuleID}, Input) -> clear_rule_payload(), do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). -do_apply_rule(#rule{id = RuleId, - is_foreach = true, - fields = Fields, - doeach = DoEach, - incase = InCase, - conditions = Conditions, - on_action_failed = OnFailed, - actions = Actions}, Input) -> +do_apply_rule(#rule{id = RuleId, info = #{ + is_foreach := true, + fields := Fields, + doeach := DoEach, + incase := InCase, + conditions := Conditions, + outputs := Outputs + }}, Input) -> {Selected, Collection} = ?RAISE(select_and_collect(Fields, Input), {select_and_collect_error, {_EXCLASS_,_EXCPTION_,_ST_}}), ColumnsAndSelected = maps:merge(Input, Selected), @@ -96,24 +96,24 @@ do_apply_rule(#rule{id = RuleId, true -> ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'), Collection2 = filter_collection(Input, InCase, DoEach, Collection), - {ok, [take_actions(Actions, Coll, Input, OnFailed) || Coll <- Collection2]}; + {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]}; false -> {error, nomatch} end; -do_apply_rule(#rule{id = RuleId, - is_foreach = false, - fields = Fields, - conditions = Conditions, - on_action_failed = OnFailed, - actions = Actions}, Input) -> +do_apply_rule(#rule{id = RuleId, info = #{ + is_foreach := false, + fields := Fields, + conditions := Conditions, + outputs := Outputs + }}, Input) -> Selected = ?RAISE(select_and_transform(Fields, Input), {select_and_transform_error, {_EXCLASS_,_EXCPTION_,_ST_}}), case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'), - {ok, take_actions(Actions, Selected, Input, OnFailed)}; + {ok, handle_output_list(Outputs, Selected, Input)}; false -> {error, nomatch} end. @@ -198,8 +198,6 @@ match_conditions({'fun', {_, Name}, Args}, Data) -> apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data); match_conditions({Op, L, R}, Data) when ?is_comp(Op) -> compare(Op, eval(L, Data), eval(R, Data)); -%%match_conditions({'like', Var, Pattern}, Data) -> -%% match_like(eval(Var, Data), Pattern); match_conditions({}, _Data) -> true. @@ -229,81 +227,27 @@ number(Bin) -> catch error:badarg -> binary_to_float(Bin) end. -%% Step3 -> Take actions -take_actions(Actions, Selected, Envs, OnFailed) -> - [take_action(ActInst, Selected, Envs, OnFailed, ?ActionMaxRetry) - || ActInst <- Actions]. +handle_output_list(Outputs, Selected, Envs) -> + [handle_output(Out, Selected, Envs) || Out <- Outputs]. -take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst, - Selected, Envs, OnFailed, RetryN) when RetryN >= 0 -> +handle_output(OutId, Selected, Envs) -> try - {ok, #action_instance_params{apply = Apply}} - = emqx_rule_registry:get_action_instance_params(Id), - emqx_rule_metrics:inc_actions_taken(Id), - apply_action_func(Selected, Envs, Apply, ActName) - of - {badact, Reason} -> - handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, Reason); - Result -> Result + do_handle_output(OutId, Selected, Envs) catch - error:{badfun, _Func}:_ST -> - %?LOG(warning, "Action ~p maybe outdated, refresh it and try again." - % "Func: ~p~nST:~0p", [Id, Func, ST]), - _ = trans_action_on(Id, fun() -> - emqx_rule_engine:refresh_actions([ActInst]) - end, 5000), - emqx_rule_metrics:inc_actions_retry(Id), - take_action(ActInst, Selected, Envs, OnFailed, RetryN-1); - Error:Reason:Stack -> - emqx_rule_metrics:inc_actions_exception(Id), - handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {Error, Reason, Stack}) - end; - -take_action(#action_instance{id = Id, fallbacks = Fallbacks}, Selected, Envs, OnFailed, _RetryN) -> - emqx_rule_metrics:inc_actions_error(Id), - handle_action_failure(OnFailed, Id, Fallbacks, Selected, Envs, {max_try_reached, ?ActionMaxRetry}). - -apply_action_func(Data, Envs, #{mod := Mod, bindings := Bindings}, Name) -> - %% TODO: Build the Func Name when creating the action - Func = cbk_on_action_triggered(Name), - Mod:Func(Data, Envs#{'__bindings__' => Bindings}); -apply_action_func(Data, Envs, Func, _Name) when is_function(Func) -> - erlang:apply(Func, [Data, Envs]). - -cbk_on_action_triggered(Name) -> - list_to_atom("on_action_" ++ atom_to_list(Name)). - -trans_action_on(Id, Callback, Timeout) -> - case emqx_rule_locker:lock(Id) of - true -> try Callback() after emqx_rule_locker:unlock(Id) end; - _ -> - wait_action_on(Id, Timeout div 10) + Err:Reason:ST -> + ?LOG(warning, "Output to ~p failed, ~p", [OutId, {Err, Reason, ST}]) end. -wait_action_on(_, 0) -> - {error, timeout}; -wait_action_on(Id, RetryN) -> - timer:sleep(10), - case emqx_rule_registry:get_action_instance_params(Id) of - not_found -> - {error, not_found}; - {ok, #action_instance_params{apply = Apply}} -> - case catch apply_action_func(baddata, #{}, Apply, tryit) of - {'EXIT', {{badfun, _}, _}} -> - wait_action_on(Id, RetryN-1); - _ -> - ok - end - end. +do_handle_output(<<"bridge:", _/binary>> = _ChannelId, _Selected, _Envs) -> + ?LOG(warning, "calling bridge from rules has not been implemented yet!"); -handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) -> - ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]), - _ = take_actions(Fallbacks, Selected, Envs, continue), - failed; -handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) -> - ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]), - _ = take_actions(Fallbacks, Selected, Envs, continue), - error({take_action_failed, {Id, Reason}}). +do_handle_output(BuiltInOutput, Selected, Envs) -> + try binary_to_existing_atom(BuiltInOutput) of Func -> + erlang:apply(emqx_rule_outputs, Func, [Selected, Envs]) + catch + error:badarg -> error(not_found); + error:undef -> error(not_found) + end. eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> nested_get({path, Path}, may_decode_payload(Payload)); diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl index 9a8ce55ea..835271141 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl @@ -18,7 +18,7 @@ -include("rule_engine.hrl"). --export([parse_select/1]). +-export([parse/1]). -export([ select_fields/1 , select_is_foreach/1 @@ -50,12 +50,12 @@ %% Dialyzer gives up on the generated code. %% probably due to stack depth, or inlines. --dialyzer({nowarn_function, [parse_select/1]}). +-dialyzer({nowarn_function, [parse/1]}). %% Parse one select statement. --spec(parse_select(string() | binary()) +-spec(parse(string() | binary()) -> {ok, select()} | {parse_error, term()} | {lex_error, term()}). -parse_select(Sql) -> +parse(Sql) -> try case rulesql:parsetree(Sql) of {ok, {select, Clauses}} -> {ok, #select{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 2f1edbeb2..843b6f83e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -18,6 +18,7 @@ -include_lib("emqx/include/logger.hrl"). -export([ test/1 + , echo_action/2 ]). %% Dialyzer gives up on the generated code. @@ -25,15 +26,14 @@ -dialyzer({nowarn_function, [test/1, test_rule/4, flatten/1, - sql_test_action/0, fill_default_values/2, envs_examp/1 ]}). -spec(test(#{}) -> {ok, map() | list()} | {error, term()}). -test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) -> - {ok, Select} = emqx_rule_sqlparser:parse_select(Sql), - InTopic = maps:get(<<"topic">>, Context, <<>>), +test(#{sql := Sql, context := Context}) -> + {ok, Select} = emqx_rule_sqlparser:parse(Sql), + InTopic = maps:get(topic, Context, <<>>), EventTopics = emqx_rule_sqlparser:select_from(Select), case lists:all(fun is_publish_topic/1, EventTopics) of true -> @@ -48,38 +48,30 @@ test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) -> end. test_rule(Sql, Select, Context, EventTopics) -> - RuleId = iolist_to_binary(["test_rule", emqx_misc:gen_id()]), - ActInstId = iolist_to_binary(["test_action", emqx_misc:gen_id()]), + RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]), ok = emqx_rule_metrics:create_rule_metrics(RuleId), - ok = emqx_rule_metrics:create_metrics(ActInstId), Rule = #rule{ id = RuleId, - rawsql = Sql, - for = EventTopics, - is_foreach = emqx_rule_sqlparser:select_is_foreach(Select), - fields = emqx_rule_sqlparser:select_fields(Select), - doeach = emqx_rule_sqlparser:select_doeach(Select), - incase = emqx_rule_sqlparser:select_incase(Select), - conditions = emqx_rule_sqlparser:select_where(Select), - actions = [#action_instance{ - id = ActInstId, - name = test_rule_sql}] + info = #{ + sql => Sql, + from => EventTopics, + outputs => [<<"get_selected_data">>], + enabled => true, + is_foreach => emqx_rule_sqlparser:select_is_foreach(Select), + fields => emqx_rule_sqlparser:select_fields(Select), + doeach => emqx_rule_sqlparser:select_doeach(Select), + incase => emqx_rule_sqlparser:select_incase(Select), + conditions => emqx_rule_sqlparser:select_where(Select) + } }, FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)), try - ok = emqx_rule_registry:add_action_instance_params( - #action_instance_params{id = ActInstId, - params = #{}, - apply = sql_test_action()}), - R = emqx_rule_runtime:apply_rule(Rule, FullContext), - emqx_rule_metrics:clear_rule_metrics(RuleId), - emqx_rule_metrics:clear_metrics(ActInstId), - R + emqx_rule_runtime:apply_rule(Rule, FullContext) of {ok, Data} -> {ok, flatten(Data)}; {error, nomatch} -> {error, nomatch} after - ok = emqx_rule_registry:remove_action_instance_params(ActInstId) + emqx_rule_metrics:clear_rule_metrics(RuleId) end. is_publish_topic(<<"$events/", _/binary>>) -> false; @@ -90,10 +82,8 @@ flatten([D1]) -> D1; flatten([D1 | L]) when is_list(D1) -> D1 ++ flatten(L). -sql_test_action() -> - fun(Data, _Envs) -> - ?LOG(info, "Testing Rule SQL OK"), Data - end. +echo_action(Data, _Envs) -> + ?LOG(info, "Testing Rule SQL OK"), Data. fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context). diff --git a/apps/emqx_rule_engine/src/emqx_rule_validator.erl b/apps/emqx_rule_engine/src/emqx_rule_validator.erl deleted file mode 100644 index 8f39d2d1c..000000000 --- a/apps/emqx_rule_engine/src/emqx_rule_validator.erl +++ /dev/null @@ -1,195 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rule_validator). - --include("rule_engine.hrl"). - --export([ validate_params/2 - , validate_spec/1 - ]). - --type name() :: atom(). - --type spec() :: #{ - type := data_type(), - required => boolean(), - default => term(), - enum => list(term()), - schema => spec() -}. - --type data_type() :: string | password | number | boolean - | object | array | file | cfgselect. - --type params_spec() :: #{name() => spec()} | any. --type params() :: #{binary() => term()}. - --define(DATA_TYPES, - [ string - , password %% TODO: [5.0] remove this, use string instead - , number - , boolean - , object - , array - , file - , cfgselect %% TODO: [5.0] refactor this - ]). - -%%------------------------------------------------------------------------------ -%% APIs -%%------------------------------------------------------------------------------ - -%% Validate the params according to the spec. -%% Some keys will be added into the result if they have default values in spec. -%% Note that this function throws exception in case of validation failure. --spec(validate_params(params(), params_spec()) -> params()). -validate_params(Params, any) -> Params; -validate_params(Params, ParamsSepc) -> - maps:fold(fun(Name, Spec, Params0) -> - IsRequired = maps:get(required, Spec, false), - BinName = bin(Name), - find_field(Name, Params, - fun (not_found) when IsRequired =:= true -> - throw({required_field_missing, BinName}); - (not_found) when IsRequired =:= false -> - case maps:find(default, Spec) of - {ok, Default} -> Params0#{BinName => Default}; - error -> Params0 - end; - (Val) -> - Params0#{BinName => validate_value(Val, Spec)} - end) - end, Params, ParamsSepc). - --spec(validate_spec(params_spec()) -> ok). -validate_spec(any) -> ok; -validate_spec(ParamsSepc) -> - map_foreach(fun do_validate_spec/2, ParamsSepc). - -%%------------------------------------------------------------------------------ -%% Internal Functions -%%------------------------------------------------------------------------------ - -validate_value(Val, #{enum := Enum}) -> - validate_enum(Val, Enum); -validate_value(Val, #{type := object} = Spec) -> - validate_params(Val, maps:get(schema, Spec, any)); -validate_value(Val, #{type := Type} = Spec) -> - validate_type(Val, Type, Spec). - -validate_type(Val, file, _Spec) -> - validate_file(Val); -validate_type(Val, String, Spec) when String =:= string; - String =:= password -> - validate_string(Val, reg_exp(maps:get(format, Spec, any))); -validate_type(Val, number, Spec) -> - validate_number(Val, maps:get(range, Spec, any)); -validate_type(Val, boolean, _Spec) -> - validate_boolean(Val); -validate_type(Val, array, Spec) -> - ItemsSpec = maps:get(items, Spec), - [validate_value(V, ItemsSpec) || V <- Val]; -validate_type(Val, cfgselect, _Spec) -> - %% TODO: [5.0] refactor this. - Val. - -validate_enum(Val, Enum) -> - case lists:member(Val, Enum) of - true -> Val; - false -> throw({invalid_data_type, {enum, {Val, Enum}}}) - end. - -validate_string(Val, RegExp) -> - try re:run(Val, RegExp) of - nomatch -> throw({invalid_data_type, {string, Val}}); - _Match -> Val - catch - _:_ -> throw({invalid_data_type, {string, Val}}) - end. - -validate_number(Val, any) when is_integer(Val); is_float(Val) -> - Val; -validate_number(Val, _Range = [Min, Max]) - when (is_integer(Val) orelse is_float(Val)), - (Val >= Min andalso Val =< Max) -> - Val; -validate_number(Val, Range) -> - throw({invalid_data_type, {number, {Val, Range}}}). - -validate_boolean(true) -> true; -validate_boolean(<<"true">>) -> true; -validate_boolean(false) -> false; -validate_boolean(<<"false">>) -> false; -validate_boolean(Val) -> throw({invalid_data_type, {boolean, Val}}). - -validate_file(Val) when is_map(Val) -> Val; -validate_file(Val) when is_list(Val) -> Val; -validate_file(Val) when is_binary(Val) -> Val; -validate_file(Val) -> throw({invalid_data_type, {file, Val}}). - -reg_exp(url) -> "^https?://\\w+(\.\\w+)*(:[0-9]+)?"; -reg_exp(topic) -> "^/?(\\w|\\#|\\+)+(/?(\\w|\\#|\\+))*/?$"; -reg_exp(resource_type) -> "[a-zA-Z0-9_:-]"; -reg_exp(any) -> ".*"; -reg_exp(RegExp) -> RegExp. - -do_validate_spec(Name, #{type := object} = Spec) -> - find_field(schema, Spec, - fun (not_found) -> throw({required_field_missing, {schema, {in, Name}}}); - (Schema) -> validate_spec(Schema) - end); -do_validate_spec(Name, #{type := array} = Spec) -> - find_field(items, Spec, - fun (not_found) -> throw({required_field_missing, {items, {in, Name}}}); - (Items) -> do_validate_spec(Name, Items) - end); -do_validate_spec(_Name, #{type := Type}) -> - _ = supported_data_type(Type, ?DATA_TYPES); - -do_validate_spec(Name, _Spec) -> - throw({required_field_missing, {type, {in, Name}}}). - -supported_data_type(Type, Supported) -> - case lists:member(Type, Supported) of - false -> throw({unsupported_data_types, Type}); - true -> ok - end. - -map_foreach(Fun, Map) -> - Iterator = maps:iterator(Map), - map_foreach_loop(Fun, maps:next(Iterator)). - -map_foreach_loop(_Fun, none) -> ok; -map_foreach_loop(Fun, {Key, Value, Iterator}) -> - _ = Fun(Key, Value), - map_foreach_loop(Fun, maps:next(Iterator)). - -find_field(Field, Spec, Func) -> - do_find_field([bin(Field), Field], Spec, Func). - -do_find_field([], _Spec, Func) -> - Func(not_found); -do_find_field([F | Fields], Spec, Func) -> - case maps:find(F, Spec) of - {ok, Value} -> Func(Value); - error -> - do_find_field(Fields, Spec, Func) - end. - -bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); -bin(Str) when is_list(Str) -> iolist_to_binary(Str); -bin(Bin) when is_binary(Bin) -> Bin. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index e172cbb84..0b46d07c4 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -79,8 +79,8 @@ groups() -> t_create_existing_rule, t_update_rule, t_disable_rule, - t_get_rules_for, - t_get_rules_for_2, + t_get_rules_for_topic, + t_get_rules_for_topic_2, t_get_rules_with_same_event, t_add_get_remove_action, t_add_get_remove_actions, @@ -650,12 +650,12 @@ t_disable_rule(_Config) -> ?assert(DAt3 < Now3), ok = emqx_rule_engine:delete_rule(<<"simple_rule_2">>). -t_get_rules_for(_Config) -> - Len0 = length(emqx_rule_registry:get_rules_for(<<"simple/topic">>)), +t_get_rules_for_topic(_Config) -> + Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>)), ok = emqx_rule_registry:add_rules( [make_simple_rule(<<"rule-debug-1">>), make_simple_rule(<<"rule-debug-2">>)]), - ?assertEqual(Len0+2, length(emqx_rule_registry:get_rules_for(<<"simple/topic">>))), + ?assertEqual(Len0+2, length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>))), ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]), ok. @@ -672,8 +672,8 @@ t_get_rules_ordered_by_ts(_Config) -> #rule{id = <<"rule-debug-2">>} ], emqx_rule_registry:get_rules_ordered_by_ts()). -t_get_rules_for_2(_Config) -> - Len0 = length(emqx_rule_registry:get_rules_for(<<"simple/1">>)), +t_get_rules_for_topic_2(_Config) -> + Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>)), ok = emqx_rule_registry:add_rules( [make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]), @@ -682,7 +682,7 @@ t_get_rules_for_2(_Config) -> make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]), make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>]) ]), - ?assertEqual(Len0+4, length(emqx_rule_registry:get_rules_for(<<"simple/1">>))), + ?assertEqual(Len0+4, length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>))), ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]), ok. diff --git a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl deleted file mode 100644 index 62f538f43..000000000 --- a/apps/emqx_rule_engine/test/emqx_rule_monitor_SUITE.erl +++ /dev/null @@ -1,109 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rule_monitor_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("emqx_rule_engine/include/rule_engine.hrl"). --include_lib("emqx/include/emqx.hrl"). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - -all() -> - [ {group, resource} - ]. - -suite() -> - [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. - -groups() -> - [{resource, [sequence], - [ t_restart_resource - ]} - ]. - -init_per_suite(Config) -> - application:load(emqx_machine), - ok = ekka_mnesia:start(), - ok = emqx_rule_registry:mnesia(boot), - Config. - -end_per_suite(_Config) -> - ok. - -init_per_testcase(t_restart_resource, Config) -> - Opts = [public, named_table, set, {read_concurrency, true}], - _ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]), - ets:new(t_restart_resource, [named_table, public]), - ets:insert(t_restart_resource, {failed_count, 0}), - ets:insert(t_restart_resource, {succ_count, 0}), - Config; - -init_per_testcase(_, Config) -> - Config. - -end_per_testcase(t_restart_resource, Config) -> - ets:delete(t_restart_resource), - Config; -end_per_testcase(_, Config) -> - Config. - -t_restart_resource(_) -> - {ok, _} = emqx_rule_monitor:start_link(), - emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc,1000), - ok = emqx_rule_registry:register_resource_types( - [#resource_type{ - name = test_res_1, - provider = ?APP, - params_spec = #{}, - on_create = {?MODULE, on_resource_create}, - on_destroy = {?MODULE, on_resource_destroy}, - on_status = {?MODULE, on_get_resource_status}, - title = #{en => <<"Test Resource">>}, - description = #{en => <<"Test Resource">>}}]), - ok = emqx_rule_engine:load_providers(), - {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( - #{type => test_res_1, - config => #{}, - description => <<"debug resource">>}), - [{_, 1}] = ets:lookup(t_restart_resource, failed_count), - [{_, 0}] = ets:lookup(t_restart_resource, succ_count), - ct:pal("monitor: ~p", [whereis(emqx_rule_monitor)]), - emqx_rule_monitor:ensure_resource_retrier(ResId, 100), - timer:sleep(1000), - [{_, 5}] = ets:lookup(t_restart_resource, failed_count), - [{_, 1}] = ets:lookup(t_restart_resource, succ_count), - #{retryers := Pids} = sys:get_state(whereis(emqx_rule_monitor)), - ?assertEqual(0, map_size(Pids)), - ok = emqx_rule_engine:unload_providers(), - emqx_rule_registry:remove_resource(ResId), - emqx_rule_monitor:stop(), - ok. - -on_resource_create(Id, _) -> - case ets:lookup(t_restart_resource, failed_count) of - [{_, 5}] -> - ets:insert(t_restart_resource, {succ_count, 1}), - #{}; - [{_, N}] -> - ets:insert(t_restart_resource, {failed_count, N+1}), - error({incorrect_params, Id}) - end. -on_resource_destroy(_Id, _) -> ok. -on_get_resource_status(_Id, _) -> #{}. diff --git a/apps/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl index cbd69c878..2273d886d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_registry_SUITE.erl @@ -38,7 +38,7 @@ end_per_testcase(_TestCase, Config) -> % t_start_link(_) -> % error('TODO'). -% t_get_rules_for(_) -> +% t_get_rules_for_topic(_) -> % error('TODO'). % t_add_rules(_) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl deleted file mode 100644 index fdd7857d4..000000000 --- a/apps/emqx_rule_engine/test/emqx_rule_validator_SUITE.erl +++ /dev/null @@ -1,191 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rule_validator_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include_lib("eunit/include/eunit.hrl"). - --define(VALID_SPEC, - #{ - string_required => #{ - type => string, - required => true - }, - string_optional_with_default => #{ - type => string, - required => false, - default => <<"a/b">> - }, - string_optional_without_default_0 => #{ - type => string, - required => false - }, - string_optional_without_default_1 => #{ - type => string - }, - type_number => #{ - type => number, - required => true - }, - type_boolean => #{ - type => boolean, - required => true - }, - type_enum_number => #{ - type => number, - enum => [-1, 0, 1, 2], - required => true - }, - type_file => #{ - type => file, - required => true - }, - type_object => #{ - type => object, - required => true, - schema => #{ - string_required => #{ - type => string, - required => true - }, - type_number => #{ - type => number, - required => true - } - } - }, - type_array => #{ - type => array, - required => true, - items => #{ - type => string, - required => true - } - } - }). - -all() -> emqx_ct:all(?MODULE). - -t_validate_spec_the_complex(_) -> - ok = emqx_rule_validator:validate_spec(?VALID_SPEC). - -t_validate_spec_invalid_1(_) -> - ?assertThrow({required_field_missing, {type, _}}, - emqx_rule_validator:validate_spec(#{ - type_enum_number => #{ - required => true - } - })). - -t_validate_spec_invalid_2(_) -> - ?assertThrow({required_field_missing, {schema, _}}, - emqx_rule_validator:validate_spec(#{ - type_enum_number => #{ - type => object - } - })). - -t_validate_spec_invalid_3(_) -> - ?assertThrow({required_field_missing, {items, _}}, - emqx_rule_validator:validate_spec(#{ - type_enum_number => #{ - type => array - } - })). - -t_validate_params_0(_) -> - Params = #{<<"eee">> => <<"eee">>}, - Specs = #{<<"eee">> => #{ - type => string, - required => true - }}, - ?assertEqual(Params, - emqx_rule_validator:validate_params(Params, Specs)). - -t_validate_params_1(_) -> - Params = #{<<"eee">> => 1}, - Specs = #{<<"eee">> => #{ - type => string, - required => true - }}, - ?assertThrow({invalid_data_type, {string, 1}}, - emqx_rule_validator:validate_params(Params, Specs)). - -t_validate_params_2(_) -> - ?assertThrow({required_field_missing, <<"eee">>}, - emqx_rule_validator:validate_params( - #{<<"abc">> => 1}, - #{<<"eee">> => #{ - type => string, - required => true - }})). - -t_validate_params_format(_) -> - Params = #{<<"eee">> => <<"abc">>}, - Params1 = #{<<"eee">> => <<"http://abc:8080">>}, - Params2 = #{<<"eee">> => <<"http://abc">>}, - Specs = #{<<"eee">> => #{ - type => string, - format => url, - required => true - }}, - ?assertThrow({invalid_data_type, {string, <<"abc">>}}, - emqx_rule_validator:validate_params(Params, Specs)), - ?assertEqual(Params1, - emqx_rule_validator:validate_params(Params1, Specs)), - ?assertEqual(Params2, - emqx_rule_validator:validate_params(Params2, Specs)). - -t_validate_params_fill_default(_) -> - Params = #{<<"abc">> => 1}, - Specs = #{<<"eee">> => #{ - type => string, - required => false, - default => <<"hello">> - }}, - ?assertMatch(#{<<"abc">> := 1, <<"eee">> := <<"hello">>}, - emqx_rule_validator:validate_params(Params, Specs)). - -t_validate_params_the_complex(_) -> - Params = #{ - <<"string_required">> => <<"hello">>, - <<"type_number">> => 1, - <<"type_boolean">> => true, - <<"type_enum_number">> => 2, - <<"type_file">> => <<"">>, - <<"type_object">> => #{ - <<"string_required">> => <<"hello2">>, - <<"type_number">> => 1.3 - }, - <<"type_array">> => [<<"ok">>, <<"no">>] - }, - ?assertMatch( - #{ <<"string_required">> := <<"hello">>, - <<"string_optional_with_default">> := <<"a/b">>, - <<"type_number">> := 1, - <<"type_boolean">> := true, - <<"type_enum_number">> := 2, - <<"type_file">> := <<"">>, - <<"type_object">> := #{ - <<"string_required">> := <<"hello2">>, - <<"type_number">> := 1.3 - }, - <<"type_array">> := [<<"ok">>, <<"no">>] - }, - emqx_rule_validator:validate_params(Params, ?VALID_SPEC)).