refactor(rule_engine): merge code for emqx_rule_registry and emqx_rule_engine

This commit is contained in:
Shawn 2021-10-12 10:54:41 +08:00
parent b063b6f253
commit 1ffae5d1b0
11 changed files with 282 additions and 329 deletions

View File

@ -88,4 +88,4 @@
end()).
%% Tables
-define(RULE_TAB, emqx_rule).
-define(RULE_TAB, emqx_rule_engine).

View File

@ -3,7 +3,7 @@
[{description, "EMQ X Rule Engine"},
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel,stdlib,rulesql,getopt]},
{mod, {emqx_rule_engine_app, []}},
{env, []},

View File

@ -16,19 +16,66 @@
-module(emqx_rule_engine).
-behaviour(gen_server).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/qlc.hrl").
-export([start_link/0]).
%% Rule Management
-export([ load_rules/0
]).
-export([ create_rule/1
, insert_rule/1
, update_rule/1
, delete_rule/1
, get_rule/1
]).
-export([ get_rules/0
, get_rules_for_topic/1
, get_rules_with_same_event/1
, get_rules_ordered_by_ts/0
]).
%% exported for cluster_call
-export([ do_delete_rule/1
, do_insert_rule/1
]).
-export([ load_hooks_for_rule/1
, unload_hooks_for_rule/1
, add_metrics_for_rule/1
, clear_metrics_for_rule/1
]).
%% gen_server Callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(RULE_ENGINE, ?MODULE).
-define(T_CALL, 10000).
%%------------------------------------------------------------------------------
%% APIs for rules and resources
%% Start the gen_server
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% APIs for rules
%%------------------------------------------------------------------------------
-spec load_rules() -> ok.
@ -39,26 +86,111 @@ load_rules() ->
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
create_rule(Params = #{id := RuleId}) ->
case emqx_rule_registry:get_rule(RuleId) of
case get_rule(RuleId) of
not_found -> do_create_rule(Params);
{ok, _} -> {error, {already_exists, RuleId}}
end.
-spec update_rule(map()) -> {ok, rule()} | {error, term()}.
update_rule(Params = #{id := RuleId}) ->
case delete_rule(RuleId) of
ok -> do_create_rule(Params);
Error -> Error
ok = delete_rule(RuleId),
do_create_rule(Params).
-spec(delete_rule(RuleId :: rule_id()) -> ok).
delete_rule(RuleId) ->
gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL).
-spec(insert_rule(Rule :: rule()) -> ok).
insert_rule(Rule) ->
gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
%%------------------------------------------------------------------------------
%% Rule Management
%%------------------------------------------------------------------------------
-spec(get_rules() -> [rule()]).
get_rules() ->
get_all_records(?RULE_TAB).
get_rules_ordered_by_ts() ->
lists:sort(fun(#{created_at := CreatedA}, #{created_at := CreatedB}) ->
CreatedA =< CreatedB
end, get_rules()).
-spec(get_rules_for_topic(Topic :: binary()) -> [rule()]).
get_rules_for_topic(Topic) ->
[Rule || Rule = #{from := From} <- get_rules(),
emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)].
-spec(get_rules_with_same_event(Topic :: binary()) -> [rule()]).
get_rules_with_same_event(Topic) ->
EventName = emqx_rule_events:event_name(Topic),
[Rule || Rule = #{from := From} <- get_rules(),
lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)].
is_of_event_name(EventName, Topic) ->
EventName =:= emqx_rule_events:event_name(Topic).
-spec(get_rule(Id :: rule_id()) -> {ok, rule()} | not_found).
get_rule(Id) ->
case ets:lookup(?RULE_TAB, Id) of
[{Id, Rule}] -> {ok, Rule#{id => Id}};
[] -> not_found
end.
-spec(delete_rule(RuleId :: rule_id()) -> ok | {error, term()}).
delete_rule(RuleId) ->
case emqx_rule_registry:get_rule(RuleId) of
{ok, Rule} ->
emqx_rule_registry:remove_rule(Rule);
not_found ->
{error, not_found}
end.
load_hooks_for_rule(#{from := Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics).
add_metrics_for_rule(#{id := Id}) ->
ok = emqx_rule_metrics:create_rule_metrics(Id).
clear_metrics_for_rule(#{id := Id}) ->
ok = emqx_rule_metrics:clear_rule_metrics(Id).
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
lists:foreach(fun(Topic) ->
case get_rules_with_same_event(Topic) of
[#{id := Id0}] when Id0 == Id -> %% we are now deleting the last rule
emqx_rule_events:unload(Topic);
_ -> ok
end
end, Topics).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
ok = load_rules(),
{ok, #{}}.
handle_call({insert_rule, Rule}, _From, State) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_insert_rule, [Rule]),
{reply, ok, State};
handle_call({delete_rule, Rule}, _From, State) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_delete_rule, [Rule]),
{reply, ok, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", request => Req}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", request => Msg}),
{noreply, State}.
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", request => Info}),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Internal Functions
@ -83,11 +215,27 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
conditions => emqx_rule_sqlparser:select_where(Select)
%% -- calculated fields end
},
ok = emqx_rule_registry:add_rule(Rule),
ok = insert_rule(Rule),
{ok, Rule};
{error, Reason} -> {error, Reason}
end.
do_insert_rule(#{id := Id} = Rule) ->
ok = load_hooks_for_rule(Rule),
ok = add_metrics_for_rule(Rule),
true = ets:insert(?RULE_TAB, {Id, maps:remove(id, Rule)}),
ok.
do_delete_rule(RuleId) ->
case get_rule(RuleId) of
{ok, Rule} ->
ok = unload_hooks_for_rule(Rule),
ok = clear_metrics_for_rule(Rule),
true = ets:delete(?RULE_TAB, RuleId),
ok;
not_found -> ok
end.
parse_outputs(Outputs) ->
[do_parse_outputs(Out) || Out <- Outputs].
@ -109,3 +257,6 @@ parse_output_func(BinFunc) when is_binary(BinFunc) ->
end;
parse_output_func(Func) when is_function(Func) ->
Func.
get_all_records(Tab) ->
[Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].

View File

@ -141,21 +141,24 @@ put_req_schema() ->
description => <<"The outputs of the rule">>,
type => array,
items => #{
type => object,
properties => #{
type => #{
oneOf => [
#{
type => string,
enum => [<<"bridge">>, <<"builtin">>],
example => <<"builtin">>
description => <<"The channel id of an emqx bridge">>
},
target => #{
type => string,
example => <<"console">>
},
args => #{
type => object
#{
type => object,
properties => #{
function => #{
type => string,
example => <<"console">>
},
args => #{
type => object
}
}
}
}
]
}
},
description => #{
@ -241,7 +244,7 @@ list_events(#{}, _Params) ->
{200, emqx_rule_events:event_info()}.
crud_rules(get, _Params) ->
Records = emqx_rule_registry:get_rules_ordered_by_ts(),
Records = emqx_rule_engine:get_rules_ordered_by_ts(),
{200, format_rule_resp(Records)};
crud_rules(post, #{body := Params}) ->
@ -259,7 +262,7 @@ rule_test(post, #{body := Params}) ->
end).
crud_rules_by_id(get, #{bindings := #{id := Id}}) ->
case emqx_rule_registry:get_rule(Id) of
case emqx_rule_engine:get_rule(Id) of
{ok, Rule} ->
{200, format_rule_resp(Rule)};
not_found ->

View File

@ -25,9 +25,8 @@
-export([stop/1]).
start(_Type, _Args) ->
ets:new(?RULE_TAB, [named_table, public, set, {read_concurrency, true}]),
_ = ets:new(?RULE_TAB, [named_table, public, set, {read_concurrency, true}]),
ok = emqx_rule_events:reload(),
ok = emqx_rule_engine:load_rules(),
emqx_rule_engine_sup:start_link().
stop(_State) ->

View File

@ -24,6 +24,9 @@
, roots/0
, fields/1]).
-export([ validate_sql/1
]).
namespace() -> rule_engine.
roots() -> ["rule_engine"].
@ -34,7 +37,8 @@ fields("rule_engine") ->
];
fields("rules") ->
[ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false})}
[ {"sql", sc(binary(), #{desc => "The SQL of the rule", nullable => false,
validator => fun ?MODULE:validate_sql/1})}
, {"outputs", sc(hoconsc:array(hoconsc:union(
[ binary()
, ref("builtin_output_republish")
@ -79,5 +83,11 @@ fields("republish_args") ->
default => <<"${payload}">>})}
].
validate_sql(Sql) ->
case emqx_rule_sqlparser:parse(Sql) of
{ok, _Result} -> ok;
{error, Reason} -> {error, Reason}
end.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -28,12 +28,12 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Registry = #{id => emqx_rule_registry,
start => {emqx_rule_registry, start_link, []},
Registry = #{id => emqx_rule_engine,
start => {emqx_rule_engine, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_rule_registry]},
modules => [emqx_rule_engine]},
Metrics = #{id => emqx_rule_metrics,
start => {emqx_rule_metrics, start_link, []},
restart => permanent,

