perf: cache the result of emqx_rule_registry:get_rules/0
This commit is contained in:
parent
1a46add3e6
commit
8511dbfde3
|
@ -29,6 +29,7 @@ start(_Type, _Args) ->
|
||||||
_ = emqx_rule_engine_sup:start_locker(),
|
_ = emqx_rule_engine_sup:start_locker(),
|
||||||
ok = emqx_rule_engine:load_providers(),
|
ok = emqx_rule_engine:load_providers(),
|
||||||
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
ok = emqx_rule_monitor:async_refresh_resources_rules(),
|
||||||
|
ok = emqx_rule_registry:update_rules_cache(),
|
||||||
ok = emqx_rule_engine_cli:load(),
|
ok = emqx_rule_engine_cli:load(),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@ on_message_publish(Message = #message{flags = #{sys := true}},
|
||||||
#{ignore_sys_message := true}) ->
|
#{ignore_sys_message := true}) ->
|
||||||
{ok, Message};
|
{ok, Message};
|
||||||
on_message_publish(Message = #message{topic = Topic}, _Env) ->
|
on_message_publish(Message = #message{topic = Topic}, _Env) ->
|
||||||
case emqx_rule_registry:get_rules_for(Topic) of
|
case emqx_rule_registry:get_active_rules_for(Topic) of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
|
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
|
||||||
end,
|
end,
|
||||||
|
@ -406,10 +406,10 @@ may_publish_and_apply(EventName, GenEventMsg, #{enabled := true, qos := QoS}) ->
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg])
|
?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg])
|
||||||
end,
|
end,
|
||||||
emqx_rule_runtime:apply_rules(emqx_rule_registry:get_rules_for(EventTopic), EventMsg);
|
emqx_rule_runtime:apply_rules(emqx_rule_registry:get_active_rules_for(EventTopic), EventMsg);
|
||||||
may_publish_and_apply(EventName, GenEventMsg, _Env) ->
|
may_publish_and_apply(EventName, GenEventMsg, _Env) ->
|
||||||
EventTopic = event_topic(EventName),
|
EventTopic = event_topic(EventName),
|
||||||
case emqx_rule_registry:get_rules_for(EventTopic) of
|
case emqx_rule_registry:get_active_rules_for(EventTopic) of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
|
Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
%% Rule Management
|
%% Rule Management
|
||||||
-export([ get_rules/0
|
-export([ get_rules/0
|
||||||
, get_rules_for/1
|
, get_rules_for/1
|
||||||
|
, get_active_rules_for/1
|
||||||
, get_rules_with_same_event/1
|
, get_rules_with_same_event/1
|
||||||
, get_rules_ordered_by_ts/0
|
, get_rules_ordered_by_ts/0
|
||||||
, get_rule/1
|
, get_rule/1
|
||||||
|
@ -73,6 +74,10 @@
|
||||||
, unload_hooks_for_rule/1
|
, unload_hooks_for_rule/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ update_rules_cache/0
|
||||||
|
, clear_rules_cache/0
|
||||||
|
]).
|
||||||
|
|
||||||
%% for debug purposes
|
%% for debug purposes
|
||||||
-export([dump/0]).
|
-export([dump/0]).
|
||||||
|
|
||||||
|
@ -164,24 +169,39 @@ start_link() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Rule Management
|
%% Rule Management
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
-define(PK_RULE_TAB, {?MODULE, ?RULE_TAB}).
|
||||||
-spec(get_rules() -> list(emqx_rule_engine:rule())).
|
-spec(get_rules() -> list(emqx_rule_engine:rule())).
|
||||||
get_rules() ->
|
get_rules() ->
|
||||||
get_all_records(?RULE_TAB).
|
case get_rules_from_cache() of
|
||||||
|
not_found -> get_all_records(?RULE_TAB);
|
||||||
|
CachedRules -> CachedRules
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_rules_from_cache() ->
|
||||||
|
persistent_term:get(?PK_RULE_TAB, not_found).
|
||||||
|
|
||||||
|
put_rules_to_cache(Rules) ->
|
||||||
|
persistent_term:put(?PK_RULE_TAB, Rules).
|
||||||
|
|
||||||
|
update_rules_cache() ->
|
||||||
|
put_rules_to_cache(get_all_records(?RULE_TAB)).
|
||||||
|
|
||||||
|
clear_rules_cache() ->
|
||||||
|
persistent_term:erase(?PK_RULE_TAB).
|
||||||
|
|
||||||
get_rules_ordered_by_ts() ->
|
get_rules_ordered_by_ts() ->
|
||||||
F = fun() ->
|
lists:keysort(#rule.created_at, get_rules()).
|
||||||
Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]),
|
|
||||||
qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
|
|
||||||
end,
|
|
||||||
{atomic, List} = mnesia:transaction(F),
|
|
||||||
List.
|
|
||||||
|
|
||||||
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
|
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
|
||||||
get_rules_for(Topic) ->
|
get_rules_for(Topic) ->
|
||||||
[Rule || Rule = #rule{for = For} <- get_rules(),
|
[Rule || Rule = #rule{for = For} <- get_rules(),
|
||||||
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
|
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
|
||||||
|
|
||||||
|
-spec(get_active_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
|
||||||
|
get_active_rules_for(Topic) ->
|
||||||
|
[Rule || Rule = #rule{enabled = true, for = For} <- get_rules(),
|
||||||
|
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
|
||||||
|
|
||||||
-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
|
-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
|
||||||
get_rules_with_same_event(Topic) ->
|
get_rules_with_same_event(Topic) ->
|
||||||
EventName = emqx_rule_events:event_name(Topic),
|
EventName = emqx_rule_events:event_name(Topic),
|
||||||
|
@ -441,10 +461,12 @@ init([]) ->
|
||||||
|
|
||||||
handle_call({add_rules, Rules}, _From, State) ->
|
handle_call({add_rules, Rules}, _From, State) ->
|
||||||
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
|
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
|
||||||
|
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call({remove_rules, Rules}, _From, State) ->
|
handle_call({remove_rules, Rules}, _From, State) ->
|
||||||
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
|
||||||
|
_ = ?CLUSTER_CALL(update_rules_cache, []),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
|
Loading…
Reference in New Issue