fix: don't run mutil cluster-call on one transaction

This commit is contained in:
zhongwencool 2021-09-02 14:03:46 +08:00
parent 679edbc29c
commit ee2fccac02
1 changed files with 31 additions and 18 deletions

View File

@ -220,31 +220,44 @@ remove_rules(Rules) ->
gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL). gen_server:call(?REGISTRY, {remove_rules, Rules}, ?T_CALL).
%% @private %% @private
insert_rule(Rule) -> insert_rules([]) -> ok;
_ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rule]), insert_rules(Rules) ->
mnesia:write(?RULE_TAB, Rule, write). _ = emqx_rule_utils:cluster_call(?MODULE, load_hooks_for_rule, [Rules]),
[mnesia:write(?RULE_TAB, Rule, write) ||Rule <- Rules].
%% @private %% @private
delete_rule(RuleId) when is_binary(RuleId) -> delete_rules([]) -> ok;
delete_rules(Rules = [R|_]) when is_binary(R) ->
RuleRecs =
lists:foldl(fun(RuleId, Acc) ->
case get_rule(RuleId) of case get_rule(RuleId) of
{ok, Rule} -> delete_rule(Rule); {ok, Rule} -> [Rule|Acc];
not_found -> ok not_found -> Acc
end; end
delete_rule(Rule) -> end, [], Rules),
_ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rule]), delete_rules_unload_hooks(RuleRecs);
mnesia:delete_object(?RULE_TAB, Rule, write). delete_rules(Rules = [Rule|_]) when is_record(Rule, rule) ->
delete_rules_unload_hooks(Rules).
load_hooks_for_rule(#rule{for = Topics}) -> delete_rules_unload_hooks(Rules) ->
lists:foreach(fun emqx_rule_events:load/1, Topics). _ = emqx_rule_utils:cluster_call(?MODULE, unload_hooks_for_rule, [Rules]),
[mnesia:delete_object(?RULE_TAB, Rule, write) ||Rule <- Rules].
unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> load_hooks_for_rule(Rules) ->
lists:foreach(fun(#rule{for = Topics}) ->
lists:foreach(fun emqx_rule_events:load/1, Topics)
end, Rules).
unload_hooks_for_rule(Rules) ->
lists:foreach(fun(#rule{id = Id, for = Topics}) ->
lists:foreach(fun(Topic) -> lists:foreach(fun(Topic) ->
case get_rules_with_same_event(Topic) of case get_rules_with_same_event(Topic) of
[#rule{id = Id}] -> %% we are now deleting the last rule [#rule{id = Id}] -> %% we are now deleting the last rule
emqx_rule_events:unload(Topic); emqx_rule_events:unload(Topic);
_ -> ok _ -> ok
end end
end, Topics). end, Topics)
end, Rules).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Action Management %% Action Management
@ -445,11 +458,11 @@ init([]) ->
{ok, #{}}. {ok, #{}}.
handle_call({add_rules, Rules}, _From, State) -> handle_call({add_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), trans(fun insert_rules/1, [Rules]),
{reply, ok, State}; {reply, ok, State};
handle_call({remove_rules, Rules}, _From, State) -> handle_call({remove_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), trans(fun delete_rules/1, [Rules]),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->