View File

@ -64,7 +64,9 @@
-endif.
reload() ->
emqx_rule_registry:load_hooks_for_rules(emqx_rule_registry:get_rules()).
lists:foreach(fun(Rule) ->
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
end, emqx_rule_engine:get_rules()).
load(<<"$bridges/", _ChannelId/binary>> = BridgeTopic) ->
emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
@ -86,7 +88,7 @@ unload(Topic) ->
%% Callbacks
%%--------------------------------------------------------------------
on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) ->
case emqx_rule_registry:get_rules_for_topic(BridgeTopic) of
case emqx_rule_engine:get_rules_for_topic(BridgeTopic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, Message)
end.
@ -95,7 +97,7 @@ on_message_publish(Message = #message{topic = Topic}, _Env) ->
case ignore_sys_message(Message) of
true -> ok;
false ->
case emqx_rule_registry:get_rules_for_topic(Topic) of
case emqx_rule_engine:get_rules_for_topic(Topic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
end
@ -310,7 +312,7 @@ with_basic_columns(EventName, Data) when is_map(Data) ->
%%--------------------------------------------------------------------
apply_event(EventName, GenEventMsg, _Env) ->
EventTopic = event_topic(EventName),
case emqx_rule_registry:get_rules_for_topic(EventTopic) of
case emqx_rule_engine:get_rules_for_topic(EventTopic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
end.

View File

@ -90,17 +90,14 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
_ = emqx_broker:safe_publish(Msg),
emqx_metrics:inc_msg(Msg).
pre_process_repub_args(#{<<"topic">> := Topic} = Args) ->
QoS = maps:get(<<"qos">>, Args, <<"${qos}">>),
Retain = maps:get(<<"retain">>, Args, <<"${retain}">>),
Payload = maps:get(<<"payload">>, Args, <<"${payload}">>),
#{topic => Topic, qos => QoS, payload => Payload, retain => Retain,
preprocessed_tmpl => #{
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
}}.
pre_process_repub_args(#{topic := Topic, qos := QoS, retain := Retain,
payload := Payload} = Args) ->
Args#{preprocessed_tmpl => #{
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
}}.
preproc_vars(Data) when is_binary(Data) ->
emqx_plugin_libs_rule:preproc_tmpl(Data);

