From c0a55344c5eeb113f09a8561c294e63f7b814fd8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 10 Apr 2023 23:21:26 +0800 Subject: [PATCH] revert: Revert "refactor: use mneisa:subscribe/1 for rule operations" This reverts commit 64309a193be697b4cb62565cda0ad14c040d0dbe. For relup from old versions, the emqx_rule_registry has not subscribes the mnesia events, so we cannot update caches only when mnesia events received. For rolling upgrade the nodes one by one, some of the nodes has not subscribes the mnesia events, so we cannot move the `load_hooks_for_rule` into the mnesia events handling. In a word, it is too complicated to use mnesia events, we use RPC instead. --- .../src/emqx_rule_registry.erl | 61 +++---------------- 1 file changed, 9 insertions(+), 52 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index e78f73c1e..37bca84aa 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -244,6 +244,7 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> + _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -253,6 +254,7 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> + _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -263,7 +265,7 @@ unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> case get_rules_with_same_event(Topic) of [] -> %% no rules left, we can safely unload the hook emqx_rule_events:unload(Topic); - [#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule + [#rule{id = Id0}] when Id0 =:= Id -> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end @@ -465,26 +467,17 @@ delete_resource_type(Type) -> init([]) -> _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, {read_concurrency, true}]), - {ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}), {ok, #{}}. -handle_call({add_rules, []}, _From, State) -> - {reply, ok, State}; -handle_call({add_rules, Rules}, From, State) -> +handle_call({add_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), - %% won't reply until the mnesia_table_event is received. - {noreply, State#{ - callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From) - }}; - -handle_call({remove_rules, []}, _From, State) -> + _ = ?CLUSTER_CALL(update_rules_cache, []), {reply, ok, State}; -handle_call({remove_rules, Rules}, From, State) -> + +handle_call({remove_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), - %% won't reply until the mnesia_table_event is received. - {noreply, State#{ - callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From) - }}; + _ = ?CLUSTER_CALL(update_rules_cache, []), + {reply, ok, State}; handle_call(Req, _From, State) -> ?LOG(error, "unexpected call - ~p", [Req]), @@ -498,26 +491,6 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected cast ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) -> - ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), - ok = load_hooks_for_rule(NewRule), - ok = update_rules_cache(), - {noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)}; - -handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State) - when Delete =:= delete; Delete =:= delete_object -> - {noreply, State}; - -handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State) - when Delete =:= delete; Delete =:= delete_object -> - ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), - ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules), - ok = update_rules_cache(), - NewState = lists:foldr(fun(R, AccState) -> - maybe_reply_callers(callers_delete, R#rule.id, AccState) - end, State, OldRules), - {noreply, NewState}; - handle_info(Info, State) -> ?LOG(error, "unexpected info ~p", [Info]), {noreply, State}. @@ -532,22 +505,6 @@ code_change(_OldVsn, State, _Extra) -> %% Private functions %%------------------------------------------------------------------------------ -add_caller(OldCallers, Rules, From) -> - RuleIds = lists:map(fun - (#rule{id = Id}) -> Id; - (Id) when is_binary(Id) -> Id - end, Rules), - maps:merge(OldCallers, maps:from_keys(RuleIds, From)). - -maybe_reply_callers(OperType, RuleId, State) -> - case maps:take(RuleId, maps:get(OperType, State, #{})) of - {Caller, NewState} -> - gen_server:reply(Caller, ok), - NewState; - error -> - State - end. - get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). ets:tab2list(Tab).