From 7dbbef30c9c9165c5e4be74df1933c34e880f05d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Mar 2023 13:51:30 +0800 Subject: [PATCH 1/3] fix: remove emqx_rule_registry from the qlc_mod list --- .../src/emqx_rule_engine.appup.src | 37 +++++++++++++++---- .../src/emqx_rule_registry.erl | 1 - scripts/update_appup.escript | 1 - 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index b2071bb45..3c677abe5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -3,16 +3,22 @@ {VSN, [{"4.4.16", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}} + ]}, {"4.4.15", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}]}, {<<"4\\.4\\.1[3-4]">>, [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, @@ -20,6 +26,7 @@ [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -35,6 +42,7 @@ {update,emqx_rule_engine_jwt_sup,supervisor}, {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}}, @@ -56,7 +64,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}]}, {"4.4.9", [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_engine_jwt}, @@ -74,6 +83,7 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", @@ -92,6 +102,7 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, @@ -113,7 +124,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}]}, {"4.4.5", [{add_module,emqx_rule_engine_jwt}, {add_module,emqx_rule_engine_jwt_worker}, @@ -130,6 +142,7 @@ {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -151,6 +164,7 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, @@ -175,6 +189,7 @@ {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.2", [{add_module,emqx_rule_engine_jwt}, @@ -194,6 +209,7 @@ {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -221,7 +237,9 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}} + ]}, {"4.4.0", [{add_module,emqx_rule_engine_jwt}, {add_module,emqx_rule_engine_jwt_worker}, @@ -241,6 +259,7 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {apply,{emqx_rule_registry,update_rules_cache,[]}}, {update,emqx_rule_metrics,{advanced,["4.4.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -248,12 +267,16 @@ {<<".*">>,[]}], [{"4.4.16", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.15", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {<<"4\\.4\\.1[3-4]">>, [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 0391e8f59..36950da87 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -20,7 +20,6 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("stdlib/include/qlc.hrl"). -export([start_link/0]). diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 07f38873d..c5ae8c870 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -77,7 +77,6 @@ qlc_modules0() -> "apps/emqx_management/src/emqx_mgmt_api.erl", "apps/emqx_auth_mnesia/src/emqx_auth_mnesia_api.erl", "apps/emqx_auth_mnesia/src/emqx_acl_mnesia_db.erl", - "apps/emqx_rule_engine/src/emqx_rule_registry.erl", "lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl", "lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl" ]. From e33a5bfc890b04e77246251a4f3f1e70420b2e5c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Mar 2023 14:17:06 +0800 Subject: [PATCH 2/3] fix: update rule_cache when get_rules/0 --- .../src/emqx_rule_engine.appup.src | 32 ++++++------------- .../src/emqx_rule_registry.erl | 15 +++++++-- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 3c677abe5..b2a339fe5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -4,21 +4,19 @@ [{"4.4.16", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}} - ]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.15", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {<<"4\\.4\\.1[3-4]">>, [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, @@ -26,7 +24,6 @@ [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, @@ -42,7 +39,6 @@ {update,emqx_rule_engine_jwt_sup,supervisor}, {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}}, @@ -64,8 +60,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.9", [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_engine_jwt}, @@ -83,7 +79,6 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.8", @@ -102,7 +97,6 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, @@ -124,8 +118,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.5", [{add_module,emqx_rule_engine_jwt}, {add_module,emqx_rule_engine_jwt_worker}, @@ -142,7 +136,6 @@ {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -164,7 +157,6 @@ {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]}, @@ -189,7 +181,6 @@ {add_module,emqx_rule_date}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.2", [{add_module,emqx_rule_engine_jwt}, @@ -209,7 +200,6 @@ {load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, @@ -237,9 +227,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}} - ]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.0", [{add_module,emqx_rule_engine_jwt}, {add_module,emqx_rule_engine_jwt_worker}, @@ -259,7 +248,6 @@ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {apply,{emqx_rule_registry,update_rules_cache,[]}}, {update,emqx_rule_metrics,{advanced,["4.4.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 36950da87..fd73ba25a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -165,6 +165,10 @@ dump() -> start_link() -> gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). +%% Use a single process to protect the cache updating to avoid race conditions +update_rules_cache_locally() -> + gen_server:cast(?REGISTRY, update_rules_cache). + %%------------------------------------------------------------------------------ %% Rule Management %%------------------------------------------------------------------------------ @@ -172,7 +176,9 @@ start_link() -> -spec(get_rules() -> list(emqx_rule_engine:rule())). get_rules() -> case get_rules_from_cache() of - not_found -> get_all_records(?RULE_TAB); + not_found -> + update_rules_cache_locally(), + get_all_records(?RULE_TAB); CachedRules -> CachedRules end. @@ -186,7 +192,8 @@ update_rules_cache() -> put_rules_to_cache(get_all_records(?RULE_TAB)). clear_rules_cache() -> - persistent_term:erase(?PK_RULE_TAB). + _ = persistent_term:erase(?PK_RULE_TAB), + ok. get_rules_ordered_by_ts() -> lists:keysort(#rule.created_at, get_rules()). @@ -472,6 +479,10 @@ handle_call(Req, _From, State) -> ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), {reply, ignored, State}. +handle_cast(update_rules_cache, State) -> + _ = update_rules_cache(), + {noreply, State}; + handle_cast(Msg, State) -> ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), {noreply, State}. From 64309a193be697b4cb62565cda0ad14c040d0dbe Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 3 Apr 2023 10:38:45 +0800 Subject: [PATCH 3/3] refactor: use mneisa:subscribe/1 for rule operations --- .../src/emqx_rule_registry.erl | 71 +++++++++++++++---- 1 file changed, 59 insertions(+), 12 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index fd73ba25a..328f7052d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -21,6 +21,8 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-logger_header("[RuleRegistry] "). + -export([start_link/0]). %% Rule Management @@ -242,7 +244,6 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> - _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -252,7 +253,6 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> - _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -261,7 +261,9 @@ load_hooks_for_rule(#rule{for = Topics}) -> unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of - [#rule{id = Id}] -> %% we are now deleting the last rule + [] -> %% no rules left, we can safely unload the hook + emqx_rule_events:unload(Topic); + [#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end @@ -463,20 +465,29 @@ delete_resource_type(Type) -> init([]) -> _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, {read_concurrency, true}]), + {ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}), {ok, #{}}. -handle_call({add_rules, Rules}, _From, State) -> +handle_call({add_rules, []}, _From, State) -> + {reply, ok, State}; +handle_call({add_rules, Rules}, From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), - {reply, ok, State}; + %% won't reply until the mnesia_table_event is received. + {noreply, State#{ + callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From) + }}; -handle_call({remove_rules, Rules}, _From, State) -> - trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), +handle_call({remove_rules, []}, _From, State) -> {reply, ok, State}; +handle_call({remove_rules, Rules}, From, State) -> + trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + %% won't reply until the mnesia_table_event is received. + {noreply, State#{ + callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From) + }}; handle_call(Req, _From, State) -> - ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), + ?LOG(error, "unexpected call - ~p", [Req]), {reply, ignored, State}. handle_cast(update_rules_cache, State) -> @@ -484,11 +495,31 @@ handle_cast(update_rules_cache, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), + ?LOG(error, "unexpected cast ~p", [Msg]), {noreply, State}. +handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) -> + ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), + ok = load_hooks_for_rule(NewRule), + ok = update_rules_cache(), + {noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)}; + +handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State) + when Delete =:= delete; Delete =:= delete_object -> + {noreply, State}; + +handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State) + when Delete =:= delete; Delete =:= delete_object -> + ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), + ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules), + ok = update_rules_cache(), + NewState = lists:foldr(fun(R, AccState) -> + maybe_reply_callers(callers_delete, R#rule.id, AccState) + end, State, OldRules), + {noreply, NewState}; + handle_info(Info, State) -> - ?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]), + ?LOG(error, "unexpected info ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -501,6 +532,22 @@ code_change(_OldVsn, State, _Extra) -> %% Private functions %%------------------------------------------------------------------------------ +add_caller(OldCallers, Rules, From) -> + RuleIds = lists:map(fun + (#rule{id = Id}) -> Id; + (Id) when is_binary(Id) -> Id + end, Rules), + maps:merge(OldCallers, maps:from_keys(RuleIds, From)). + +maybe_reply_callers(OperType, RuleId, State) -> + case maps:take(RuleId, maps:get(OperType, State, #{})) of + {Caller, NewState} -> + gen_server:reply(Caller, ok), + NewState; + error -> + State + end. + get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). ets:tab2list(Tab).