diff --git a/apps/emqx_rule_engine/include/rule_actions.hrl b/apps/emqx_rule_engine/include/rule_actions.hrl index a1532da9e..e432c4399 100644 --- a/apps/emqx_rule_engine/include/rule_actions.hrl +++ b/apps/emqx_rule_engine/include/rule_actions.hrl @@ -2,7 +2,7 @@ -type selected_data() :: map(). -type env_vars() :: map(). --type bindings() :: list(#{atom() => term()}). +-type bindings() :: list({atom(), term()}). -define(BINDING_KEYS, '__bindings__'). diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 49024bada..35ed30a48 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -35,6 +35,8 @@ -type(hook() :: atom() | 'any'). +-type(topic() :: binary()). + -type(resource_status() :: #{ alive := boolean() , atom() => binary() | atom() | list(binary()|atom()) }). @@ -65,7 +67,7 @@ -record(rule, { id :: rule_id() - , for :: hook() + , for :: list(topic()) , rawsql :: binary() , is_foreach :: boolean() , fields :: list() @@ -110,9 +112,11 @@ -record(action_instance_params, { id :: action_instance_id() - , params :: #{} %% the params got after initializing the action + %% the params got after initializing the action + , params :: #{} + %% the Func/Bindings got after initializing the action , apply :: fun((Data::map(), Envs::map()) -> any()) - | {M::module(), F::atom(), Args::list()} %% the func got after initializing the action + | #{mod := module(), bindings := #{atom() => term()}} }). %% Arithmetic operators diff --git a/apps/emqx_rule_engine/include/rule_events.hrl b/apps/emqx_rule_engine/include/rule_events.hrl deleted file mode 100644 index 9e76848b0..000000000 --- a/apps/emqx_rule_engine/include/rule_events.hrl +++ /dev/null @@ -1,285 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --define(COLUMNS(EVENT), [Key || {Key, _ExampleVal} <- ?EG_COLUMNS(EVENT)]). - --define(EG_COLUMNS(EVENT), - case EVENT of - 'message.publish' -> - [ {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} - , {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"flags">>, #{}} - , {<<"headers">>, undefined} - , {<<"publish_received_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - 'message.delivered' -> - [ {<<"event">>, 'message.delivered'} - , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} - , {<<"from_clientid">>, <<"c_emqx_1">>} - , {<<"from_username">>, <<"u_emqx_1">>} - , {<<"clientid">>, <<"c_emqx_2">>} - , {<<"username">>, <<"u_emqx_2">>} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"flags">>, #{}} - , {<<"publish_received_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - 'message.acked' -> - [ {<<"event">>, 'message.acked'} - , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} - , {<<"from_clientid">>, <<"c_emqx_1">>} - , {<<"from_username">>, <<"u_emqx_1">>} - , {<<"clientid">>, <<"c_emqx_2">>} - , {<<"username">>, <<"u_emqx_2">>} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"flags">>, #{}} - , {<<"publish_received_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - 'message.dropped' -> - [ {<<"event">>, 'message.dropped'} - , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} - , {<<"reason">>, no_subscribers} - , {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"flags">>, #{}} - , {<<"publish_received_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - 'client.connected' -> - [ {<<"event">>, 'client.connected'} - , {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"mountpoint">>, undefined} - , {<<"peername">>, <<"192.168.0.10:56431">>} - , {<<"sockname">>, <<"0.0.0.0:1883">>} - , {<<"proto_name">>, <<"MQTT">>} - , {<<"proto_ver">>, 5} - , {<<"keepalive">>, 60} - , {<<"clean_start">>, true} - , {<<"expiry_interval">>, 3600} - , {<<"is_bridge">>, false} - , {<<"connected_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - 'client.disconnected' -> - [ {<<"event">>, 'client.disconnected'} - , {<<"reason">>, normal} - , {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"peername">>, <<"192.168.0.10:56431">>} - , {<<"sockname">>, <<"0.0.0.0:1883">>} - , {<<"disconnected_at">>, erlang:system_time(millisecond)} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - 'session.subscribed' -> - [ {<<"event">>, 'session.subscribed'} - , {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - 'session.unsubscribed' -> - [ {<<"event">>, 'session.unsubscribed'} - , {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"peerhost">>, <<"192.168.0.10">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"timestamp">>, erlang:system_time(millisecond)} - , {<<"node">>, node()} - ]; - RuleType -> - error({unknown_rule_type, RuleType}) - end). - --define(TEST_COLUMNS_MESSGE, - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - ]). - --define(TEST_COLUMNS_MESSGE_DELIVERED_ACKED, - [ {<<"from_clientid">>, <<"c_emqx_1">>} - , {<<"from_username">>, <<"u_emqx_1">>} - , {<<"clientid">>, <<"c_emqx_2">>} - , {<<"username">>, <<"u_emqx_2">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} - ]). - --define(TEST_COLUMNS(EVENT), - case EVENT of - 'message.publish' -> ?TEST_COLUMNS_MESSGE; - 'message.dropped' -> ?TEST_COLUMNS_MESSGE; - 'message.delivered' -> ?TEST_COLUMNS_MESSGE_DELIVERED_ACKED; - 'message.acked' -> ?TEST_COLUMNS_MESSGE_DELIVERED_ACKED; - 'client.connected' -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"peername">>, <<"127.0.0.1:52918">>} - ]; - 'client.disconnected' -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"reason">>, <<"normal">>} - ]; - 'session.subscribed' -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - ]; - 'session.unsubscribed' -> - [ {<<"clientid">>, <<"c_emqx">>} - , {<<"username">>, <<"u_emqx">>} - , {<<"topic">>, <<"t/a">>} - , {<<"qos">>, 1} - ]; - RuleType -> - error({unknown_rule_type, RuleType}) - end). - --define(EVENT_INFO_MESSAGE_PUBLISH, - #{ event => '$events/message_publish', - title => #{en => <<"message publish">>, zh => <<"消息发布"/utf8>>}, - description => #{en => <<"message publish">>, zh => <<"消息发布"/utf8>>}, - test_columns => ?TEST_COLUMNS('message.publish'), - columns => ?COLUMNS('message.publish'), - sql_example => <<"SELECT payload.msg as msg FROM \"t/#\" WHERE msg = 'hello'">> - }). - --define(EVENT_INFO_MESSAGE_DELIVER, - #{ event => '$events/message_delivered', - title => #{en => <<"message delivered">>, zh => <<"消息投递"/utf8>>}, - description => #{en => <<"message delivered">>, zh => <<"消息投递"/utf8>>}, - test_columns => ?TEST_COLUMNS('message.delivered'), - columns => ?COLUMNS('message.delivered'), - sql_example => <<"SELECT * FROM \"$events/message_delivered\" WHERE topic =~ 't/#'">> - }). - --define(EVENT_INFO_MESSAGE_ACKED, - #{ event => '$events/message_acked', - title => #{en => <<"message acked">>, zh => <<"消息应答"/utf8>>}, - description => #{en => <<"message acked">>, zh => <<"消息应答"/utf8>>}, - test_columns => ?TEST_COLUMNS('message.acked'), - columns => ?COLUMNS('message.acked'), - sql_example => <<"SELECT * FROM \"$events/message_acked\" WHERE topic =~ 't/#'">> - }). - --define(EVENT_INFO_MESSAGE_DROPPED, - #{ event => '$events/message_dropped', - title => #{en => <<"message dropped">>, zh => <<"消息丢弃"/utf8>>}, - description => #{en => <<"message dropped">>, zh => <<"消息丢弃"/utf8>>}, - test_columns => ?TEST_COLUMNS('message.dropped'), - columns => ?COLUMNS('message.dropped'), - sql_example => <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> - }). - --define(EVENT_INFO_CLIENT_CONNECTED, - #{ event => '$events/client_connected', - title => #{en => <<"client connected">>, zh => <<"连接建立"/utf8>>}, - description => #{en => <<"client connected">>, zh => <<"连接建立"/utf8>>}, - test_columns => ?TEST_COLUMNS('client.connected'), - columns => ?COLUMNS('client.connected'), - sql_example => <<"SELECT * FROM \"$events/client_connected\"">> - }). - --define(EVENT_INFO_CLIENT_DISCONNECTED, - #{ event => '$events/client_disconnected', - title => #{en => <<"client disconnected">>, zh => <<"连接断开"/utf8>>}, - description => #{en => <<"client disconnected">>, zh => <<"连接断开"/utf8>>}, - test_columns => ?TEST_COLUMNS('client.disconnected'), - columns => ?COLUMNS('client.disconnected'), - sql_example => <<"SELECT * FROM \"$events/client_disconnected\"">> - }). - --define(EVENT_INFO_SESSION_SUBSCRIBED, - #{ event => '$events/session_subscribed', - title => #{en => <<"session subscribed">>, zh => <<"会话订阅完成"/utf8>>}, - description => #{en => <<"session subscribed">>, zh => <<"会话订阅完成"/utf8>>}, - test_columns => ?TEST_COLUMNS('session.subscribed'), - columns => ?COLUMNS('session.subscribed'), - sql_example => <<"SELECT * FROM \"$events/session_subscribed\" WHERE topic =~ 't/#'">> - }). - --define(EVENT_INFO_SESSION_UNSUBSCRIBED, - #{ event => '$events/session_unsubscribed', - title => #{en => <<"session unsubscribed">>, zh => <<"会话取消订阅完成"/utf8>>}, - description => #{en => <<"session unsubscribed">>, zh => <<"会话取消订阅完成"/utf8>>}, - test_columns => ?TEST_COLUMNS('session.unsubscribed'), - columns => ?COLUMNS('session.unsubscribed'), - sql_example => <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">> - }). - --define(EVENT_INFO, - [ ?EVENT_INFO_MESSAGE_PUBLISH - , ?EVENT_INFO_MESSAGE_DELIVER - , ?EVENT_INFO_MESSAGE_ACKED - , ?EVENT_INFO_MESSAGE_DROPPED - , ?EVENT_INFO_CLIENT_CONNECTED - , ?EVENT_INFO_CLIENT_DISCONNECTED - , ?EVENT_INFO_SESSION_SUBSCRIBED - , ?EVENT_INFO_SESSION_UNSUBSCRIBED - ]). - --define(EG_ENVS(EVENT_TOPIC), - case EVENT_TOPIC of - <<"$events/", _/binary>> -> - EventName = emqx_rule_events:event_name(EVENT_TOPIC), - emqx_rule_maps:atom_key_map(maps:from_list(?EG_COLUMNS(EventName))); - _PublishTopic -> - #{id => emqx_guid:to_hexstr(emqx_guid:gen()), - clientid => <<"c_emqx">>, - username => <<"u_emqx">>, - payload => <<"{\"id\": 1, \"name\": \"ha\"}">>, - peerhost => <<"127.0.0.1">>, - topic => <<"t/a">>, - qos => 1, - flags => #{sys => true, event => true}, - publish_received_at => emqx_rule_utils:now_ms(), - timestamp => emqx_rule_utils:now_ms(), - node => node() - } - end). diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index da54fdb3c..b4c685e7c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -106,7 +106,6 @@ , on_action_do_nothing/2 ]). - -spec(on_resource_create(binary(), map()) -> map()). on_resource_create(_Name, Conf) -> Conf. @@ -114,8 +113,7 @@ on_resource_create(_Name, Conf) -> %%------------------------------------------------------------------------------ %% Action 'inspect' %%------------------------------------------------------------------------------ --spec on_action_create_inspect(action_instance_id(), Params :: map()) - -> NewParams :: map(). +-spec on_action_create_inspect(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. on_action_create_inspect(Id, Params) -> Params. @@ -131,8 +129,7 @@ on_action_inspect(Selected, Envs) -> %%------------------------------------------------------------------------------ %% Action 'republish' %%------------------------------------------------------------------------------ --spec on_action_create_republish(action_instance_id(), Params :: map()) - -> NewParams :: map(). +-spec on_action_create_republish(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. on_action_create_republish(Id, Params = #{ <<"target_topic">> := TargetTopic, <<"target_qos">> := TargetQoS, @@ -203,8 +200,7 @@ increase_and_publish(ActId, Msg) -> emqx_rule_metrics:inc_actions_success(ActId), emqx_metrics:inc_msg(Msg). --spec on_action_create_do_nothing(action_instance_id(), Params :: map()) - -> NewParams :: map(). +-spec on_action_create_do_nothing(action_instance_id(), Params :: map()) -> {bindings(), NewParams :: map()}. on_action_create_do_nothing(ActId, Params) when is_binary(ActId) -> Params. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index fc40ff38e..f1be590c5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -72,8 +72,9 @@ %% Load all providers . -spec(load_providers() -> ok). load_providers() -> - [load_provider(App) || App <- ignore_lib_apps(application:loaded_applications())], - ok. + lists:foreach(fun(App) -> + load_provider(App) + end, ignore_lib_apps(application:loaded_applications())). -spec(load_provider(App :: atom()) -> ok). load_provider(App) when is_atom(App) -> @@ -86,8 +87,9 @@ load_provider(App) when is_atom(App) -> %% Load all providers . -spec(unload_providers() -> ok). unload_providers() -> - [unload_provider(App) || App <- ignore_lib_apps(application:loaded_applications())], - ok. + lists:foreach(fun(App) -> + unload_provider(App) + end, ignore_lib_apps(application:loaded_applications())). %% @doc Unload a provider. -spec(unload_provider(App :: atom()) -> ok). @@ -147,8 +149,7 @@ find_attrs(App, Def) -> module_attributes(Module) -> try Module:module_info(attributes) catch - error:undef -> []; - error:Reason -> error(Reason) + error:undef -> [] end. %%------------------------------------------------------------------------------ @@ -156,37 +157,52 @@ module_attributes(Module) -> %%------------------------------------------------------------------------------ -dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]). --spec(create_rule(#{}) -> {ok, rule()} | no_return()). -create_rule(Params = #{rawsql := Sql, actions := Actions}) -> +-spec create_rule(map()) -> {ok, rule()} | {error, term()}. +create_rule(Params = #{rawsql := Sql, actions := ActArgs}) -> case emqx_rule_sqlparser:parse_select(Sql) of {ok, Select} -> RuleId = maps:get(id, Params, rule_id()), Enabled = maps:get(enabled, Params, true), - Rule = #rule{id = RuleId, - rawsql = Sql, - for = emqx_rule_sqlparser:select_from(Select), - is_foreach = emqx_rule_sqlparser:select_is_foreach(Select), - fields = emqx_rule_sqlparser:select_fields(Select), - doeach = emqx_rule_sqlparser:select_doeach(Select), - incase = emqx_rule_sqlparser:select_incase(Select), - conditions = emqx_rule_sqlparser:select_where(Select), - on_action_failed = maps:get(on_action_failed, Params, continue), - actions = prepare_actions(Actions, Enabled), - enabled = Enabled, - description = maps:get(description, Params, "")}, - ok = emqx_rule_registry:add_rule(Rule), - ok = emqx_rule_metrics:create_rule_metrics(RuleId), - {ok, Rule}; - Error -> error(Error) + try prepare_actions(ActArgs, Enabled) of + Actions -> + Rule = #rule{ + id = RuleId, + rawsql = Sql, + for = emqx_rule_sqlparser:select_from(Select), + is_foreach = emqx_rule_sqlparser:select_is_foreach(Select), + fields = emqx_rule_sqlparser:select_fields(Select), + doeach = emqx_rule_sqlparser:select_doeach(Select), + incase = emqx_rule_sqlparser:select_incase(Select), + conditions = emqx_rule_sqlparser:select_where(Select), + on_action_failed = maps:get(on_action_failed, Params, continue), + actions = Actions, + enabled = Enabled, + description = maps:get(description, Params, "") + }, + ok = emqx_rule_registry:add_rule(Rule), + ok = emqx_rule_metrics:create_rule_metrics(RuleId), + {ok, Rule} + catch + throw:{action_not_found, ActionName} -> + {error, {action_not_found, ActionName}}; + throw:Reason -> + {error, Reason} + end; + Reason -> {error, Reason} end. -spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}). update_rule(Params = #{id := RuleId}) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule0} -> - Rule = may_update_rule_params(Rule0, Params), - ok = emqx_rule_registry:add_rule(Rule), - {ok, Rule}; + try may_update_rule_params(Rule0, Params) of + Rule -> + ok = emqx_rule_registry:add_rule(Rule), + {ok, Rule} + catch + throw:Reason -> + {error, Reason} + end; not_found -> {error, {not_found, RuleId}} end. @@ -234,9 +250,12 @@ start_resource(ResId) -> {ok, #resource{type = ResType, config = Config}} -> {ok, #resource_type{on_create = {Mod, Create}}} = emqx_rule_registry:find_resource_type(ResType), - _ = init_resource(Mod, Create, ResId, Config), - refresh_actions_of_a_resource(ResId), - ok; + try + init_resource(Mod, Create, ResId, Config), + refresh_actions_of_a_resource(ResId) + catch + throw:Reason -> {error, Reason} + end; not_found -> {error, {resource_not_found, ResId}} end. @@ -247,9 +266,12 @@ test_resource(#{type := Type, config := Config}) -> {ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} -> ok = emqx_rule_validator:validate_params(Config, ParamSpec), ResId = resource_id(), - cluster_call(init_resource, [ModC, Create, ResId, Config]), - cluster_call(clear_resource, [ModD, Destroy, ResId]), - ok; + try + cluster_call(init_resource, [ModC, Create, ResId, Config]), + cluster_call(clear_resource, [ModD, Destroy, ResId]) + catch + throw:Reason -> {error, Reason} + end; not_found -> {error, {resource_type_not_found, Type}} end. @@ -299,36 +321,37 @@ ensure_resource_deleted(ResId) -> -spec(refresh_resources() -> ok). refresh_resources() -> - [try refresh_resource(Res) - catch _:Error:ST -> - logger:critical( - "Can not re-stablish resource ~p: ~0p. The resource is disconnected." - "Fix the issue and establish it manually.\n" - "Stacktrace: ~0p", - [ResId, Error, ST]) - end || Res = #resource{id = ResId} - <- emqx_rule_registry:get_resources()], - ok. + lists:foreach(fun(#resource{id = ResId} = Res) -> + try refresh_resource(Res) + catch Error:Reason:ST -> + logger:critical( + "Can not re-stablish resource ~p: ~0p. The resource is disconnected." + "Fix the issue and establish it manually.\n" + "Stacktrace: ~0p", + [ResId, {Error,Reason}, ST]) + end + end, emqx_rule_registry:get_resources()). refresh_resource(Type) when is_atom(Type) -> - [refresh_resource(Resource) - || Resource <- emqx_rule_registry:get_resources_by_type(Type)]; + lists:foreach(fun(Resource) -> + refresh_resource(Resource) + end, emqx_rule_registry:get_resources_by_type(Type)); refresh_resource(#resource{id = ResId, config = Config, type = Type}) -> {ok, #resource_type{on_create = {M, F}}} = emqx_rule_registry:find_resource_type(Type), cluster_call(init_resource, [M, F, ResId, Config]). -spec(refresh_rules() -> ok). refresh_rules() -> - [try refresh_rule(Rule) - catch _:Error:ST -> - logger:critical( - "Can not re-build rule ~p: ~0p. The rule is disabled." - "Fix the issue and enable it manually.\n" - "Stacktrace: ~0p", - [RuleId, Error, ST]) - end || Rule = #rule{id = RuleId} - <- emqx_rule_registry:get_rules()], - ok. + lists:foreach(fun(#rule{id = RuleId} = Rule) -> + try refresh_rule(Rule) + catch Error:Reason:ST -> + logger:critical( + "Can not re-build rule ~p: ~0p. The rule is disabled." + "Fix the issue and enable it manually.\n" + "Stacktrace: ~0p", + [RuleId, {Error,Reason}, ST]) + end + end, emqx_rule_registry:get_rules()). refresh_rule(#rule{id = RuleId, actions = Actions}) -> ok = emqx_rule_metrics:create_rule_metrics(RuleId), @@ -389,7 +412,7 @@ may_update_rule_params(Rule, Params = #{rawsql := SQL}) -> conditions = emqx_rule_sqlparser:select_where(Select) }, maps:remove(rawsql, Params)); - Error -> error(Error) + Reason -> throw(Reason) end; may_update_rule_params(Rule, Params = #{enabled := Enabled}) -> Enabled andalso refresh_rule(Rule), @@ -443,7 +466,7 @@ cluster_call(Func, Args) -> end; {ResL, BadNodes} -> ?LOG(error, "cluster_call bad nodes found: ~p, ResL: ~p", [BadNodes, ResL]), - throw({func_fail(Func), {nodes_not_exist, BadNodes}}) + throw({func_fail(Func), {failed_on_nodes, BadNodes}}) end. init_resource(Module, OnCreate, ResId, Config) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 131ea9651..53cc125b0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -17,13 +17,11 @@ -module(emqx_rule_engine_api). -include("rule_engine.hrl"). --include("rule_events.hrl"). -include_lib("emqx/include/logger.hrl"). --import(minirest, [return/1]). +-logger_header("[RuleEngineAPI]"). -%% A lot of case clause no_match:es from rule_events.hrl --dialyzer(no_match). +-import(minirest, [return/1]). -rest_api(#{name => create_rule, method => 'POST', @@ -173,16 +171,19 @@ -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~s Not Found", [(ID)]))). -define(ERR_NO_ACTION(NAME), list_to_binary(io_lib:format("Action ~s Not Found", [(NAME)]))). -define(ERR_NO_RESOURCE(RESID), list_to_binary(io_lib:format("Resource ~s Not Found", [(RESID)]))). --define(ERR_NO_HOOK(HOOK), list_to_binary(io_lib:format("Event ~s Not Found", [(HOOK)]))). -define(ERR_NO_RESOURCE_TYPE(TYPE), list_to_binary(io_lib:format("Resource Type ~s Not Found", [(TYPE)]))). --define(ERR_UNKNOWN_COLUMN(COLUMN), list_to_binary(io_lib:format("Unknown Column: ~s", [(COLUMN)]))). --define(ERR_START_RESOURCE(RESID), list_to_binary(io_lib:format("Start Resource ~s Failed", [(RESID)]))). -define(ERR_BADARGS(REASON), begin R0 = list_to_binary(io_lib:format("~0p", [REASON])), <<"Bad Arguments: ", R0/binary>> end). +-dialyzer({nowarn_function, [create_rule/2, + test_rule_sql/1, + do_create_rule/1, + update_rule/2 + ]}). + %%------------------------------------------------------------------------------ %% Rules API %%------------------------------------------------------------------------------ @@ -192,60 +193,31 @@ create_rule(_Bindings, Params) -> Params). test_rule_sql(Params) -> - try emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of + case emqx_rule_sqltester:test(emqx_json:decode(emqx_json:encode(Params), [return_maps])) of {ok, Result} -> return({ok, Result}); - {error, nomatch} -> return({error, 404, <<"SQL Not Match">>}) - catch - throw:{invalid_hook, Hook} -> - return({error, 400, ?ERR_NO_HOOK(Hook)}); - throw:Reason -> - return({error, 400, ?ERR_BADARGS(Reason)}); - _:{parse_error,{unknown_column, Column}, _} -> - return({error, 400, ?ERR_UNKNOWN_COLUMN(Column)}); - _Error:Reason:StackT -> - ?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]), + {error, nomatch} -> return({error, 404, <<"SQL Not Match">>}); + {error, Reason} -> + ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. do_create_rule(Params) -> - try emqx_rule_engine:create_rule(parse_rule_params(Params)) of - {ok, Rule} -> - return({ok, record_to_map(Rule)}); + case emqx_rule_engine:create_rule(parse_rule_params(Params)) of + {ok, Rule} -> return({ok, record_to_map(Rule)}); {error, {action_not_found, ActionName}} -> - return({error, 400, ?ERR_NO_ACTION(ActionName)}) - catch - throw:{resource_not_found, ResId} -> - return({error, 400, ?ERR_NO_RESOURCE(ResId)}); - throw:{invalid_hook, Hook} -> - return({error, 400, ?ERR_NO_HOOK(Hook)}); - throw:Reason -> - return({error, 400, ?ERR_BADARGS(Reason)}); - _:{parse_error,{unknown_column, Column}} -> - return({error, 400, ?ERR_UNKNOWN_COLUMN(Column)}); - _Error:Reason:StackT -> - ?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]), + return({error, 400, ?ERR_NO_ACTION(ActionName)}); + {error, Reason} -> + ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. update_rule(#{id := Id}, Params) -> - try emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of - {ok, Rule} -> - return({ok, record_to_map(Rule)}); + case emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of + {ok, Rule} -> return({ok, record_to_map(Rule)}); {error, {not_found, RuleId}} -> return({error, 400, ?ERR_NO_RULE(RuleId)}); - {error, {action_not_found, ActionName}} -> - return({error, 400, ?ERR_NO_ACTION(ActionName)}) - catch - throw:{resource_not_found, ResId} -> - return({error, 400, ?ERR_NO_RESOURCE(ResId)}); - throw:{invalid_hook, Hook} -> - return({error, 400, ?ERR_NO_HOOK(Hook)}); - throw:Reason -> - return({error, 400, ?ERR_BADARGS(Reason)}); - _:{parse_error,{unknown_column, Column}} -> - return({error, 400, ?ERR_UNKNOWN_COLUMN(Column)}); - _Error:Reason:StackT -> - ?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]), + {error, Reason} -> + ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. @@ -264,7 +236,9 @@ delete_rule(#{id := Id}, _Params) -> %%------------------------------------------------------------------------------ list_actions(#{}, _Params) -> - return_all(emqx_rule_registry:get_actions()). + return_all( + sort_by_title(action, + emqx_rule_registry:get_actions())). show_action(#{name := Name}, _Params) -> reply_with(fun emqx_rule_registry:find_action/1, Name). @@ -278,24 +252,15 @@ create_resource(#{}, Params) -> Params). do_create_resource(Create, Params) -> - try emqx_rule_engine:Create(parse_resource_params(Params)) of + case emqx_rule_engine:Create(parse_resource_params(Params)) of ok -> return(ok); {ok, Resource} -> return({ok, record_to_map(Resource)}); {error, {resource_type_not_found, Type}} -> - return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)}) - catch - throw:{resource_type_not_found, Type} -> return({error, 400, ?ERR_NO_RESOURCE_TYPE(Type)}); - throw:{init_resource_failure, Reason} -> - %% only test_resource would throw exceptions, create_resource won't - ?LOG(error, "[RuleEngineAPI] test_resource_failure: ~p", [Reason]), - return({error, 500, <<"Test Creating Resource Failed">>}); - throw:Reason -> - return({error, 400, ?ERR_BADARGS(Reason)}); - _Error:Reason:StackT -> - ?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]), + {error, Reason} -> + ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. @@ -337,35 +302,19 @@ get_resource_status(#{id := Id}, _Params) -> end. start_resource(#{id := Id}, _Params) -> - try emqx_rule_engine:start_resource(Id) of + case emqx_rule_engine:start_resource(Id) of ok -> return(ok); {error, {resource_not_found, ResId}} -> - return({error, 400, ?ERR_NO_RESOURCE(ResId)}) - catch - throw:{{init_resource_failure, _}, Reason} -> - ?LOG(error, "[RuleEngineAPI] init_resource_failure: ~p", [Reason]), - return({error, 400, ?ERR_START_RESOURCE(Id)}); - throw:Reason -> - return({error, 400, ?ERR_BADARGS(Reason)}); - _Error:Reason:StackT -> - ?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]), + return({error, 400, ?ERR_NO_RESOURCE(ResId)}); + {error, Reason} -> + ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), return({error, 400, ?ERR_BADARGS(Reason)}) end. delete_resource(#{id := Id}, _Params) -> - try - ok = emqx_rule_engine:ensure_resource_deleted(Id), - return(ok) - catch - _Error:{throw,Reason} -> - return({error, 400, ?ERR_BADARGS(Reason)}); - throw:Reason -> - return({error, 400, ?ERR_BADARGS(Reason)}); - _Error:Reason:StackT -> - ?LOG(error, "[RuleEngineAPI] ~p failed: ~0p", [?FUNCTION_NAME, {Reason, StackT}]), - return({error, 400, ?ERR_BADARGS(Reason)}) - end. + ok = emqx_rule_engine:ensure_resource_deleted(Id), + return(ok). %%------------------------------------------------------------------------------ %% Resource Types API @@ -385,7 +334,7 @@ show_resource_type(#{name := Name}, _Params) -> %%------------------------------------------------------------------------------ list_events(#{}, _Params) -> - return({ok, ?EVENT_INFO}). + return({ok, emqx_rule_events:event_info()}). %%------------------------------------------------------------------------------ %% Internal functions diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl index b3b2d1963..ff65e6818 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl @@ -91,7 +91,7 @@ unload() -> %%----------------------------------------------------------------------------- %% 'rules' command %%----------------------------------------------------------------------------- - +-dialyzer([{nowarn_function, [rules/1]}]). rules(["list"]) -> print_all(emqx_rule_registry:get_rules()); diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index c9e6a32b3..c6df377bb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -37,6 +37,11 @@ , on_message_acked/3 ]). +-export([ event_info/0 + , columns/1 + , columns_with_exam/1 + ]). + -define(SUPPORTED_HOOK, [ 'client.connected' , 'client.disconnected' @@ -309,6 +314,239 @@ make_msg(QoS, Topic, Payload) -> emqx_message:set_flags(#{sys => true, event => true}, emqx_message:make(emqx_events, QoS, Topic, iolist_to_binary(Payload))). +%%-------------------------------------------------------------------- +%% Columns +%%-------------------------------------------------------------------- +columns(Event) -> + [Key || {Key, _ExampleVal} <- columns_with_exam(Event)]. + +event_info() -> + [ event_info_message_publish() + , event_info_message_deliver() + , event_info_message_acked() + , event_info_message_dropped() + , event_info_client_connected() + , event_info_client_disconnected() + , event_info_session_subscribed() + , event_info_session_unsubscribed() + ]. + +event_info_message_publish() -> + event_info_common( + 'message.publish', + {<<"message publish">>, <<"消息发布"/utf8>>}, + {<<"message publish">>, <<"消息发布"/utf8>>}, + <<"SELECT payload.msg as msg FROM \"t/#\" WHERE msg = 'hello'">> + ). +event_info_message_deliver() -> + event_info_common( + 'message.delivered', + {<<"message delivered">>, <<"消息已投递"/utf8>>}, + {<<"message delivered">>, <<"消息已投递"/utf8>>}, + <<"SELECT * FROM \"$events/message_delivered\" WHERE topic =~ 't/#'">> + ). +event_info_message_acked() -> + event_info_common( + 'message.acked', + {<<"message acked">>, <<"消息应答"/utf8>>}, + {<<"message acked">>, <<"消息应答"/utf8>>}, + <<"SELECT * FROM \"$events/message_acked\" WHERE topic =~ 't/#'">> + ). +event_info_message_dropped() -> + event_info_common( + 'message.dropped', + {<<"message dropped">>, <<"消息丢弃"/utf8>>}, + {<<"message dropped">>, <<"消息丢弃"/utf8>>}, + <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> + ). +event_info_client_connected() -> + event_info_common( + 'client.connected', + {<<"client connected">>, <<"连接建立"/utf8>>}, + {<<"client connected">>, <<"连接建立"/utf8>>}, + <<"SELECT * FROM \"$events/client_connected\"">> + ). +event_info_client_disconnected() -> + event_info_common( + 'client.disconnected', + {<<"client disconnected">>, <<"连接断开"/utf8>>}, + {<<"client disconnected">>, <<"连接断开"/utf8>>}, + <<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">> + ). +event_info_session_subscribed() -> + event_info_common( + 'session.subscribed', + {<<"session subscribed">>, <<"会话订阅完成"/utf8>>}, + {<<"session subscribed">>, <<"会话订阅完成"/utf8>>}, + <<"SELECT * FROM \"$events/session_subscribed\" WHERE topic =~ 't/#'">> + ). +event_info_session_unsubscribed() -> + event_info_common( + 'session.unsubscribed', + {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, + {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, + <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">> + ). + +event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> + #{event => event_topic(Event), + title => #{en => TitleEN, zh => TitleZH}, + description => #{en => DescrEN, zh => DescrZH}, + test_columns => test_columns(Event), + columns => columns(Event), + sql_example => SqlExam + }. + +test_columns('message.dropped') -> + test_columns('message.publish'); +test_columns('message.publish') -> + [ {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + ]; +test_columns('message.acked') -> + test_columns('message.delivered'); +test_columns('message.delivered') -> + [ {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + ]; +test_columns('client.connected') -> + [ {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peername">>, <<"127.0.0.1:52918">>} + ]; +test_columns('client.disconnected') -> + [ {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"reason">>, <<"normal">>} + ]; +test_columns('session.unsubscribed') -> + test_columns('session.subscribed'); +test_columns('session.subscribed') -> + [ {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + ]. + +columns_with_exam('message.publish') -> + [ {<<"event">>, 'message.publish'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"headers">>, undefined} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('message.delivered') -> + [ {<<"event">>, 'message.delivered'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('message.acked') -> + [ {<<"event">>, 'message.acked'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('message.dropped') -> + [ {<<"event">>, 'message.dropped'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"reason">>, no_subscribers} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('client.connected') -> + [ {<<"event">>, 'client.connected'} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"mountpoint">>, undefined} + , {<<"peername">>, <<"192.168.0.10:56431">>} + , {<<"sockname">>, <<"0.0.0.0:1883">>} + , {<<"proto_name">>, <<"MQTT">>} + , {<<"proto_ver">>, 5} + , {<<"keepalive">>, 60} + , {<<"clean_start">>, true} + , {<<"expiry_interval">>, 3600} + , {<<"is_bridge">>, false} + , {<<"connected_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('client.disconnected') -> + [ {<<"event">>, 'client.disconnected'} + , {<<"reason">>, normal} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peername">>, <<"192.168.0.10:56431">>} + , {<<"sockname">>, <<"0.0.0.0:1883">>} + , {<<"disconnected_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('session.subscribed') -> + [ {<<"event">>, 'session.subscribed'} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('session.unsubscribed') -> + [ {<<"event">>, 'session.unsubscribed'} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -354,7 +592,8 @@ event_topic('session.subscribed') -> <<"$events/session_subscribed">>; event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; -event_topic('message.dropped') -> <<"$events/message_dropped">>. +event_topic('message.dropped') -> <<"$events/message_dropped">>; +event_topic('message.publish') -> <<"$events/message_publish">>. printable_maps(undefined) -> #{}; printable_maps(Headers) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index 7631435a4..4a1ef7da7 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -106,11 +106,11 @@ %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ --spec(create_rule_metrics(rule_id()) -> Ref :: counters:counters_ref()). +-spec(create_rule_metrics(rule_id()) -> ok). create_rule_metrics(Id) -> gen_server:call(?MODULE, {create_rule_metrics, Id}). --spec(create_metrics(rule_id()) -> Ref :: counters:counters_ref()). +-spec(create_metrics(rule_id()) -> ok). create_metrics(Id) -> gen_server:call(?MODULE, {create_metrics, Id}). diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 395e13eff..0a5358cc8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -19,7 +19,6 @@ -behaviour(gen_server). -include("rule_engine.hrl"). --include("rule_events.hrl"). -include_lib("emqx/include/logger.hrl"). -export([start_link/0]). @@ -166,14 +165,10 @@ start_link() -> get_rules() -> get_all_records(?RULE_TAB). -%% TODO: emqx_rule_utils:can_topic_match_oneof(Topic::any(), For::atom()) -%% will never return since it differs in the 2nd argument from the success -%% typing arguments: (any(), [binary() | ['' | '#' | '+' | binary()]]) --dialyzer([{nowarn_function, get_rules_for/1}]). -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_for(Topic) -> [Rule || Rule = #rule{for = For} <- get_rules(), - Topic =:= For orelse emqx_rule_utils:can_topic_match_oneof(Topic, For)]. + emqx_rule_utils:can_topic_match_oneof(Topic, For)]. -spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found). get_rule(Id) -> @@ -333,10 +328,14 @@ remove_resource_params(ResId) -> %% @private delete_resource(ResId) -> - %% TODO, change to foreache:s - _ = [[ResId =:= ResId1 andalso throw({dependency_exists, {rule, Id}}) - || #action_instance{args = #{<<"$resource">> := ResId1}} <- Actions] - || #rule{id = Id, actions = Actions} <- get_rules()], + lists:foreach(fun(#rule{id = Id, actions = Actions}) -> + lists:foreach( + fun (#action_instance{args = #{<<"$resource">> := ResId1}}) + when ResId =:= ResId1 -> + throw({dependency_exists, {rule, Id}}); + (_) -> ok + end, Actions) + end, get_rules()), mnesia:delete(?RES_TAB, ResId, write). %% @private diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 72d8f752e..f2f3f5efc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -71,9 +71,6 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) -> apply_rules(More, Input). apply_rule_discard_result(Rule, Input) -> - %% TODO check if below two clauses are ok to discard: - %% {'error','nomatch'} - %% {'ok',[any()]} _ = apply_rule(Rule, Input), ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl index f6db69e44..ed2142ea4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl @@ -17,7 +17,6 @@ -module(emqx_rule_sqlparser). -include("rule_engine.hrl"). --include("rule_events.hrl"). -export([parse_select/1]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index bedfe324e..90eb623ac 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -15,7 +15,6 @@ -module(emqx_rule_sqltester). -include("rule_engine.hrl"). --include("rule_events.hrl"). -include_lib("emqx/include/logger.hrl"). -export([ test/1 @@ -27,10 +26,11 @@ test_rule/4, flatten/1, sql_test_action/0, - fill_default_values/2 + fill_default_values/2, + envs_examp/1 ]}). --spec(test(#{}) -> {ok, Result::map()} | no_return()). +-spec(test(#{}) -> {ok, map() | list()} | {error, term()}). test(#{<<"rawsql">> := Sql, <<"ctx">> := Context}) -> {ok, Select} = emqx_rule_sqlparser:parse_select(Sql), InTopic = maps:get(<<"topic">>, Context, <<>>), @@ -96,4 +96,23 @@ sql_test_action() -> end. fill_default_values(Event, Context) -> - maps:merge(?EG_ENVS(Event), Context). + maps:merge(envs_examp(Event), Context). + +envs_examp(<<"$events/", _/binary>> = EVENT_TOPIC) -> + EventName = emqx_rule_events:event_name(EVENT_TOPIC), + emqx_rule_maps:atom_key_map( + maps:from_list( + emqx_rule_events:columns_with_exam(EventName))); +envs_examp(_) -> + #{id => emqx_guid:to_hexstr(emqx_guid:gen()), + clientid => <<"c_emqx">>, + username => <<"u_emqx">>, + payload => <<"{\"id\": 1, \"name\": \"ha\"}">>, + peerhost => <<"127.0.0.1">>, + topic => <<"t/a">>, + qos => 1, + flags => #{sys => true, event => true}, + publish_received_at => emqx_rule_utils:now_ms(), + timestamp => emqx_rule_utils:now_ms(), + node => node() + }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 78cd5c27a..ba3d4e841 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -67,9 +67,7 @@ -type(tmpl_cmd() :: list(tmpl_token())). --type(prepare_statement() :: binary()). - --type(prepare_params() :: fun((binary()) -> list())). +-type(prepare_statement_key() :: binary()). %% preprocess template string with place holders -spec(preproc_tmpl(binary()) -> tmpl_token()). @@ -94,6 +92,7 @@ put_head(Type, Term, List) -> proc_tmpl(Tokens, Data) -> proc_tmpl(Tokens, Data, #{return => full_binary}). +-spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()). proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) -> Trans = maps:get(var_trans, Opts, fun bin/1), list_to_binary( @@ -115,19 +114,21 @@ preproc_cmd(Str) -> SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]), [preproc_tmpl(SubStr) || SubStr <- SubStrList]. +-spec(proc_cmd([tmpl_token()], map()) -> binary() | list()). proc_cmd(Tokens, Data) -> proc_cmd(Tokens, Data, #{return => full_binary}). +-spec(proc_cmd([tmpl_token()], map(), map()) -> list()). proc_cmd(Tokens, Data, Opts) -> [proc_tmpl(Tks, Data, Opts) || Tks <- Tokens]. %% preprocess SQL with place holders --spec(preproc_sql(Sql::binary()) -> {prepare_statement(), prepare_params()}). --dialyzer({nowarn_function,preproc_sql/1}). +-spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}). preproc_sql(Sql) -> preproc_sql(Sql, '?'). --spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n') -> {prepare_statement(), prepare_params()}). --dialyzer({nowarn_function,preproc_sql/2}). +-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n') + -> {prepare_statement_key(), tmpl_token()}). + preproc_sql(Sql, ReplaceWith) -> case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of {match, PlaceHolders} -> @@ -330,5 +331,6 @@ now_ms() -> erlang:system_time(millisecond). can_topic_match_oneof(Topic, Filters) -> - MatchedFilters = [Fltr || Fltr <- Filters, emqx_topic:match(Topic, Fltr)], - length(MatchedFilters) > 0. + lists:any(fun(Fltr) -> + emqx_topic:match(Topic, Fltr) + end, Filters). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 06a6b5d4b..99a89ac32 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -2378,7 +2378,7 @@ start_apps() -> [start_apps(App, SchemaFile, ConfigFile) || {App, SchemaFile, ConfigFile} <- [{emqx, deps_path(emqx, "priv/emqx.schema"), - deps_path(emqx, "etc/gen.emqx.conf")}, + deps_path(emqx, "etc/emqx.conf")}, {emqx_rule_engine, local_path("priv/emqx_rule_engine.schema"), local_path("etc/emqx_rule_engine.conf")}]]. @@ -2424,6 +2424,7 @@ set_special_configs(_App) -> ok. print_mock() -> + catch meck:unload(emqx_ctl), meck:new(emqx_ctl, [non_strict, passthrough]), meck:expect(emqx_ctl, print, fun(Arg) -> emqx_ctl:format(Arg) end), meck:expect(emqx_ctl, print, fun(Msg, Arg) -> emqx_ctl:format(Msg, Arg) end), diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index dd07d10a7..045f5e114 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -121,12 +121,8 @@ zh => <<"将数据转发给 Web 服务"/utf8>>} }). --type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())). - -type(url() :: binary()). --export_type([action_fun/0]). - -export([ on_resource_create/2 , on_get_resource_status/2 , on_resource_destroy/2 @@ -166,7 +162,7 @@ on_resource_destroy(_ResId, _Params) -> ok. %% An action that forwards publish messages to a remote web server. --spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> action_fun()). +-spec(on_action_create_data_to_webserver(Id::binary(), #{url() := string()}) -> {bindings(), NewParams :: map()}). on_action_create_data_to_webserver(Id, Params) -> #{url := Url, headers := Headers, method := Method, content_type := ContentType, payload_tmpl := PayloadTmpl, path := Path} = parse_action_params(Params),