refactor: use mneisa:subscribe/1 for rule operations
This commit is contained in:
parent
e33a5bfc89
commit
64309a193b
|
@ -21,6 +21,8 @@
|
|||
-include("rule_engine.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-logger_header("[RuleRegistry] ").
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
%% Rule Management
|
||||
|
@ -242,7 +244,6 @@ remove_rules(Rules) ->
|
|||
|
||||
%% @private
|
||||
insert_rule(Rule) ->
|
||||
_ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
|
||||
mnesia:write(?RULE_TAB, Rule, write).
|
||||
|
||||
%% @private
|
||||
|
@ -252,7 +253,6 @@ delete_rule(RuleId) when is_binary(RuleId) ->
|
|||
not_found -> ok
|
||||
end;
|
||||
delete_rule(Rule) ->
|
||||
_ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
|
||||
mnesia:delete_object(?RULE_TAB, Rule, write).
|
||||
|
||||
load_hooks_for_rule(#rule{for = Topics}) ->
|
||||
|
@ -261,7 +261,9 @@ load_hooks_for_rule(#rule{for = Topics}) ->
|
|||
unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
|
||||
lists:foreach(fun(Topic) ->
|
||||
case get_rules_with_same_event(Topic) of
|
||||
[#rule{id = Id}] -> %% we are now deleting the last rule
|
||||
[] -> %% no rules left, we can safely unload the hook
|
||||
emqx_rule_events:unload(Topic);
|
||||
[#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule
|
||||
emqx_rule_events:unload(Topic);
|
||||
_ -> ok
|
||||
end
|
||||
|
@ -463,20 +465,29 @@ delete_resource_type(Type) ->
|
|||
init([]) ->
|
||||
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
||||
{read_concurrency, true}]),
|
||||
{ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}),
|
||||
{ok, #{}}.
|
||||
|
||||
handle_call({add_rules, Rules}, _From, State) ->
|
||||
handle_call({add_rules, []}, _From, State) ->
|
||||
{reply, ok, State};
|
||||
handle_call({add_rules, Rules}, From, State) ->
|
||||
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
|
||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||
{reply, ok, State};
|
||||
%% won't reply until the mnesia_table_event is received.
|
||||
{noreply, State#{
|
||||
callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From)
|
||||
}};
|
||||
|
||||
handle_call({remove_rules, Rules}, _From, State) ->
|
||||
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||
handle_call({remove_rules, []}, _From, State) ->
|
||||
{reply, ok, State};
|
||||
handle_call({remove_rules, Rules}, From, State) ->
|
||||
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
||||
%% won't reply until the mnesia_table_event is received.
|
||||
{noreply, State#{
|
||||
callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From)
|
||||
}};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]),
|
||||
?LOG(error, "unexpected call - ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(update_rules_cache, State) ->
|
||||
|
@ -484,11 +495,31 @@ handle_cast(update_rules_cache, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]),
|
||||
?LOG(error, "unexpected cast ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) ->
|
||||
?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]),
|
||||
ok = load_hooks_for_rule(NewRule),
|
||||
ok = update_rules_cache(),
|
||||
{noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)};
|
||||
|
||||
handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State)
|
||||
when Delete =:= delete; Delete =:= delete_object ->
|
||||
{noreply, State};
|
||||
|
||||
handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State)
|
||||
when Delete =:= delete; Delete =:= delete_object ->
|
||||
?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]),
|
||||
ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules),
|
||||
ok = update_rules_cache(),
|
||||
NewState = lists:foldr(fun(R, AccState) ->
|
||||
maybe_reply_callers(callers_delete, R#rule.id, AccState)
|
||||
end, State, OldRules),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]),
|
||||
?LOG(error, "unexpected info ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
@ -501,6 +532,22 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Private functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
add_caller(OldCallers, Rules, From) ->
|
||||
RuleIds = lists:map(fun
|
||||
(#rule{id = Id}) -> Id;
|
||||
(Id) when is_binary(Id) -> Id
|
||||
end, Rules),
|
||||
maps:merge(OldCallers, maps:from_keys(RuleIds, From)).
|
||||
|
||||
maybe_reply_callers(OperType, RuleId, State) ->
|
||||
case maps:take(RuleId, maps:get(OperType, State, #{})) of
|
||||
{Caller, NewState} ->
|
||||
gen_server:reply(Caller, ok),
|
||||
NewState;
|
||||
error ->
|
||||
State
|
||||
end.
|
||||
|
||||
get_all_records(Tab) ->
|
||||
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
||||
ets:tab2list(Tab).
|
||||
|
|
Loading…
Reference in New Issue