From ee2fccac02c79d54283a9c9dc1659ab4f52c3722 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 2 Sep 2021 14:03:46 +0800 Subject: [PATCH] fix: don't run mutil cluster-call on one transaction --- .../src/emqx_rule_registry.erl | 49 ++++++++++++------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 096534585..1ae16df90 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -220,31 +220,44 @@ remove_rules(Rules) -> gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). %% @private -insert_rule(Rule) -> - _ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), - mnesia:write(?RULE_TAB, Rule, write). +insert_rules([]) -> ok; +insert_rules(Rules) -> + _ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rules]), + [mnesia:write(?RULE_TAB, Rule, write) ||Rule <- Rules]. %% @private -delete_rule(RuleId) when is_binary(RuleId) -> - case get_rule(RuleId) of - {ok, Rule} -> delete_rule(Rule); - not_found -> ok - end; -delete_rule(Rule) -> - _ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), - mnesia:delete_object(?RULE_TAB, Rule, write). +delete_rules([]) -> ok; +delete_rules(Rules = [R|_]) when is_binary(R) -> + RuleRecs = + lists:foldl(fun(RuleId, Acc) -> + case get_rule(RuleId) of + {ok, Rule} -> [Rule|Acc]; + not_found -> Acc + end + end, [], Rules), + delete_rules_unload_hooks(RuleRecs); +delete_rules(Rules = [Rule|_]) when is_record(Rule, rule) -> + delete_rules_unload_hooks(Rules). -load_hooks_for_rule(#rule{for = Topics}) -> - lists:foreach(fun emqx_rule_events:load/1, Topics). +delete_rules_unload_hooks(Rules) -> + _ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rules]), + [mnesia:delete_object(?RULE_TAB, Rule, write) ||Rule <- Rules]. -unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> - lists:foreach(fun(Topic) -> +load_hooks_for_rule(Rules) -> + lists:foreach(fun(#rule{for = Topics}) -> + lists:foreach(fun emqx_rule_events:load/1, Topics) + end, Rules). + +unload_hooks_for_rule(Rules) -> + lists:foreach(fun(#rule{id = Id, for = Topics}) -> + lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of [#rule{id = Id}] -> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end - end, Topics). + end, Topics) + end, Rules). %%------------------------------------------------------------------------------ %% Action Management @@ -445,11 +458,11 @@ init([]) -> {ok, #{}}. handle_call({add_rules, Rules}, _From, State) -> - trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), + trans(fun insert_rules/1, [Rules]), {reply, ok, State}; handle_call({remove_rules, Rules}, _From, State) -> - trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + trans(fun delete_rules/1, [Rules]), {reply, ok, State}; handle_call(Req, _From, State) ->