diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 7b038e97a..6185efdb9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -75,6 +75,8 @@ -export([ update_rules_cache/0 , clear_rules_cache/0 + , get_rules_from_cache/0 + , update_rules_cache_locally/0 ]). %% for debug purposes @@ -465,16 +467,19 @@ delete_resource_type(Type) -> init([]) -> _TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true}, {read_concurrency, true}]), + ok = ensure_table_subscribed(), {ok, #{}}. handle_call({add_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), + %% the multicall is necessary, because the other nodes maybe running an older emqx version + %% so the table has not been subscribed + update_rules_cache_on_all_nodes(), {reply, ok, State}; handle_call({remove_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), - _ = ?CLUSTER_CALL(update_rules_cache, []), + update_rules_cache_on_all_nodes(), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -482,13 +487,25 @@ handle_call(Req, _From, State) -> {reply, ignored, State}. handle_cast(update_rules_cache, State) -> - _ = update_rules_cache(), + ok = ensure_table_subscribed(), + ok = update_rules_cache(), {noreply, State}; handle_cast(Msg, State) -> ?LOG(error, "unexpected cast ~p", [Msg]), {noreply, State}. +handle_info({mnesia_table_event, {write, _Tab, _NewRule, _OldRules, _Tid} = Event}, State) -> + ?LOG(debug, "mnesia_table_event: ~p~n", [Event]), + ok = update_rules_cache_locally(), + {noreply, State}; + +handle_info({mnesia_table_event, {Delete, _Tab, _What, _OldRules, _Tid} = Event}, State) + when Delete =:= delete; Delete =:= delete_object -> + ?LOG(debug, "mnesia_table_event: ~p~n", [Event]), + ok = update_rules_cache_locally(), + {noreply, State}; + handle_info(Info, State) -> ?LOG(error, "unexpected info ~p", [Info]), {noreply, State}. @@ -502,6 +519,20 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ %% Private functions %%------------------------------------------------------------------------------ +update_rules_cache_on_all_nodes() -> + ok = update_rules_cache(), + case ekka_mnesia:running_nodes() -- [node()] of + [] -> ok; + OtherNodes -> + _ = rpc:multicall(OtherNodes, ?MODULE, update_rules_cache_locally, [], 5000), + ok + end. + +ensure_table_subscribed() -> + case mnesia:subscribe({table, ?RULE_TAB, detailed}) of + {error, {already_exists, _}} -> ok; + {ok, _} -> ok + end. get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).