From 64309a193be697b4cb62565cda0ad14c040d0dbe Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 3 Apr 2023 10:38:45 +0800 Subject: [PATCH] refactor: use mneisa:subscribe/1 for rule operations --- .../src/emqx_rule_registry.erl | 71 +++++++++++++++---- 1 file changed, 59 insertions(+), 12 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index fd73ba25a..328f7052d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -21,6 +21,8 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-logger_header("[RuleRegistry] "). + -export([start_link/0]). %% Rule Management @@ -242,7 +244,6 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> - _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -252,7 +253,6 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> - _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -261,7 +261,9 @@ load_hooks_for_rule(#rule{for = Topics}) -> unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of - [#rule{id = Id}] -> %% we are now deleting the last rule + [] -> %% no rules left, we can safely unload the hook + emqx_rule_events:unload(Topic); + [#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end @@ -463,20 +465,29 @@ delete_resource_type(Type) -> init([]) -> _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, {read_concurrency, true}]), + {ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}), {ok, #{}}. -handle_call({add_rules, Rules}, _From, State) -> +handle_call({add_rules, []}, _From, State) -> + {reply, ok, State}; +handle_call({add_rules, Rules}, From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), - {reply, ok, State}; + %% won't reply until the mnesia_table_event is received. + {noreply, State#{ + callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From) + }}; -handle_call({remove_rules, Rules}, _From, State) -> - trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), +handle_call({remove_rules, []}, _From, State) -> {reply, ok, State}; +handle_call({remove_rules, Rules}, From, State) -> + trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + %% won't reply until the mnesia_table_event is received. + {noreply, State#{ + callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From) + }}; handle_call(Req, _From, State) -> - ?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]), + ?LOG(error, "unexpected call - ~p", [Req]), {reply, ignored, State}. handle_cast(update_rules_cache, State) -> @@ -484,11 +495,31 @@ handle_cast(update_rules_cache, State) -> {noreply, State}; handle_cast(Msg, State) -> - ?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]), + ?LOG(error, "unexpected cast ~p", [Msg]), {noreply, State}. +handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) -> + ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), + ok = load_hooks_for_rule(NewRule), + ok = update_rules_cache(), + {noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)}; + +handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State) + when Delete =:= delete; Delete =:= delete_object -> + {noreply, State}; + +handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State) + when Delete =:= delete; Delete =:= delete_object -> + ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), + ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules), + ok = update_rules_cache(), + NewState = lists:foldr(fun(R, AccState) -> + maybe_reply_callers(callers_delete, R#rule.id, AccState) + end, State, OldRules), + {noreply, NewState}; + handle_info(Info, State) -> - ?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]), + ?LOG(error, "unexpected info ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -501,6 +532,22 @@ code_change(_OldVsn, State, _Extra) -> %% Private functions %%------------------------------------------------------------------------------ +add_caller(OldCallers, Rules, From) -> + RuleIds = lists:map(fun + (#rule{id = Id}) -> Id; + (Id) when is_binary(Id) -> Id + end, Rules), + maps:merge(OldCallers, maps:from_keys(RuleIds, From)). + +maybe_reply_callers(OperType, RuleId, State) -> + case maps:take(RuleId, maps:get(OperType, State, #{})) of + {Caller, NewState} -> + gen_server:reply(Caller, ok), + NewState; + error -> + State + end. + get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). ets:tab2list(Tab).