View File

@ -1,216 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_registry).
-behaviour(gen_server).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/qlc.hrl").
-export([start_link/0]).
%% Rule Management
-export([ get_rules/0
, get_rules_for_topic/1
, get_rules_with_same_event/1
, get_rules_ordered_by_ts/0
, get_rule/1
, add_rule/1
, add_rules/1
, remove_rule/1
, remove_rules/1
]).
-export([ do_remove_rules/1
, do_add_rules/1
]).
-export([ load_hooks_for_rules/1
, unload_hooks_for_rule/1
, add_metrics_for_rules/1
, clear_metrics_for_rules/1
]).
%% gen_server Callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(REGISTRY, ?MODULE).
-define(T_CALL, 10000).
%%------------------------------------------------------------------------------
%% Start the registry
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% Rule Management
%%------------------------------------------------------------------------------
-spec(get_rules() -> [rule()]).
get_rules() ->
get_all_records(?RULE_TAB).
get_rules_ordered_by_ts() ->
lists:sort(fun(#{created_at := CreatedA}, #{created_at := CreatedB}) ->
CreatedA =< CreatedB
end, get_rules()).
-spec(get_rules_for_topic(Topic :: binary()) -> [rule()]).
get_rules_for_topic(Topic) ->
[Rule || Rule = #{from := From} <- get_rules(),
emqx_plugin_libs_rule:can_topic_match_oneof(Topic, From)].
-spec(get_rules_with_same_event(Topic :: binary()) -> [rule()]).
get_rules_with_same_event(Topic) ->
EventName = emqx_rule_events:event_name(Topic),
[Rule || Rule = #{from := From} <- get_rules(),
lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)].
is_of_event_name(EventName, Topic) ->
EventName =:= emqx_rule_events:event_name(Topic).
-spec(get_rule(Id :: rule_id()) -> {ok, rule()} | not_found).
get_rule(Id) ->
case ets:lookup(?RULE_TAB, Id) of
[{Id, Rule}] -> {ok, Rule#{id => Id}};
[] -> not_found
end.
-spec(add_rule(rule()) -> ok).
add_rule(Rule) ->
add_rules([Rule]).
-spec(add_rules([rule()]) -> ok).
add_rules(Rules) ->
gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL).
-spec(remove_rule(rule() | rule_id()) -> ok).
remove_rule(RuleOrId) ->
remove_rules([RuleOrId]).
-spec(remove_rules([rule()] | list(rule_id())) -> ok).
remove_rules(Rules) ->
gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
%% @private
do_add_rules([]) -> ok;
do_add_rules(Rules) ->
load_hooks_for_rules(Rules),
add_metrics_for_rules(Rules),
ets:insert(?RULE_TAB, [{Id, maps:remove(id, R)} || #{id := Id} = R <- Rules]),
ok.
%% @private
do_remove_rules([]) -> ok;
do_remove_rules(RuleIds = [Id|_]) when is_binary(Id) ->
RuleRecs =
lists:foldl(fun(RuleId, Acc) ->
case get_rule(RuleId) of
{ok, Rule} -> [Rule|Acc];
not_found -> Acc
end
end, [], RuleIds),
remove_rules_unload_hooks(RuleRecs);
do_remove_rules(Rules = [Rule|_]) when is_map(Rule) ->
remove_rules_unload_hooks(Rules).
remove_rules_unload_hooks(Rules) ->
unload_hooks_for_rule(Rules),
clear_metrics_for_rules(Rules),
lists:foreach(fun(#{id := Id}) ->
ets:delete(?RULE_TAB, Id)
end, Rules).
load_hooks_for_rules(Rules) ->
lists:foreach(fun(#{from := Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics)
end, Rules).
add_metrics_for_rules(Rules) ->
lists:foreach(fun(#{id := Id}) ->
ok = emqx_rule_metrics:create_rule_metrics(Id)
end, Rules).
clear_metrics_for_rules(Rules) ->
lists:foreach(fun(#{id := Id}) ->
ok = emqx_rule_metrics:clear_rule_metrics(Id)
end, Rules).
unload_hooks_for_rule(Rules) ->
lists:foreach(fun(#{id := Id, from := Topics}) ->
lists:foreach(fun(Topic) ->
case get_rules_with_same_event(Topic) of
[#{id := Id0}] when Id0 == Id -> %% we are now deleting the last rule
emqx_rule_events:unload(Topic);
_ -> ok
end
end, Topics)
end, Rules).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
{read_concurrency, true}]),
{ok, #{}}.
handle_call({add_rules, Rules}, _From, State) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_add_rules, [Rules]),
{reply, ok, State};
handle_call({remove_rules, Rules}, _From, State) ->
_ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_remove_rules, [Rules]),
{reply, ok, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", request => Req}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", request => Msg}),
{noreply, State}.
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", request => Info}),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Private functions
%%------------------------------------------------------------------------------
get_all_records(Tab) ->
[Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)].

View File

@ -162,7 +162,7 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(t_events, Config) ->
ets:delete(events_record_tab),
ok = emqx_rule_registry:remove_rule(?config(hook_points_rules, Config));
ok = delete_rule(?config(hook_points_rules, Config));
end_per_testcase(_TestCase, _Config) ->
ok.
@ -175,10 +175,10 @@ t_create_rule(_Config) ->
id => <<"t_create_rule">>,
outputs => [#{function => console}],
description => <<"debug rule">>}),
ct:pal("======== emqx_rule_registry:get_rules :~p", [emqx_rule_registry:get_rules()]),
ct:pal("======== emqx_rule_engine:get_rules :~p", [emqx_rule_engine:get_rules()]),
?assertMatch({ok, #{id := Id, from := [<<"t/a">>]}},
emqx_rule_registry:get_rule(Id)),
emqx_rule_registry:remove_rule(Id),
emqx_rule_engine:get_rule(Id)),
delete_rule(Id),
ok.
%%------------------------------------------------------------------------------
@ -199,34 +199,27 @@ t_kv_store(_) ->
t_add_get_remove_rule(_Config) ->
RuleId0 = <<"rule-debug-0">>,
ok = emqx_rule_registry:add_rule(make_simple_rule(RuleId0)),
?assertMatch({ok, #{id := RuleId0}}, emqx_rule_registry:get_rule(RuleId0)),
ok = emqx_rule_registry:remove_rule(RuleId0),
?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId0)),
ok = emqx_rule_engine:insert_rule(make_simple_rule(RuleId0)),
?assertMatch({ok, #{id := RuleId0}}, emqx_rule_engine:get_rule(RuleId0)),
ok = delete_rule(RuleId0),
?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId0)),
RuleId1 = <<"rule-debug-1">>,
Rule1 = make_simple_rule(RuleId1),
ok = emqx_rule_registry:add_rule(Rule1),
?assertMatch({ok, #{id := RuleId1}}, emqx_rule_registry:get_rule(RuleId1)),
ok = emqx_rule_registry:remove_rule(Rule1),
?assertEqual(not_found, emqx_rule_registry:get_rule(RuleId1)),
ok = emqx_rule_engine:insert_rule(Rule1),
?assertMatch({ok, #{id := RuleId1}}, emqx_rule_engine:get_rule(RuleId1)),
ok = delete_rule(Rule1),
?assertEqual(not_found, emqx_rule_engine:get_rule(RuleId1)),
ok.
t_add_get_remove_rules(_Config) ->
emqx_rule_registry:remove_rules(emqx_rule_registry:get_rules()),
ok = emqx_rule_registry:add_rules(
delete_rules_by_ids(emqx_rule_engine:get_rules()),
ok = insert_rules(
[make_simple_rule(<<"rule-debug-1">>),
make_simple_rule(<<"rule-debug-2">>)]),
?assertEqual(2, length(emqx_rule_registry:get_rules())),
ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]),
?assertEqual([], emqx_rule_registry:get_rules()),
Rule3 = make_simple_rule(<<"rule-debug-3">>),
Rule4 = make_simple_rule(<<"rule-debug-4">>),
ok = emqx_rule_registry:add_rules([Rule3, Rule4]),
?assertEqual(2, length(emqx_rule_registry:get_rules())),
ok = emqx_rule_registry:remove_rules([Rule3, Rule4]),
?assertEqual([], emqx_rule_registry:get_rules()),
?assertEqual(2, length(emqx_rule_engine:get_rules())),
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]),
?assertEqual([], emqx_rule_engine:get_rules()),
ok.
t_create_existing_rule(_Config) ->
@ -236,25 +229,25 @@ t_create_existing_rule(_Config) ->
sql => <<"select * from \"t/#\"">>,
outputs => [#{function => console}]
}),
{ok, #{sql := SQL}} = emqx_rule_registry:get_rule(<<"an_existing_rule">>),
{ok, #{sql := SQL}} = emqx_rule_engine:get_rule(<<"an_existing_rule">>),
?assertEqual(<<"select * from \"t/#\"">>, SQL),
ok = emqx_rule_engine:delete_rule(<<"an_existing_rule">>),
?assertEqual(not_found, emqx_rule_registry:get_rule(<<"an_existing_rule">>)),
ok = delete_rule(<<"an_existing_rule">>),
?assertEqual(not_found, emqx_rule_engine:get_rule(<<"an_existing_rule">>)),
ok.
t_get_rules_for_topic(_Config) ->
Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>)),
ok = emqx_rule_registry:add_rules(
Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>)),
ok = insert_rules(
[make_simple_rule(<<"rule-debug-1">>),
make_simple_rule(<<"rule-debug-2">>)]),
?assertEqual(Len0+2, length(emqx_rule_registry:get_rules_for_topic(<<"simple/topic">>))),
ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]),
?assertEqual(Len0+2, length(emqx_rule_engine:get_rules_for_topic(<<"simple/topic">>))),
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>]),
ok.
t_get_rules_ordered_by_ts(_Config) ->
Now = fun() -> erlang:system_time(nanosecond) end,
ok = emqx_rule_registry:add_rules(
ok = insert_rules(
[make_simple_rule_with_ts(<<"rule-debug-0">>, Now()),
make_simple_rule_with_ts(<<"rule-debug-1">>, Now()),
make_simple_rule_with_ts(<<"rule-debug-2">>, Now())
@ -263,11 +256,11 @@ t_get_rules_ordered_by_ts(_Config) ->
#{id := <<"rule-debug-0">>},
#{id := <<"rule-debug-1">>},
#{id := <<"rule-debug-2">>}
], emqx_rule_registry:get_rules_ordered_by_ts()).
], emqx_rule_engine:get_rules_ordered_by_ts()).
t_get_rules_for_topic_2(_Config) ->
Len0 = length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>)),
ok = emqx_rule_registry:add_rules(
Len0 = length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>)),
ok = insert_rules(
[make_simple_rule(<<"rule-debug-1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]),
make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [<<"simple/+/1">>]),
@ -275,21 +268,21 @@ t_get_rules_for_topic_2(_Config) ->
make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]),
make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>])
]),
?assertEqual(Len0+4, length(emqx_rule_registry:get_rules_for_topic(<<"simple/1">>))),
ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
?assertEqual(Len0+4, length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>))),
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
ok.
t_get_rules_with_same_event(_Config) ->
PubT = <<"simple/1">>,
PubN = length(emqx_rule_registry:get_rules_with_same_event(PubT)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>)),
?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>)),
ok = emqx_rule_registry:add_rules(
PubN = length(emqx_rule_engine:get_rules_with_same_event(PubT)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>)),
?assertEqual([], emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>)),
ok = insert_rules(
[make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]),
make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]),
@ -301,15 +294,15 @@ t_get_rules_with_same_event(_Config) ->
make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]),
make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>])
]),
?assertEqual(PubN + 3, length(emqx_rule_registry:get_rules_with_same_event(PubT))),
?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>))),
?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>))),
?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>))),
ok = emqx_rule_registry:remove_rules([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]),
?assertEqual(PubN + 3, length(emqx_rule_engine:get_rules_with_same_event(PubT))),
?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>))),
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_disconnected">>))),
?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_subscribed">>))),
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/session_unsubscribed">>))),
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>))),
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>))),
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>))),
ok = delete_rules_by_ids([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]),
ok.
%%------------------------------------------------------------------------------
@ -405,7 +398,7 @@ t_match_atom_and_binary(_Config) ->
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
delete_rule(TopicRule).
t_sqlselect_0(_Config) ->
%% Verify SELECT with and without 'AS'
@ -524,7 +517,7 @@ t_sqlselect_01(_Config) ->
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule1).
delete_rule(TopicRule1).
t_sqlselect_02(_Config) ->
SQL = "SELECT * "
@ -562,7 +555,7 @@ t_sqlselect_02(_Config) ->
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule1).
delete_rule(TopicRule1).
t_sqlselect_1(_Config) ->
SQL = "SELECT json_decode(payload) as p, payload "
@ -592,7 +585,7 @@ t_sqlselect_1(_Config) ->
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
delete_rule(TopicRule).
t_sqlselect_2(_Config) ->
%% recursively republish to t2
@ -618,7 +611,7 @@ t_sqlselect_2(_Config) ->
received_nothing = Fun(),
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
delete_rule(TopicRule).
t_sqlselect_3(_Config) ->
%% republish the client.connected msg
@ -650,7 +643,7 @@ t_sqlselect_3(_Config) ->
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
delete_rule(TopicRule).
t_sqlparse_event_1(_Config) ->
Sql = "select topic as tp "
@ -1302,7 +1295,7 @@ republish_output(Topic) ->
republish_output(Topic, <<"${payload}">>).
republish_output(Topic, Payload) ->
#{function => republish,
args => #{<<"payload">> => Payload, <<"topic">> => Topic, <<"qos">> => 0}}.
args => #{payload => Payload, topic => Topic, qos => 0, retain => false}}.
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
SQL = <<"select * from \"simple/topic\"">>,
@ -1597,3 +1590,17 @@ deps_path(App, RelativePath) ->
local_path(RelativePath) ->
deps_path(emqx_rule_engine, RelativePath).
insert_rules(Rules) ->
lists:foreach(fun(Rule) ->
ok = emqx_rule_engine:insert_rule(Rule)
end, Rules).
delete_rules_by_ids(Ids) ->
lists:foreach(fun(Id) ->
ok = emqx_rule_engine:delete_rule(Id)
end, Ids).
delete_rule(#{id := Id}) ->
ok = emqx_rule_engine:delete_rule(Id);
delete_rule(Id) when is_binary(Id) ->
ok = emqx_rule_engine:delete_rule(Id).