revert: Revert "refactor: use mneisa:subscribe/1 for rule operations"
This reverts commit 64309a193b
.
For relup from old versions, the emqx_rule_registry has not subscribes
the mnesia events, so we cannot update caches only when mnesia events
received.
For rolling upgrade the nodes one by one, some of the nodes has not
subscribes the mnesia events, so we cannot move the `load_hooks_for_rule`
into the mnesia events handling.
In a word, it is too complicated to use mnesia events, we use RPC instead.
This commit is contained in:
parent
891ed4bfad
commit
c0a55344c5
|
@ -244,6 +244,7 @@ remove_rules(Rules) ->
|
|||
|
||||
%% @private
|
||||
insert_rule(Rule) ->
|
||||
_ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
|
||||
mnesia:write(?RULE_TAB, Rule, write).
|
||||
|
||||
%% @private
|
||||
|
@ -253,6 +254,7 @@ delete_rule(RuleId) when is_binary(RuleId) ->
|
|||
not_found -> ok
|
||||
end;
|
||||
delete_rule(Rule) ->
|
||||
_ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
|
||||
mnesia:delete_object(?RULE_TAB, Rule, write).
|
||||
|
||||
load_hooks_for_rule(#rule{for = Topics}) ->
|
||||
|
@ -263,7 +265,7 @@ unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
|
|||
case get_rules_with_same_event(Topic) of
|
||||
[] -> %% no rules left, we can safely unload the hook
|
||||
emqx_rule_events:unload(Topic);
|
||||
[#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule
|
||||
[#rule{id = Id0}] when Id0 =:= Id -> %% we are now deleting the last rule
|
||||
emqx_rule_events:unload(Topic);
|
||||
_ -> ok
|
||||
end
|
||||
|
@ -465,26 +467,17 @@ delete_resource_type(Type) ->
|
|||
init([]) ->
|
||||
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
||||
{read_concurrency, true}]),
|
||||
{ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}),
|
||||
{ok, #{}}.
|
||||
|
||||
handle_call({add_rules, []}, _From, State) ->
|
||||
{reply, ok, State};
|
||||
handle_call({add_rules, Rules}, From, State) ->
|
||||
handle_call({add_rules, Rules}, _From, State) ->
|
||||
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
|
||||
%% won't reply until the mnesia_table_event is received.
|
||||
{noreply, State#{
|
||||
callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From)
|
||||
}};
|
||||
|
||||
handle_call({remove_rules, []}, _From, State) ->
|
||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||
{reply, ok, State};
|
||||
handle_call({remove_rules, Rules}, From, State) ->
|
||||
|
||||
handle_call({remove_rules, Rules}, _From, State) ->
|
||||
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
||||
%% won't reply until the mnesia_table_event is received.
|
||||
{noreply, State#{
|
||||
callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From)
|
||||
}};
|
||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "unexpected call - ~p", [Req]),
|
||||
|
@ -498,26 +491,6 @@ handle_cast(Msg, State) ->
|
|||
?LOG(error, "unexpected cast ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) ->
|
||||
?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]),
|
||||
ok = load_hooks_for_rule(NewRule),
|
||||
ok = update_rules_cache(),
|
||||
{noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)};
|
||||
|
||||
handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State)
|
||||
when Delete =:= delete; Delete =:= delete_object ->
|
||||
{noreply, State};
|
||||
|
||||
handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State)
|
||||
when Delete =:= delete; Delete =:= delete_object ->
|
||||
?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]),
|
||||
ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules),
|
||||
ok = update_rules_cache(),
|
||||
NewState = lists:foldr(fun(R, AccState) ->
|
||||
maybe_reply_callers(callers_delete, R#rule.id, AccState)
|
||||
end, State, OldRules),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "unexpected info ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
@ -532,22 +505,6 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Private functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
add_caller(OldCallers, Rules, From) ->
|
||||
RuleIds = lists:map(fun
|
||||
(#rule{id = Id}) -> Id;
|
||||
(Id) when is_binary(Id) -> Id
|
||||
end, Rules),
|
||||
maps:merge(OldCallers, maps:from_keys(RuleIds, From)).
|
||||
|
||||
maybe_reply_callers(OperType, RuleId, State) ->
|
||||
case maps:take(RuleId, maps:get(OperType, State, #{})) of
|
||||
{Caller, NewState} ->
|
||||
gen_server:reply(Caller, ok),
|
||||
NewState;
|
||||
error ->
|
||||
State
|
||||
end.
|
||||
|
||||
get_all_records(Tab) ->
|
||||
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
||||
ets:tab2list(Tab).
|
||||
|
|
Loading…
Reference in New Issue