fix: make rule cache update more reliable
This commit is contained in:
parent
a2dae3425f
commit
c1e47914a6
|
@ -75,6 +75,8 @@
|
|||
|
||||
-export([ update_rules_cache/0
|
||||
, clear_rules_cache/0
|
||||
, get_rules_from_cache/0
|
||||
, update_rules_cache_locally/0
|
||||
]).
|
||||
|
||||
%% for debug purposes
|
||||
|
@ -465,16 +467,19 @@ delete_resource_type(Type) ->
|
|||
init([]) ->
|
||||
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
||||
{read_concurrency, true}]),
|
||||
ok = ensure_table_subscribed(),
|
||||
{ok, #{}}.
|
||||
|
||||
handle_call({add_rules, Rules}, _From, State) ->
|
||||
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
|
||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||
%% the multicall is necessary, because the other nodes maybe running an older emqx version
|
||||
%% so the table has not been subscribed
|
||||
update_rules_cache_on_all_nodes(),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call({remove_rules, Rules}, _From, State) ->
|
||||
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||
update_rules_cache_on_all_nodes(),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
|
@ -482,13 +487,25 @@ handle_call(Req, _From, State) ->
|
|||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(update_rules_cache, State) ->
|
||||
_ = update_rules_cache(),
|
||||
ok = ensure_table_subscribed(),
|
||||
ok = update_rules_cache(),
|
||||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "unexpected cast ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mnesia_table_event, {write, _Tab, _NewRule, _OldRules, _Tid} = Event}, State) ->
|
||||
?LOG(debug, "mnesia_table_event: ~p~n", [Event]),
|
||||
ok = update_rules_cache_locally(),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({mnesia_table_event, {Delete, _Tab, _What, _OldRules, _Tid} = Event}, State)
|
||||
when Delete =:= delete; Delete =:= delete_object ->
|
||||
?LOG(debug, "mnesia_table_event: ~p~n", [Event]),
|
||||
ok = update_rules_cache_locally(),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "unexpected info ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
@ -502,6 +519,20 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
%% Private functions
|
||||
%%------------------------------------------------------------------------------
|
||||
update_rules_cache_on_all_nodes() ->
|
||||
ok = update_rules_cache(),
|
||||
case ekka_mnesia:running_nodes() -- [node()] of
|
||||
[] -> ok;
|
||||
OtherNodes ->
|
||||
_ = rpc:multicall(OtherNodes, ?MODULE, update_rules_cache_locally, [], 5000),
|
||||
ok
|
||||
end.
|
||||
|
||||
ensure_table_subscribed() ->
|
||||
case mnesia:subscribe({table, ?RULE_TAB, detailed}) of
|
||||
{error, {already_exists, _}} -> ok;
|
||||
{ok, _} -> ok
|
||||
end.
|
||||
|
||||
get_all_records(Tab) ->
|
||||
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
||||
|
|
Loading…
Reference in New Issue