Merge pull request #10283 from terry-xiaoyu/perf-rules

pref: cache the result of emqx_rule_registry:get_rules/0
This commit is contained in:
Xinyu Liu 2023-03-31 13:18:55 +08:00 committed by GitHub
commit 1141fcf87b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 92 additions and 33 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine,
[{description, "EMQX Rule Engine"},
{vsn, "4.4.16"}, % strict semver, bump manually!
{vsn, "4.4.17"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]},
{applications, [kernel,stdlib,rulesql,getopt,jose]},

View File

@ -1,17 +1,24 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.15",
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{"4.4.16",
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]},
{"4.4.15",
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.1[3-4]">>,
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.12",
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
@ -20,7 +27,8 @@
{update,emqx_rule_engine_jwt_sup,supervisor},
{load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
@ -238,17 +246,24 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.15",
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{"4.4.16",
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]},
{"4.4.15",
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.1[3-4]">>,
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.12",
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
@ -257,7 +272,8 @@
{update,emqx_rule_engine_jwt_sup,supervisor},
{load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}]},
{"4.4.11",
[{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
[{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},

View File

@ -29,6 +29,7 @@ start(_Type, _Args) ->
_ = emqx_rule_engine_sup:start_locker(),
ok = emqx_rule_engine:load_providers(),
ok = emqx_rule_monitor:async_refresh_resources_rules(),
ok = emqx_rule_registry:update_rules_cache(),
ok = emqx_rule_engine_cli:load(),
{ok, Sup}.

View File

@ -96,7 +96,7 @@ on_message_publish(Message = #message{flags = #{sys := true}},
#{ignore_sys_message := true}) ->
{ok, Message};
on_message_publish(Message = #message{topic = Topic}, _Env) ->
case emqx_rule_registry:get_rules_for(Topic) of
case emqx_rule_registry:get_active_rules_for(Topic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message))
end,
@ -406,10 +406,10 @@ may_publish_and_apply(EventName, GenEventMsg, #{enabled := true, qos := QoS}) ->
{error, _Reason} ->
?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg])
end,
emqx_rule_runtime:apply_rules(emqx_rule_registry:get_rules_for(EventTopic), EventMsg);
emqx_rule_runtime:apply_rules(emqx_rule_registry:get_active_rules_for(EventTopic), EventMsg);
may_publish_and_apply(EventName, GenEventMsg, _Env) ->
EventTopic = event_topic(EventName),
case emqx_rule_registry:get_rules_for(EventTopic) of
case emqx_rule_registry:get_active_rules_for(EventTopic) of
[] -> ok;
Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
end.

View File

@ -27,6 +27,7 @@
%% Rule Management
-export([ get_rules/0
, get_rules_for/1
, get_active_rules_for/1
, get_rules_with_same_event/1
, get_rules_ordered_by_ts/0
, get_rule/1
@ -73,6 +74,10 @@
, unload_hooks_for_rule/1
]).
-export([ update_rules_cache/0
, clear_rules_cache/0
]).
%% for debug purposes
-export([dump/0]).
@ -164,24 +169,39 @@ start_link() ->
%%------------------------------------------------------------------------------
%% Rule Management
%%------------------------------------------------------------------------------
-define(PK_RULE_TAB, {?MODULE, ?RULE_TAB}).
-spec(get_rules() -> list(emqx_rule_engine:rule())).
get_rules() ->
get_all_records(?RULE_TAB).
case get_rules_from_cache() of
not_found -> get_all_records(?RULE_TAB);
CachedRules -> CachedRules
end.
get_rules_from_cache() ->
persistent_term:get(?PK_RULE_TAB, not_found).
put_rules_to_cache(Rules) ->
persistent_term:put(?PK_RULE_TAB, Rules).
update_rules_cache() ->
put_rules_to_cache(get_all_records(?RULE_TAB)).
clear_rules_cache() ->
persistent_term:erase(?PK_RULE_TAB).
get_rules_ordered_by_ts() ->
F = fun() ->
Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]),
qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
end,
{atomic, List} = mnesia:transaction(F),
List.
lists:keysort(#rule.created_at, get_rules()).
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_for(Topic) ->
[Rule || Rule = #rule{for = For} <- get_rules(),
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
-spec(get_active_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_active_rules_for(Topic) ->
[Rule || Rule = #rule{enabled = true, for = For} <- get_rules(),
emqx_rule_utils:can_topic_match_oneof(Topic, For)].
-spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())).
get_rules_with_same_event(Topic) ->
EventName = emqx_rule_events:event_name(Topic),
@ -441,10 +461,12 @@ init([]) ->
handle_call({add_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun insert_rule/1, Rules]),
_ = ?CLUSTER_CALL(update_rules_cache, []),
{reply, ok, State};
handle_call({remove_rules, Rules}, _From, State) ->
trans(fun lists:foreach/2, [fun delete_rule/1, Rules]),
_ = ?CLUSTER_CALL(update_rules_cache, []),
{reply, ok, State};
handle_call(Req, _From, State) ->

View File

@ -21,6 +21,11 @@
[error] Accept error on 0.0.0.0:1883: EMFILE (Too many open files)
```
- Improve the performance of the rule engine when there are many rules [#10283](https://github.com/emqx/emqx/pull/10283)
Before this change, when there were many rules, the execution of the rule engine became a bottleneck, it consumed a lot of CPU time on rule queries and matches.
In this optimization, the efficiency of rule execution in this scenario was greatly improved by simply adding a cache to the rule list.
In our tests (with over 700 rules), the rule engine achieved a performance improvement of about 5 times after applying this optimization.
## Bug fixes
- Fix that `Erlang distribution` can't use TLS [#9981](https://github.com/emqx/emqx/pull/9981).

View File

@ -20,6 +20,11 @@
[error] Accept error on 0.0.0.0:1883: EMFILE (Too many open files)
```
- 提升规则数量较多时规则引擎的执行性能 [#10283](https://github.com/emqx/emqx/pull/10283)
在此改动之前,当规则数量比较多的时候,规则引擎的执行会成为瓶颈,规则引擎将耗费大量 CPU 在规则的查询和匹配上。
本次优化中,通过简单地给规则列表添加一个缓存,大幅提升了此场景下的规则执行效率。
在我们的测试中700多条规则应用此优化后规则引擎获得了 5 倍左右的性能提升。
## 修复
- 修复 `Erlang distribution` 无法使用 TLS 的问题 [#9981](https://github.com/emqx/emqx/pull/9981)。
@ -33,4 +38,4 @@
问题详情见 [#9409](https://github.com/emqx/emqx/issues/9409)。
在此修复中,我们给 `emqx_retainer` 插件创建了单独的进程池,从而避免了该问题。
- 修复了 Helm Chart 中模板文件路径的错误。[#10229](https://github.com/emqx/emqx/pull/10229)
- 修复了 Helm Chart 中模板文件路径的错误。[#10229](https://github.com/emqx/emqx/pull/10229)

View File

@ -2,19 +2,22 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.16",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.15",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.14",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_keepalive,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -28,7 +31,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]},
{"4.4.13",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_keepalive,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -43,7 +47,8 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.12",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_keepalive,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -522,19 +527,22 @@
[gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{<<".*">>,[]}],
[{"4.4.16",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.15",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.14",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_keepalive,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -548,7 +556,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]},
{"4.4.13",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_keepalive,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
@ -563,7 +572,8 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.12",
[{load_module,emqx_pool,brutal_purge,soft_purge,[]},
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_pool,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_keepalive,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},