Merge pull request #10303 from terry-xiaoyu/perf-rules
Improve the rule_cache
This commit is contained in:
commit
074f36cc6a
|
@ -3,12 +3,16 @@
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.4.16",
|
[{"4.4.16",
|
||||||
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.4.15",
|
{"4.4.15",
|
||||||
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{<<"4\\.4\\.1[3-4]">>,
|
{<<"4\\.4\\.1[3-4]">>,
|
||||||
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
@ -56,7 +60,8 @@
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.4.9",
|
{"4.4.9",
|
||||||
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{add_module,emqx_rule_engine_jwt},
|
{add_module,emqx_rule_engine_jwt},
|
||||||
|
@ -113,7 +118,8 @@
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.4.5",
|
{"4.4.5",
|
||||||
[{add_module,emqx_rule_engine_jwt},
|
[{add_module,emqx_rule_engine_jwt},
|
||||||
{add_module,emqx_rule_engine_jwt_worker},
|
{add_module,emqx_rule_engine_jwt_worker},
|
||||||
|
@ -221,7 +227,8 @@
|
||||||
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
|
||||||
{add_module,emqx_rule_date},
|
{add_module,emqx_rule_date},
|
||||||
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.4.0",
|
{"4.4.0",
|
||||||
[{add_module,emqx_rule_engine_jwt},
|
[{add_module,emqx_rule_engine_jwt},
|
||||||
{add_module,emqx_rule_engine_jwt_worker},
|
{add_module,emqx_rule_engine_jwt_worker},
|
||||||
|
@ -248,12 +255,16 @@
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.4.16",
|
[{"4.4.16",
|
||||||
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.4.15",
|
{"4.4.15",
|
||||||
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{<<"4\\.4\\.1[3-4]">>,
|
{<<"4\\.4\\.1[3-4]">>,
|
||||||
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -20,7 +20,8 @@
|
||||||
|
|
||||||
-include("rule_engine.hrl").
|
-include("rule_engine.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
|
||||||
|
-logger_header("[RuleRegistry] ").
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
@ -166,6 +167,10 @@ dump() ->
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
|
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%% Use a single process to protect the cache updating to avoid race conditions
|
||||||
|
update_rules_cache_locally() ->
|
||||||
|
gen_server:cast(?REGISTRY, update_rules_cache).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Rule Management
|
%% Rule Management
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -173,7 +178,9 @@ start_link() ->
|
||||||
-spec(get_rules() -> list(emqx_rule_engine:rule())).
|
-spec(get_rules() -> list(emqx_rule_engine:rule())).
|
||||||
get_rules() ->
|
get_rules() ->
|
||||||
case get_rules_from_cache() of
|
case get_rules_from_cache() of
|
||||||
not_found -> get_all_records(?RULE_TAB);
|
not_found ->
|
||||||
|
update_rules_cache_locally(),
|
||||||
|
get_all_records(?RULE_TAB);
|
||||||
CachedRules -> CachedRules
|
CachedRules -> CachedRules
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -187,7 +194,8 @@ update_rules_cache() ->
|
||||||
put_rules_to_cache(get_all_records(?RULE_TAB)).
|
put_rules_to_cache(get_all_records(?RULE_TAB)).
|
||||||
|
|
||||||
clear_rules_cache() ->
|
clear_rules_cache() ->
|
||||||
persistent_term:erase(?PK_RULE_TAB).
|
_ = persistent_term:erase(?PK_RULE_TAB),
|
||||||
|
ok.
|
||||||
|
|
||||||
get_rules_ordered_by_ts() ->
|
get_rules_ordered_by_ts() ->
|
||||||
lists:keysort(#rule.created_at, get_rules()).
|
lists:keysort(#rule.created_at, get_rules()).
|
||||||
|
@ -236,7 +244,6 @@ remove_rules(Rules) ->
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
insert_rule(Rule) ->
|
insert_rule(Rule) ->
|
||||||
_ = ?CLUSTER_CALL(load_hooks_for_rule, [Rule]),
|
|
||||||
mnesia:write(?RULE_TAB, Rule, write).
|
mnesia:write(?RULE_TAB, Rule, write).
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
@ -246,7 +253,6 @@ delete_rule(RuleId) when is_binary(RuleId) ->
|
||||||
not_found -> ok
|
not_found -> ok
|
||||||
end;
|
end;
|
||||||
delete_rule(Rule) ->
|
delete_rule(Rule) ->
|
||||||
_ = ?CLUSTER_CALL(unload_hooks_for_rule, [Rule]),
|
|
||||||
mnesia:delete_object(?RULE_TAB, Rule, write).
|
mnesia:delete_object(?RULE_TAB, Rule, write).
|
||||||
|
|
||||||
load_hooks_for_rule(#rule{for = Topics}) ->
|
load_hooks_for_rule(#rule{for = Topics}) ->
|
||||||
|
@ -255,7 +261,9 @@ load_hooks_for_rule(#rule{for = Topics}) ->
|
||||||
unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
|
unload_hooks_for_rule(#rule{id = Id, for = Topics}) ->
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun(Topic) ->
|
||||||
case get_rules_with_same_event(Topic) of
|
case get_rules_with_same_event(Topic) of
|
||||||
[#rule{id = Id}] -> %% we are now deleting the last rule
|
[] -> %% no rules left, we can safely unload the hook
|
||||||
|
emqx_rule_events:unload(Topic);
|
||||||
|
[#rule{id = Id0}] when Id0 =:= Id-> %% we are now deleting the last rule
|
||||||
emqx_rule_events:unload(Topic);
|
emqx_rule_events:unload(Topic);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end
|
end
|
||||||
|
@ -457,28 +465,61 @@ delete_resource_type(Type) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
_TableId = ets:new(?KV_TAB, [named_table, set, public, {write_concurrency, true},
|
||||||
{read_concurrency, true}]),
|
{read_concurrency, true}]),
|
||||||
|
{ok, _} = mnesia:subscribe({table, ?RULE_TAB, detailed}),
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call({add_rules, Rules}, _From, State) ->
|
handle_call({add_rules, []}, _From, State) ->
|
||||||
|
{reply, ok, State};
|
||||||
|
handle_call({add_rules, Rules}, From, State) ->
|
||||||
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
|
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
|
||||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
%% won't reply until the mnesia_table_event is received.
|
||||||
{reply, ok, State};
|
{noreply, State#{
|
||||||
|
callers_create => add_caller(maps:get(callers_create, State, #{}), Rules, From)
|
||||||
|
}};
|
||||||
|
|
||||||
handle_call({remove_rules, Rules}, _From, State) ->
|
handle_call({remove_rules, []}, _From, State) ->
|
||||||
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
|
||||||
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
handle_call({remove_rules, Rules}, From, State) ->
|
||||||
|
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
||||||
|
%% won't reply until the mnesia_table_event is received.
|
||||||
|
{noreply, State#{
|
||||||
|
callers_delete => add_caller(maps:get(callers_delete, State, #{}), Rules, From)
|
||||||
|
}};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "[RuleRegistry]: unexpected call - ~p", [Req]),
|
?LOG(error, "unexpected call - ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast(update_rules_cache, State) ->
|
||||||
|
_ = update_rules_cache(),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
?LOG(error, "[RuleRegistry]: unexpected cast ~p", [Msg]),
|
?LOG(error, "unexpected cast ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info({mnesia_table_event, {write, Tab, NewRule, _OldRules, _Tid} = Event}, State) ->
|
||||||
|
?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]),
|
||||||
|
ok = load_hooks_for_rule(NewRule),
|
||||||
|
ok = update_rules_cache(),
|
||||||
|
{noreply, maybe_reply_callers(callers_create, NewRule#rule.id, State)};
|
||||||
|
|
||||||
|
handle_info({mnesia_table_event, {Delete, _Tab, _What, [], _Tid}}, State)
|
||||||
|
when Delete =:= delete; Delete =:= delete_object ->
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info({mnesia_table_event, {Delete, Tab, _What, OldRules, _Tid} = Event}, State)
|
||||||
|
when Delete =:= delete; Delete =:= delete_object ->
|
||||||
|
?LOG(debug, "mnesia_table_event on tab: ~p, event: ~p~n", [Tab, Event]),
|
||||||
|
ok = lists:foreach(fun unload_hooks_for_rule/1, OldRules),
|
||||||
|
ok = update_rules_cache(),
|
||||||
|
NewState = lists:foldr(fun(R, AccState) ->
|
||||||
|
maybe_reply_callers(callers_delete, R#rule.id, AccState)
|
||||||
|
end, State, OldRules),
|
||||||
|
{noreply, NewState};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?LOG(error, "[RuleRegistry]: unexpected info ~p", [Info]),
|
?LOG(error, "unexpected info ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
@ -491,6 +532,22 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Private functions
|
%% Private functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
add_caller(OldCallers, Rules, From) ->
|
||||||
|
RuleIds = lists:map(fun
|
||||||
|
(#rule{id = Id}) -> Id;
|
||||||
|
(Id) when is_binary(Id) -> Id
|
||||||
|
end, Rules),
|
||||||
|
maps:merge(OldCallers, maps:from_keys(RuleIds, From)).
|
||||||
|
|
||||||
|
maybe_reply_callers(OperType, RuleId, State) ->
|
||||||
|
case maps:take(RuleId, maps:get(OperType, State, #{})) of
|
||||||
|
{Caller, NewState} ->
|
||||||
|
gen_server:reply(Caller, ok),
|
||||||
|
NewState;
|
||||||
|
error ->
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
get_all_records(Tab) ->
|
get_all_records(Tab) ->
|
||||||
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
|
||||||
ets:tab2list(Tab).
|
ets:tab2list(Tab).
|
||||||
|
|
|
@ -77,7 +77,6 @@ qlc_modules0() ->
|
||||||
"apps/emqx_management/src/emqx_mgmt_api.erl",
|
"apps/emqx_management/src/emqx_mgmt_api.erl",
|
||||||
"apps/emqx_auth_mnesia/src/emqx_auth_mnesia_api.erl",
|
"apps/emqx_auth_mnesia/src/emqx_auth_mnesia_api.erl",
|
||||||
"apps/emqx_auth_mnesia/src/emqx_acl_mnesia_db.erl",
|
"apps/emqx_auth_mnesia/src/emqx_acl_mnesia_db.erl",
|
||||||
"apps/emqx_rule_engine/src/emqx_rule_registry.erl",
|
|
||||||
"lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl",
|
"lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl",
|
||||||
"lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl"
|
"lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_agent.erl"
|
||||||
].
|
].
|
||||||
|
|
Loading…
Reference in New Issue