diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index b2071bb45..b2a339fe5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -3,12 +3,16 @@ {VSN, [{"4.4.16", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.15", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {<<"4\\.4\\.1[3-4]">>, [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, @@ -56,7 +60,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.9", [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_engine_jwt}, @@ -113,7 +118,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.5", [{add_module,emqx_rule_engine_jwt}, {add_module,emqx_rule_engine_jwt_worker}, @@ -221,7 +227,8 @@ {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {add_module,emqx_rule_date}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.0", [{add_module,emqx_rule_engine_jwt}, {add_module,emqx_rule_engine_jwt_worker}, @@ -248,12 +255,16 @@ {<<".*">>,[]}], [{"4.4.16", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {"4.4.15", [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + ]}, {<<"4\\.4\\.1[3-4]">>, [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 0391e8f59..328f7052d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -20,7 +20,8 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("stdlib/include/qlc.hrl"). + +-logger_header("[RuleRegistry] "). -export([start_link/0]). @@ -166,6 +167,10 @@ dump() -> start_link() -> gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). +%% Use a single process to protect the cache updating to avoid race conditions +update_rules_cache_locally() -> + gen_server:cast(?REGISTRY, update_rules_cache). + %%------------------------------------------------------------------------------ %% Rule Management %%------------------------------------------------------------------------------ @@ -173,7 +178,9 @@ start_link() -> -spec(get_rules() -> list(emqx_rule_engine:rule())). get_rules() -> case get_rules_from_cache() of - not_found -> get_all_records(?RULE_TAB); + not_found -> + update_rules_cache_locally(), + get_all_records(?RULE_TAB); CachedRules -> CachedRules end. @@ -187,7 +194,8 @@ update_rules_cache() -> put_rules_to_cache(get_all_records(?RULE_TAB)). clear_rules_cache() -> - persistent_term:erase(?PK_RULE_TAB). + _ = persistent_term:erase(?PK_RULE_TAB), + ok. get_rules_ordered_by_ts() -> lists:keysort(#rule.created_at, get_rules()). @@ -236,7 +244,6 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> - _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -246,7 +253,6 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> - _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -255,7 +261,9 @@ load_hooks_for_rule(#rule{for = Topics}) -> unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of - [#rule{id = Id}] -> %% we are now deleting the last rule + [] -> %% no rules left, we can safely unload the hook + emqx_rule_events:unload(Topic); + [#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end @@ -457,28 +465,61 @@ delete_resource_type(Type) -> init([]) -> _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, {read_concurrency, true}]), + {ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}), {ok, #{}}. -handle_call({add_rules, Rules}, _From, State) -> +handle_call({add_rules, []}, _From, State) -> + {reply, ok, State}; +handle_call({add_rules, Rules}, From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), - {reply, ok, State}; + %% won't reply until the mnesia_table_event is received. + {noreply, State#{ + callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From) + }}; -handle_call({remove_rules, Rules}, _From, State) -> - trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), +handle_call({remove_rules, []}, _From, State) -> {reply, ok, State}; +handle_call({remove_rules, Rules}, From, State) -> + trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + %% won't reply until the mnesia_table_event is received. + {noreply, State#{ + callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From) + }}; handle_call(Req, _From, State) -> - ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), + ?LOG(error, "unexpected call - ~p", [Req]), {reply, ignored, State}. +handle_cast(update_rules_cache, State) -> + _ = update_rules_cache(), + {noreply, State}; + handle_cast(Msg, State) -> - ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), + ?LOG(error, "unexpected cast ~p", [Msg]), {noreply, State}. +handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) -> + ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), + ok = load_hooks_for_rule(NewRule), + ok = update_rules_cache(), + {noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)}; + +handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State) + when Delete =:= delete; Delete =:= delete_object -> + {noreply, State}; + +handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State) + when Delete =:= delete; Delete =:= delete_object -> + ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), + ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules), + ok = update_rules_cache(), + NewState = lists:foldr(fun(R, AccState) -> + maybe_reply_callers(callers_delete, R#rule.id, AccState) + end, State, OldRules), + {noreply, NewState}; + handle_info(Info, State) -> - ?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]), + ?LOG(error, "unexpected info ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -491,6 +532,22 @@ code_change(_OldVsn, State, _Extra) -> %% Private functions %%------------------------------------------------------------------------------ +add_caller(OldCallers, Rules, From) -> + RuleIds = lists:map(fun + (#rule{id = Id}) -> Id; + (Id) when is_binary(Id) -> Id + end, Rules), + maps:merge(OldCallers, maps:from_keys(RuleIds, From)). + +maybe_reply_callers(OperType, RuleId, State) -> + case maps:take(RuleId, maps:get(OperType, State, #{})) of + {Caller, NewState} -> + gen_server:reply(Caller, ok), + NewState; + error -> + State + end. + get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). ets:tab2list(Tab). diff --git a/scripts/update_appup.escript b/scripts/update_appup.escript index 07f38873d..c5ae8c870 100755 --- a/scripts/update_appup.escript +++ b/scripts/update_appup.escript @@ -77,7 +77,6 @@ qlc_modules0() -> "apps/emqx_management/src/emqx_mgmt_api.erl", "apps/emqx_auth_mnesia/src/emqx_auth_mnesia_api.erl", "apps/emqx_auth_mnesia/src/emqx_acl_mnesia_db.erl", - "apps/emqx_rule_engine/src/emqx_rule_registry.erl", "lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl", "lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl" ].