diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 46fb20666..3621a2442 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -406,8 +406,9 @@ refresh_rules() -> end end, emqx_rule_registry:get_rules()). -refresh_rule(#rule{id = RuleId, actions = Actions}) -> +refresh_rule(#rule{id = RuleId, for = Topics, actions = Actions}) -> ok = emqx_rule_metrics:create_rule_metrics(RuleId), + lists:foreach(fun emqx_rule_events:load/1, Topics), refresh_actions(Actions). -spec(refresh_resource_status() -> ok). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index b244f8323..ad9e5ba37 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -24,8 +24,6 @@ -export([stop/1]). --define(APP, emqx_rule_engine). - start(_Type, _Args) -> {ok, Sup} = emqx_rule_engine_sup:start_link(), _ = emqx_rule_engine_sup:start_locker(), @@ -33,13 +31,8 @@ start(_Type, _Args) -> ok = emqx_rule_engine:refresh_resources(), ok = emqx_rule_engine:refresh_rules(), ok = emqx_rule_engine_cli:load(), - ok = emqx_rule_events:load(env()), {ok, Sup}. stop(_State) -> - ok = emqx_rule_events:unload(env()), + ok = emqx_rule_events:unload(), ok = emqx_rule_engine_cli:unload(). - -env() -> - application:get_all_env(?APP) - . diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index c6df377bb..fc3096f6a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -16,12 +16,14 @@ -module(emqx_rule_events). +-include("rule_engine.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -logger_header("[RuleEvents]"). -export([ load/1 + , unload/0 , unload/1 , event_name/1 , eventmsg_publish/1 @@ -60,16 +62,22 @@ ]). -endif. -load(Env) -> - lists:foreach( - fun(HookPoint) -> - ok = emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [hook_conf(HookPoint, Env)]}) - end, ?SUPPORTED_HOOK). +load(Topic) -> + HookPoint = event_name(Topic), + emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), + [hook_conf(HookPoint, env())]}). -unload(_Env) -> - [emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) - || HookPoint <- ?SUPPORTED_HOOK], - ok. +unload() -> + lists:foreach(fun(HookPoint) -> + emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}) + end, ?SUPPORTED_HOOK). + +unload(Topic) -> + HookPoint = event_name(Topic), + emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}). + +env() -> + application:get_all_env(?APP). %%-------------------------------------------------------------------- %% Callbacks @@ -574,17 +582,19 @@ reason(_) -> internal_error. ntoa(undefined) -> undefined; ntoa({IpAddr, Port}) -> - iolist_to_binary([inet:ntoa(IpAddr),":",integer_to_list(Port)]); + iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected'; event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected'; event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed'; -event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed'; +event_name(<<"$events/session_unsubscribed", _/binary>>) -> + 'session.unsubscribed'; event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; -event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'. +event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; +event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 0e5c81d4a..3e4e4b74c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -26,6 +26,7 @@ %% Rule Management -export([ get_rules/0 , get_rules_for/1 + , get_rules_with_same_event/1 , get_rule/1 , add_rule/1 , add_rules/1 @@ -91,6 +92,8 @@ , {?RES_TAB, 'resources.count', 'resources.max'} ]). +-define(T_CALL, 10000). + %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -170,6 +173,15 @@ get_rules_for(Topic) -> [Rule || Rule = #rule{for = For} <- get_rules(), emqx_rule_utils:can_topic_match_oneof(Topic, For)]. +-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). +get_rules_with_same_event(Topic) -> + EventName = emqx_rule_events:event_name(Topic), + [Rule || Rule = #rule{for = For} <- get_rules(), + lists:any(fun(T) -> is_of_event_name(EventName, T) end, For)]. + +is_of_event_name(EventName, Topic) -> + EventName =:= emqx_rule_events:event_name(Topic). + -spec(get_rule(Id :: rule_id()) -> {ok, emqx_rule_engine:rule()} | not_found). get_rule(Id) -> case mnesia:dirty_read(?RULE_TAB, Id) of @@ -179,22 +191,23 @@ get_rule(Id) -> -spec(add_rule(emqx_rule_engine:rule()) -> ok). add_rule(Rule) when is_record(Rule, rule) -> - trans(fun insert_rule/1, [Rule]). + add_rules([Rule]). -spec(add_rules(list(emqx_rule_engine:rule())) -> ok). add_rules(Rules) -> - trans(fun lists:foreach/2, [fun insert_rule/1, Rules]). + gen_server:call(?REGISTRY, {add_rules, Rules}, ?T_CALL). -spec(remove_rule(emqx_rule_engine:rule() | rule_id()) -> ok). remove_rule(RuleOrId) -> - trans(fun delete_rule/1, [RuleOrId]). + remove_rules([RuleOrId]). -spec(remove_rules(list(emqx_rule_engine:rule()) | list(rule_id())) -> ok). remove_rules(Rules) -> - trans(fun lists:foreach/2, [fun delete_rule/1, Rules]). + gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). %% @private -insert_rule(Rule = #rule{}) -> +insert_rule(Rule = #rule{for = Topics}) -> + lists:foreach(fun emqx_rule_events:load/1, Topics), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -203,7 +216,14 @@ delete_rule(RuleId) when is_binary(RuleId) -> {ok, Rule} -> delete_rule(Rule); not_found -> ok end; -delete_rule(Rule = #rule{}) when is_record(Rule, rule) -> +delete_rule(Rule = #rule{id = Id, for = Topics}) -> + lists:foreach(fun(Topic) -> + case get_rules_with_same_event(Topic) of + [#rule{id = Id}] -> %% we are now deleting the last rule + emqx_rule_events:unload(Topic); + _ -> ok + end + end, Topics), mnesia:delete_object(?RULE_TAB, Rule, write). %%------------------------------------------------------------------------------ @@ -391,6 +411,14 @@ init([]) -> {read_concurrency, true}]), {ok, #{}}. +handle_call({add_rules, Rules}, _From, State) -> + trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), + {reply, ok, State}; + +handle_call({remove_rules, Rules}, _From, State) -> + trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + {reply, ok, State}; + handle_call(Req, _From, State) -> ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), {reply, ignored, State}. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index b6e930011..ff78275c0 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -79,6 +79,7 @@ groups() -> t_update_rule, t_get_rules_for, t_get_rules_for_2, + t_get_rules_with_same_event, t_add_get_remove_action, t_add_get_remove_actions, t_remove_actions_of, @@ -714,6 +715,39 @@ t_get_rules_for_2(_Config) -> ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]), ok. +t_get_rules_with_same_event(_Config) -> + PubT = <<"simple/1">>, + PubN = length(emqx_rule_registry:get_rules_with_same_event(PubT)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>)), + ?assertEqual([], emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>)), + ok = emqx_rule_registry:add_rules( + [make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), + make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]), + make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]), + make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, [<<"$events/client_disconnected">>]), + make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, [<<"$events/session_subscribed">>]), + make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, [<<"$events/session_unsubscribed">>]), + make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, [<<"$events/message_delivered">>]), + make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, [<<"$events/message_acked">>]), + make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]), + make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>]) + ]), + ?assertEqual(PubN + 3, length(emqx_rule_registry:get_rules_with_same_event(PubT))), + ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_connected">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/client_disconnected">>))), + ?assertEqual(2, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_subscribed">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/session_unsubscribed">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_delivered">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_acked">>))), + ?assertEqual(1, length(emqx_rule_registry:get_rules_with_same_event(<<"$events/message_dropped">>))), + ok = emqx_rule_registry:remove_rules([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]), + ok. + t_add_get_remove_action(_Config) -> ActionName0 = 'action-debug-0', Action0 = make_simple_action(ActionName0),