diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 7cb5d28d4..88527a2b9 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -88,4 +88,4 @@ end()). %% Tables --define(RULE_TAB, emqx_rule). +-define(RULE_TAB, emqx_rule_engine). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index a93a7e14d..95b153191 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -3,7 +3,7 @@ [{description, "EMQ X Rule Engine"}, {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, - {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, + {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel,stdlib,rulesql,getopt]}, {mod, {emqx_rule_engine_app, []}}, {env, []}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 244952a14..0c534b665 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -16,19 +16,66 @@ -module(emqx_rule_engine). +-behaviour(gen_server). + -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-export([start_link/0]). + +%% Rule Management -export([ load_rules/0 ]). -export([ create_rule/1 + , insert_rule/1 , update_rule/1 , delete_rule/1 + , get_rule/1 ]). +-export([ get_rules/0 + , get_rules_for_topic/1 + , get_rules_with_same_event/1 + , get_rules_ordered_by_ts/0 + ]). + +%% exported for cluster_call +-export([ do_delete_rule/1 + , do_insert_rule/1 + ]). + +-export([ load_hooks_for_rule/1 + , unload_hooks_for_rule/1 + , add_metrics_for_rule/1 + , clear_metrics_for_rule/1 + ]). + +%% gen_server Callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-define(RULE_ENGINE, ?MODULE). + +-define(T_CALL, 10000). + %%------------------------------------------------------------------------------ -%% APIs for rules and resources +%% Start the gen_server +%%------------------------------------------------------------------------------ + +-spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []). + +%%------------------------------------------------------------------------------ +%% APIs for rules %%------------------------------------------------------------------------------ -spec load_rules() -> ok. @@ -39,26 +86,111 @@ load_rules() -> -spec create_rule(map()) -> {ok, rule()} | {error, term()}. create_rule(Params = #{id := RuleId}) -> - case emqx_rule_registry:get_rule(RuleId) of + case get_rule(RuleId) of not_found -> do_create_rule(Params); {ok, _} -> {error, {already_exists, RuleId}} end. -spec update_rule(map()) -> {ok, rule()} | {error, term()}. update_rule(Params = #{id := RuleId}) -> - case delete_rule(RuleId) of - ok -> do_create_rule(Params); - Error -> Error + ok = delete_rule(RuleId), + do_create_rule(Params). + +-spec(delete_rule(RuleId :: rule_id()) -> ok). +delete_rule(RuleId) -> + gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL). + +-spec(insert_rule(Rule :: rule()) -> ok). +insert_rule(Rule) -> + gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL). + +%%------------------------------------------------------------------------------ +%% Rule Management +%%------------------------------------------------------------------------------ + +-spec(get_rules() -> [rule()]). +get_rules() -> + get_all_records(?RULE_TAB). + +get_rules_ordered_by_ts() -> + lists:sort(fun(#{created_at := CreatedA}, #{created_at := CreatedB}) -> + CreatedA =< CreatedB + end, get_rules()). + +-spec(get_rules_for_topic(Topic :: binary()) -> [rule()]). +get_rules_for_topic(Topic) -> + [Rule || Rule = #{from := From} <- get_rules(), + emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)]. + +-spec(get_rules_with_same_event(Topic :: binary()) -> [rule()]). +get_rules_with_same_event(Topic) -> + EventName = emqx_rule_events:event_name(Topic), + [Rule || Rule = #{from := From} <- get_rules(), + lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)]. + +is_of_event_name(EventName, Topic) -> + EventName =:= emqx_rule_events:event_name(Topic). + +-spec(get_rule(Id :: rule_id()) -> {ok, rule()} | not_found). +get_rule(Id) -> + case ets:lookup(?RULE_TAB, Id) of + [{Id, Rule}] -> {ok, Rule#{id => Id}}; + [] -> not_found end. --spec(delete_rule(RuleId :: rule_id()) -> ok | {error, term()}). -delete_rule(RuleId) -> - case emqx_rule_registry:get_rule(RuleId) of - {ok, Rule} -> - emqx_rule_registry:remove_rule(Rule); - not_found -> - {error, not_found} - end. +load_hooks_for_rule(#{from := Topics}) -> + lists:foreach(fun emqx_rule_events:load/1, Topics). + +add_metrics_for_rule(#{id := Id}) -> + ok = emqx_rule_metrics:create_rule_metrics(Id). + +clear_metrics_for_rule(#{id := Id}) -> + ok = emqx_rule_metrics:clear_rule_metrics(Id). + +unload_hooks_for_rule(#{id := Id, from := Topics}) -> + lists:foreach(fun(Topic) -> + case get_rules_with_same_event(Topic) of + [#{id := Id0}] when Id0 == Id -> %% we are now deleting the last rule + emqx_rule_events:unload(Topic); + _ -> ok + end + end, Topics). + +%%------------------------------------------------------------------------------ +%% gen_server callbacks +%%------------------------------------------------------------------------------ + +init([]) -> + _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, + {read_concurrency, true}]), + ok = load_rules(), + {ok, #{}}. + +handle_call({insert_rule, Rule}, _From, State) -> + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_insert_rule, [Rule]), + {reply, ok, State}; + +handle_call({delete_rule, Rule}, _From, State) -> + _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_delete_rule, [Rule]), + {reply, ok, State}; + +handle_call(Req, _From, State) -> + ?SLOG(error, #{msg => "unexpected_call", request => Req}), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?SLOG(error, #{msg => "unexpected_cast", request => Msg}), + {noreply, State}. + +handle_info(Info, State) -> + ?SLOG(error, #{msg => "unexpected_info", request => Info}), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%------------------------------------------------------------------------------ %% Internal Functions @@ -83,11 +215,27 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> conditions => emqx_rule_sqlparser:select_where(Select) %% -- calculated fields end }, - ok = emqx_rule_registry:add_rule(Rule), + ok = insert_rule(Rule), {ok, Rule}; {error, Reason} -> {error, Reason} end. +do_insert_rule(#{id := Id} = Rule) -> + ok = load_hooks_for_rule(Rule), + ok = add_metrics_for_rule(Rule), + true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}), + ok. + +do_delete_rule(RuleId) -> + case get_rule(RuleId) of + {ok, Rule} -> + ok = unload_hooks_for_rule(Rule), + ok = clear_metrics_for_rule(Rule), + true = ets:delete(?RULE_TAB, RuleId), + ok; + not_found -> ok + end. + parse_outputs(Outputs) -> [do_parse_outputs(Out) || Out <- Outputs]. @@ -109,3 +257,6 @@ parse_output_func(BinFunc) when is_binary(BinFunc) -> end; parse_output_func(Func) when is_function(Func) -> Func. + +get_all_records(Tab) -> + [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 63c00ed62..6099bea81 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -141,21 +141,24 @@ put_req_schema() -> description => <<"The outputs of the rule">>, type => array, items => #{ - type => object, - properties => #{ - type => #{ + oneOf => [ + #{ type => string, - enum => [<<"bridge">>, <<"builtin">>], - example => <<"builtin">> + description => <<"The channel id of an emqx bridge">> }, - target => #{ - type => string, - example => <<"console">> - }, - args => #{ - type => object + #{ + type => object, + properties => #{ + function => #{ + type => string, + example => <<"console">> + }, + args => #{ + type => object + } + } } - } + ] } }, description => #{ @@ -241,7 +244,7 @@ list_events(#{}, _Params) -> {200, emqx_rule_events:event_info()}. crud_rules(get, _Params) -> - Records = emqx_rule_registry:get_rules_ordered_by_ts(), + Records = emqx_rule_engine:get_rules_ordered_by_ts(), {200, format_rule_resp(Records)}; crud_rules(post, #{body := Params}) -> @@ -259,7 +262,7 @@ rule_test(post, #{body := Params}) -> end). crud_rules_by_id(get, #{bindings := #{id := Id}}) -> - case emqx_rule_registry:get_rule(Id) of + case emqx_rule_engine:get_rule(Id) of {ok, Rule} -> {200, format_rule_resp(Rule)}; not_found -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 4f41ba334..b9ee6f1d5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -25,9 +25,8 @@ -export([stop/1]). start(_Type, _Args) -> - ets:new(?RULE_TAB, [named_table, public, set, {read_concurrency, true}]), + _ = ets:new(?RULE_TAB, [named_table, public, set, {read_concurrency, true}]), ok = emqx_rule_events:reload(), - ok = emqx_rule_engine:load_rules(), emqx_rule_engine_sup:start_link(). stop(_State) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 197412a9e..41b3693bb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -24,6 +24,9 @@ , roots/0 , fields/1]). +-export([ validate_sql/1 + ]). + namespace() -> rule_engine. roots() -> ["rule_engine"]. @@ -34,7 +37,8 @@ fields("rule_engine") -> ]; fields("rules") -> - [ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})} + [ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false, + validator => fun ?MODULE:validate_sql/1})} , {"outputs", sc(hoconsc:array(hoconsc:union( [ binary() , ref("builtin_output_republish") @@ -79,5 +83,11 @@ fields("republish_args") -> default => <<"${payload}">>})} ]. +validate_sql(Sql) -> + case emqx_rule_sqlparser:parse(Sql) of + {ok, _Result} -> ok; + {error, Reason} -> {error, Reason} + end. + sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 4fad54a4b..7fd44df82 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -28,12 +28,12 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Registry = #{id => emqx_rule_registry, - start => {emqx_rule_registry, start_link, []}, + Registry = #{id => emqx_rule_engine, + start => {emqx_rule_engine, start_link, []}, restart => permanent, shutdown => 5000, type => worker, - modules => [emqx_rule_registry]}, + modules => [emqx_rule_engine]}, Metrics = #{id => emqx_rule_metrics, start => {emqx_rule_metrics, start_link, []}, restart => permanent, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index a2f5439b2..614dc841b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -64,7 +64,9 @@ -endif. reload() -> - emqx_rule_registry:load_hooks_for_rules(emqx_rule_registry:get_rules()). + lists:foreach(fun(Rule) -> + ok = emqx_rule_engine:load_hooks_for_rule(Rule) + end, emqx_rule_engine:get_rules()). load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) -> emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received, @@ -86,7 +88,7 @@ unload(Topic) -> %% Callbacks %%-------------------------------------------------------------------- on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) -> - case emqx_rule_registry:get_rules_for_topic(BridgeTopic) of + case emqx_rule_engine:get_rules_for_topic(BridgeTopic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, Message) end. @@ -95,7 +97,7 @@ on_message_publish(Message = #message{topic = Topic}, _Env) -> case ignore_sys_message(Message) of true -> ok; false -> - case emqx_rule_registry:get_rules_for_topic(Topic) of + case emqx_rule_engine:get_rules_for_topic(Topic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message)) end @@ -310,7 +312,7 @@ with_basic_columns(EventName, Data) when is_map(Data) -> %%-------------------------------------------------------------------- apply_event(EventName, GenEventMsg, _Env) -> EventTopic = event_topic(EventName), - case emqx_rule_registry:get_rules_for_topic(EventTopic) of + case emqx_rule_engine:get_rules_for_topic(EventTopic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl index 021f72ca6..eccf090e9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_outputs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_outputs.erl @@ -90,17 +90,14 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload) -> _ = emqx_broker:safe_publish(Msg), emqx_metrics:inc_msg(Msg). -pre_process_repub_args(#{<<"topic">> := Topic} = Args) -> - QoS = maps:get(<<"qos">>, Args, <<"${qos}">>), - Retain = maps:get(<<"retain">>, Args, <<"${retain}">>), - Payload = maps:get(<<"payload">>, Args, <<"${payload}">>), - #{topic => Topic, qos => QoS, payload => Payload, retain => Retain, - preprocessed_tmpl => #{ - topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), - qos => preproc_vars(QoS), - retain => preproc_vars(Retain), - payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) - }}. +pre_process_repub_args(#{topic := Topic, qos := QoS, retain := Retain, + payload := Payload} = Args) -> + Args#{preprocessed_tmpl => #{ + topic => emqx_plugin_libs_rule:preproc_tmpl(Topic), + qos => preproc_vars(QoS), + retain => preproc_vars(Retain), + payload => emqx_plugin_libs_rule:preproc_tmpl(Payload) + }}. preproc_vars(Data) when is_binary(Data) -> emqx_plugin_libs_rule:preproc_tmpl(Data); diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl deleted file mode 100644 index ae7302673..000000000 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ /dev/null @@ -1,216 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rule_registry). - --behaviour(gen_server). - --include("rule_engine.hrl"). --include_lib("emqx/include/logger.hrl"). --include_lib("stdlib/include/qlc.hrl"). - --export([start_link/0]). - -%% Rule Management --export([ get_rules/0 - , get_rules_for_topic/1 - , get_rules_with_same_event/1 - , get_rules_ordered_by_ts/0 - , get_rule/1 - , add_rule/1 - , add_rules/1 - , remove_rule/1 - , remove_rules/1 - ]). - --export([ do_remove_rules/1 - , do_add_rules/1 - ]). - --export([ load_hooks_for_rules/1 - , unload_hooks_for_rule/1 - , add_metrics_for_rules/1 - , clear_metrics_for_rules/1 - ]). - -%% gen_server Callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --define(REGISTRY, ?MODULE). - --define(T_CALL, 10000). - -%%------------------------------------------------------------------------------ -%% Start the registry -%%------------------------------------------------------------------------------ - --spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). - -%%------------------------------------------------------------------------------ -%% Rule Management -%%------------------------------------------------------------------------------ - --spec(get_rules() -> [rule()]). -get_rules() -> - get_all_records(?RULE_TAB). - -get_rules_ordered_by_ts() -> - lists:sort(fun(#{created_at := CreatedA}, #{created_at := CreatedB}) -> - CreatedA =< CreatedB - end, get_rules()). - --spec(get_rules_for_topic(Topic :: binary()) -> [rule()]). -get_rules_for_topic(Topic) -> - [Rule || Rule = #{from := From} <- get_rules(), - emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)]. - --spec(get_rules_with_same_event(Topic :: binary()) -> [rule()]). -get_rules_with_same_event(Topic) -> - EventName = emqx_rule_events:event_name(Topic), - [Rule || Rule = #{from := From} <- get_rules(), - lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)]. - -is_of_event_name(EventName, Topic) -> - EventName =:= emqx_rule_events:event_name(Topic). - --spec(get_rule(Id :: rule_id()) -> {ok, rule()} | not_found). -get_rule(Id) -> - case ets:lookup(?RULE_TAB, Id) of - [{Id, Rule}] -> {ok, Rule#{id => Id}}; - [] -> not_found - end. - --spec(add_rule(rule()) -> ok). -add_rule(Rule) -> - add_rules([Rule]). - --spec(add_rules([rule()]) -> ok). -add_rules(Rules) -> - gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL). - --spec(remove_rule(rule() | rule_id()) -> ok). -remove_rule(RuleOrId) -> - remove_rules([RuleOrId]). - --spec(remove_rules([rule()] | list(rule_id())) -> ok). -remove_rules(Rules) -> - gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). - -%% @private - -do_add_rules([]) -> ok; -do_add_rules(Rules) -> - load_hooks_for_rules(Rules), - add_metrics_for_rules(Rules), - ets:insert(?RULE_TAB, [{Id, maps:remove(id, R)} || #{id := Id} = R <- Rules]), - ok. - -%% @private -do_remove_rules([]) -> ok; -do_remove_rules(RuleIds = [Id|_]) when is_binary(Id) -> - RuleRecs = - lists:foldl(fun(RuleId, Acc) -> - case get_rule(RuleId) of - {ok, Rule} -> [Rule|Acc]; - not_found -> Acc - end - end, [], RuleIds), - remove_rules_unload_hooks(RuleRecs); -do_remove_rules(Rules = [Rule|_]) when is_map(Rule) -> - remove_rules_unload_hooks(Rules). - -remove_rules_unload_hooks(Rules) -> - unload_hooks_for_rule(Rules), - clear_metrics_for_rules(Rules), - lists:foreach(fun(#{id := Id}) -> - ets:delete(?RULE_TAB, Id) - end, Rules). - -load_hooks_for_rules(Rules) -> - lists:foreach(fun(#{from := Topics}) -> - lists:foreach(fun emqx_rule_events:load/1, Topics) - end, Rules). - -add_metrics_for_rules(Rules) -> - lists:foreach(fun(#{id := Id}) -> - ok = emqx_rule_metrics:create_rule_metrics(Id) - end, Rules). - -clear_metrics_for_rules(Rules) -> - lists:foreach(fun(#{id := Id}) -> - ok = emqx_rule_metrics:clear_rule_metrics(Id) - end, Rules). - -unload_hooks_for_rule(Rules) -> - lists:foreach(fun(#{id := Id, from := Topics}) -> - lists:foreach(fun(Topic) -> - case get_rules_with_same_event(Topic) of - [#{id := Id0}] when Id0 == Id -> %% we are now deleting the last rule - emqx_rule_events:unload(Topic); - _ -> ok - end - end, Topics) - end, Rules). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([]) -> - _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, - {read_concurrency, true}]), - {ok, #{}}. - -handle_call({add_rules, Rules}, _From, State) -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_add_rules, [Rules]), - {reply, ok, State}; - -handle_call({remove_rules, Rules}, _From, State) -> - _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_remove_rules, [Rules]), - {reply, ok, State}; - -handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", request => Req}), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", request => Msg}), - {noreply, State}. - -handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected_info", request => Info}), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%------------------------------------------------------------------------------ -%% Private functions -%%------------------------------------------------------------------------------ - -get_all_records(Tab) -> - [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)]. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 348d6055c..812406b2c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -162,7 +162,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(t_events, Config) -> ets:delete(events_record_tab), - ok = emqx_rule_registry:remove_rule(?config(hook_points_rules, Config)); + ok = delete_rule(?config(hook_points_rules, Config)); end_per_testcase(_TestCase, _Config) -> ok. @@ -175,10 +175,10 @@ t_create_rule(_Config) -> id => <<"t_create_rule">>, outputs => [#{function => console}], description => <<"debug rule">>}), - ct:pal("======== emqx_rule_registry:get_rules :~p", [emqx_rule_registry:get_rules()]), + ct:pal("======== emqx_rule_engine:get_rules :~p", [emqx_rule_engine:get_rules()]), ?assertMatch({ok, #{id := Id, from := [<<"t/a">>]}}, - emqx_rule_registry:get_rule(Id)), - emqx_rule_registry:remove_rule(Id), + emqx_rule_engine:get_rule(Id)), + delete_rule(Id), ok. %%------------------------------------------------------------------------------ @@ -199,34 +199,27 @@ t_kv_store(_) -> t_add_get_remove_rule(_Config) -> RuleId0 = <<"rule-debug-0">>, - ok = emqx_rule_registry:add_rule(make_simple_rule(RuleId0)), - ?assertMatch({ok, #{id := RuleId0}}, emqx_rule_registry:get_rule(RuleId0)), - ok = emqx_rule_registry:remove_rule(RuleId0), - ?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId0)), + ok = emqx_rule_engine:insert_rule(make_simple_rule(RuleId0)), + ?assertMatch({ok, #{id := RuleId0}}, emqx_rule_engine:get_rule(RuleId0)), + ok = delete_rule(RuleId0), + ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId0)), RuleId1 = <<"rule-debug-1">>, Rule1 = make_simple_rule(RuleId1), - ok = emqx_rule_registry:add_rule(Rule1), - ?assertMatch({ok, #{id := RuleId1}}, emqx_rule_registry:get_rule(RuleId1)), - ok = emqx_rule_registry:remove_rule(Rule1), - ?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId1)), + ok = emqx_rule_engine:insert_rule(Rule1), + ?assertMatch({ok, #{id := RuleId1}}, emqx_rule_engine:get_rule(RuleId1)), + ok = delete_rule(Rule1), + ?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId1)), ok. t_add_get_remove_rules(_Config) -> - emqx_rule_registry:remove_rules(emqx_rule_registry:get_rules()), - ok = emqx_rule_registry:add_rules( + delete_rules_by_ids(emqx_rule_engine:get_rules()), + ok = insert_rules( [make_simple_rule(<<"rule-debug-1">>), make_simple_rule(<<"rule-debug-2">>)]), - ?assertEqual(2, length(emqx_rule_registry:get_rules())), - ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]), - ?assertEqual([], emqx_rule_registry:get_rules()), - - Rule3 = make_simple_rule(<<"rule-debug-3">>), - Rule4 = make_simple_rule(<<"rule-debug-4">>), - ok = emqx_rule_registry:add_rules([Rule3, Rule4]), - ?assertEqual(2, length(emqx_rule_registry:get_rules())), - ok = emqx_rule_registry:remove_rules([Rule3, Rule4]), - ?assertEqual([], emqx_rule_registry:get_rules()), + ?assertEqual(2, length(emqx_rule_engine:get_rules())), + ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]), + ?assertEqual([], emqx_rule_engine:get_rules()), ok. t_create_existing_rule(_Config) -> @@ -236,25 +229,25 @@ t_create_existing_rule(_Config) -> sql => <<"select * from \"t/#\"">>, outputs => [#{function => console}] }), - {ok, #{sql := SQL}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>), + {ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>), ?assertEqual(<<"select * from \"t/#\"">>, SQL), - ok = emqx_rule_engine:delete_rule(<<"an_existing_rule">>), - ?assertEqual(not_found, emqx_rule_registry:get_rule(<<"an_existing_rule">>)), + ok = delete_rule(<<"an_existing_rule">>), + ?assertEqual(not_found, emqx_rule_engine:get_rule(<<"an_existing_rule">>)), ok. t_get_rules_for_topic(_Config) -> - Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>)), - ok = emqx_rule_registry:add_rules( + Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>)), + ok = insert_rules( [make_simple_rule(<<"rule-debug-1">>), make_simple_rule(<<"rule-debug-2">>)]), - ?assertEqual(Len0+2, length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>))), - ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]), + ?assertEqual(Len0+2, length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>))), + ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]), ok. t_get_rules_ordered_by_ts(_Config) -> Now = fun() -> erlang:system_time(nanosecond) end, - ok = emqx_rule_registry:add_rules( + ok = insert_rules( [make_simple_rule_with_ts(<<"rule-debug-0">>, Now()), make_simple_rule_with_ts(<<"rule-debug-1">>, Now()), make_simple_rule_with_ts(<<"rule-debug-2">>, Now()) @@ -263,11 +256,11 @@ t_get_rules_ordered_by_ts(_Config) -> #{id := <<"rule-debug-0">>}, #{id := <<"rule-debug-1">>}, #{id := <<"rule-debug-2">>} - ], emqx_rule_registry:get_rules_ordered_by_ts()). + ], emqx_rule_engine:get_rules_ordered_by_ts()). t_get_rules_for_topic_2(_Config) -> - Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>)), - ok = emqx_rule_registry:add_rules( + Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>)), + ok = insert_rules( [make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]), make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [<<"simple/+/1">>]), @@ -275,21 +268,21 @@ t_get_rules_for_topic_2(_Config) -> make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]), make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>]) ]), - ?assertEqual(Len0+4, length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>))), - ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]), + ?assertEqual(Len0+4, length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>))), + ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]), ok. t_get_rules_with_same_event(_Config) -> PubT = <<"simple/1">>, - PubN = length(emqx_rule_registry:get_rules_with_same_event(PubT)), - ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>)), - ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>)), - ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>)), - ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>)), - ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>)), - ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>)), - ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>)), - ok = emqx_rule_registry:add_rules( + PubN = length(emqx_rule_engine:get_rules_with_same_event(PubT)), + ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>)), + ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>)), + ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>)), + ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>)), + ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>)), + ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>)), + ?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>)), + ok = insert_rules( [make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]), make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]), @@ -301,15 +294,15 @@ t_get_rules_with_same_event(_Config) -> make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]), make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>]) ]), - ?assertEqual(PubN + 3, length(emqx_rule_registry:get_rules_with_same_event(PubT))), - ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>))), - ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>))), - ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>))), - ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>))), - ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>))), - ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>))), - ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>))), - ok = emqx_rule_registry:remove_rules([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]), + ?assertEqual(PubN + 3, length(emqx_rule_engine:get_rules_with_same_event(PubT))), + ?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>))), + ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>))), + ?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>))), + ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>))), + ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>))), + ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>))), + ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>))), + ok = delete_rules_by_ids([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]), ok. %%------------------------------------------------------------------------------ @@ -405,7 +398,7 @@ t_match_atom_and_binary(_Config) -> end, emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule). + delete_rule(TopicRule). t_sqlselect_0(_Config) -> %% Verify SELECT with and without 'AS' @@ -524,7 +517,7 @@ t_sqlselect_01(_Config) -> end, emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule1). + delete_rule(TopicRule1). t_sqlselect_02(_Config) -> SQL = "SELECT * " @@ -562,7 +555,7 @@ t_sqlselect_02(_Config) -> end, emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule1). + delete_rule(TopicRule1). t_sqlselect_1(_Config) -> SQL = "SELECT json_decode(payload) as p, payload " @@ -592,7 +585,7 @@ t_sqlselect_1(_Config) -> end, emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule). + delete_rule(TopicRule). t_sqlselect_2(_Config) -> %% recursively republish to t2 @@ -618,7 +611,7 @@ t_sqlselect_2(_Config) -> received_nothing = Fun(), emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule). + delete_rule(TopicRule). t_sqlselect_3(_Config) -> %% republish the client.connected msg @@ -650,7 +643,7 @@ t_sqlselect_3(_Config) -> end, emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule). + delete_rule(TopicRule). t_sqlparse_event_1(_Config) -> Sql = "select topic as tp " @@ -1302,7 +1295,7 @@ republish_output(Topic) -> republish_output(Topic, <<"${payload}">>). republish_output(Topic, Payload) -> #{function => republish, - args => #{<<"payload">> => Payload, <<"topic">> => Topic, <<"qos">> => 0}}. + args => #{payload => Payload, topic => Topic, qos => 0, retain => false}}. make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) -> SQL = <<"select * from \"simple/topic\"">>, @@ -1597,3 +1590,17 @@ deps_path(App, RelativePath) -> local_path(RelativePath) -> deps_path(emqx_rule_engine, RelativePath). +insert_rules(Rules) -> + lists:foreach(fun(Rule) -> + ok = emqx_rule_engine:insert_rule(Rule) + end, Rules). + +delete_rules_by_ids(Ids) -> + lists:foreach(fun(Id) -> + ok = emqx_rule_engine:delete_rule(Id) + end, Ids). + +delete_rule(#{id := Id}) -> + ok = emqx_rule_engine:delete_rule(Id); +delete_rule(Id) when is_binary(Id) -> + ok = emqx_rule_engine:delete_rule(Id).