diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index e78f73c1e..37bca84aa 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -244,6 +244,7 @@ remove_rules(Rules) -> %% @private insert_rule(Rule) -> + _ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]), mnesia:write(?RULE_TAB, Rule, write). %% @private @@ -253,6 +254,7 @@ delete_rule(RuleId) when is_binary(RuleId) -> not_found -> ok end; delete_rule(Rule) -> + _ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]), mnesia:delete_object(?RULE_TAB, Rule, write). load_hooks_for_rule(#rule{for = Topics}) -> @@ -263,7 +265,7 @@ unload_hooks_for_rule(#rule{id = Id, for = Topics}) -> case get_rules_with_same_event(Topic) of [] -> %% no rules left, we can safely unload the hook emqx_rule_events:unload(Topic); - [#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule + [#rule{id = Id0}] when Id0 =:= Id -> %% we are now deleting the last rule emqx_rule_events:unload(Topic); _ -> ok end @@ -465,26 +467,17 @@ delete_resource_type(Type) -> init([]) -> _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, {read_concurrency, true}]), - {ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}), {ok, #{}}. -handle_call({add_rules, []}, _From, State) -> - {reply, ok, State}; -handle_call({add_rules, Rules}, From, State) -> +handle_call({add_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), - %% won't reply until the mnesia_table_event is received. - {noreply, State#{ - callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From) - }}; - -handle_call({remove_rules, []}, _From, State) -> + _ = ?CLUSTER_CALL(update_rules_cache, []), {reply, ok, State}; -handle_call({remove_rules, Rules}, From, State) -> + +handle_call({remove_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), - %% won't reply until the mnesia_table_event is received. - {noreply, State#{ - callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From) - }}; + _ = ?CLUSTER_CALL(update_rules_cache, []), + {reply, ok, State}; handle_call(Req, _From, State) -> ?LOG(error, "unexpected call - ~p", [Req]), @@ -498,26 +491,6 @@ handle_cast(Msg, State) -> ?LOG(error, "unexpected cast ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) -> - ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), - ok = load_hooks_for_rule(NewRule), - ok = update_rules_cache(), - {noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)}; - -handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State) - when Delete =:= delete; Delete =:= delete_object -> - {noreply, State}; - -handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State) - when Delete =:= delete; Delete =:= delete_object -> - ?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]), - ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules), - ok = update_rules_cache(), - NewState = lists:foldr(fun(R, AccState) -> - maybe_reply_callers(callers_delete, R#rule.id, AccState) - end, State, OldRules), - {noreply, NewState}; - handle_info(Info, State) -> ?LOG(error, "unexpected info ~p", [Info]), {noreply, State}. @@ -532,22 +505,6 @@ code_change(_OldVsn, State, _Extra) -> %% Private functions %%------------------------------------------------------------------------------ -add_caller(OldCallers, Rules, From) -> - RuleIds = lists:map(fun - (#rule{id = Id}) -> Id; - (Id) when is_binary(Id) -> Id - end, Rules), - maps:merge(OldCallers, maps:from_keys(RuleIds, From)). - -maybe_reply_callers(OperType, RuleId, State) -> - case maps:take(RuleId, maps:get(OperType, State, #{})) of - {Caller, NewState} -> - gen_server:reply(Caller, ok), - NewState; - error -> - State - end. - get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). ets:tab2list(Tab).