diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index e315403ca..9e228622e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQX Rule Engine"}, - {vsn, "4.4.16"}, % strict semver, bump manually! + {vsn, "4.4.17"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]}, {applications, [kernel,stdlib,rulesql,getopt,jose]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 2b0493d3b..b2071bb45 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,17 +1,24 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.15", - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{"4.4.16", + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]}, + {"4.4.15", + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.1[3-4]">>, - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, @@ -20,7 +27,8 @@ {update,emqx_rule_engine_jwt_sup,supervisor}, {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, @@ -238,17 +246,24 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.4.15", - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{"4.4.16", + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}]}, + {"4.4.15", + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {<<"4\\.4\\.1[3-4]">>, - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, @@ -257,7 +272,8 @@ {update,emqx_rule_engine_jwt_sup,supervisor}, {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 4d35ce5c2..941ec59f6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -29,6 +29,7 @@ start(_Type, _Args) -> _ = emqx_rule_engine_sup:start_locker(), ok = emqx_rule_engine:load_providers(), ok = emqx_rule_monitor:async_refresh_resources_rules(), + ok = emqx_rule_registry:update_rules_cache(), ok = emqx_rule_engine_cli:load(), {ok, Sup}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 5406845d8..f5c18d639 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -96,7 +96,7 @@ on_message_publish(Message = #message{flags = #{sys := true}}, #{ignore_sys_message := true}) -> {ok, Message}; on_message_publish(Message = #message{topic = Topic}, _Env) -> - case emqx_rule_registry:get_rules_for(Topic) of + case emqx_rule_registry:get_active_rules_for(Topic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, eventmsg_publish(Message)) end, @@ -406,10 +406,10 @@ may_publish_and_apply(EventName, GenEventMsg, #{enabled := true, qos := QoS}) -> {error, _Reason} -> ?LOG(error, "Failed to encode event msg for ~p, msg: ~p", [EventName, EventMsg]) end, - emqx_rule_runtime:apply_rules(emqx_rule_registry:get_rules_for(EventTopic), EventMsg); + emqx_rule_runtime:apply_rules(emqx_rule_registry:get_active_rules_for(EventTopic), EventMsg); may_publish_and_apply(EventName, GenEventMsg, _Env) -> EventTopic = event_topic(EventName), - case emqx_rule_registry:get_rules_for(EventTopic) of + case emqx_rule_registry:get_active_rules_for(EventTopic) of [] -> ok; Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg()) end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 75532a92a..0391e8f59 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -27,6 +27,7 @@ %% Rule Management -export([ get_rules/0 , get_rules_for/1 + , get_active_rules_for/1 , get_rules_with_same_event/1 , get_rules_ordered_by_ts/0 , get_rule/1 @@ -73,6 +74,10 @@ , unload_hooks_for_rule/1 ]). +-export([ update_rules_cache/0 + , clear_rules_cache/0 + ]). + %% for debug purposes -export([dump/0]). @@ -164,24 +169,39 @@ start_link() -> %%------------------------------------------------------------------------------ %% Rule Management %%------------------------------------------------------------------------------ - +-define(PK_RULE_TAB, {?MODULE, ?RULE_TAB}). -spec(get_rules() -> list(emqx_rule_engine:rule())). get_rules() -> - get_all_records(?RULE_TAB). + case get_rules_from_cache() of + not_found -> get_all_records(?RULE_TAB); + CachedRules -> CachedRules + end. + +get_rules_from_cache() -> + persistent_term:get(?PK_RULE_TAB, not_found). + +put_rules_to_cache(Rules) -> + persistent_term:put(?PK_RULE_TAB, Rules). + +update_rules_cache() -> + put_rules_to_cache(get_all_records(?RULE_TAB)). + +clear_rules_cache() -> + persistent_term:erase(?PK_RULE_TAB). get_rules_ordered_by_ts() -> - F = fun() -> - Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]), - qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}])) - end, - {atomic, List} = mnesia:transaction(F), - List. + lists:keysort(#rule.created_at, get_rules()). -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_for(Topic) -> [Rule || Rule = #rule{for = For} <- get_rules(), emqx_rule_utils:can_topic_match_oneof(Topic, For)]. +-spec(get_active_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). +get_active_rules_for(Topic) -> + [Rule || Rule = #rule{enabled = true, for = For} <- get_rules(), + emqx_rule_utils:can_topic_match_oneof(Topic, For)]. + -spec(get_rules_with_same_event(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_with_same_event(Topic) -> EventName = emqx_rule_events:event_name(Topic), @@ -441,10 +461,12 @@ init([]) -> handle_call({add_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun insert_rule/1, Rules]), + _ = ?CLUSTER_CALL(update_rules_cache, []), {reply, ok, State}; handle_call({remove_rules, Rules}, _From, State) -> trans(fun lists:foreach/2, [fun delete_rule/1, Rules]), + _ = ?CLUSTER_CALL(update_rules_cache, []), {reply, ok, State}; handle_call(Req, _From, State) -> diff --git a/changes/v4.4.17-en.md b/changes/v4.4.17-en.md index ab4fcf701..4044b6bdf 100644 --- a/changes/v4.4.17-en.md +++ b/changes/v4.4.17-en.md @@ -21,6 +21,11 @@ [error] Accept error on 0.0.0.0:1883: EMFILE (Too many open files) ``` +- Improve the performance of the rule engine when there are many rules [#10283](https://github.com/emqx/emqx/pull/10283) + Before this change, when there were many rules, the execution of the rule engine became a bottleneck, it consumed a lot of CPU time on rule queries and matches. + In this optimization, the efficiency of rule execution in this scenario was greatly improved by simply adding a cache to the rule list. + In our tests (with over 700 rules), the rule engine achieved a performance improvement of about 5 times after applying this optimization. + ## Bug fixes - Fix that `Erlang distribution` can't use TLS [#9981](https://github.com/emqx/emqx/pull/9981). diff --git a/changes/v4.4.17-zh.md b/changes/v4.4.17-zh.md index 42f84ac6e..34a19d23e 100644 --- a/changes/v4.4.17-zh.md +++ b/changes/v4.4.17-zh.md @@ -20,6 +20,11 @@ [error] Accept error on 0.0.0.0:1883: EMFILE (Too many open files) ``` +- 提升规则数量较多时规则引擎的执行性能 [#10283](https://github.com/emqx/emqx/pull/10283) + 在此改动之前,当规则数量比较多的时候,规则引擎的执行会成为瓶颈,规则引擎将耗费大量 CPU 在规则的查询和匹配上。 + 本次优化中,通过简单地给规则列表添加一个缓存,大幅提升了此场景下的规则执行效率。 + 在我们的测试中(700多条规则),应用此优化后规则引擎获得了 5 倍左右的性能提升。 + ## 修复 - 修复 `Erlang distribution` 无法使用 TLS 的问题 [#9981](https://github.com/emqx/emqx/pull/9981)。 @@ -33,4 +38,4 @@ 问题详情见 [#9409](https://github.com/emqx/emqx/issues/9409)。 在此修复中,我们给 `emqx_retainer` 插件创建了单独的进程池,从而避免了该问题。 -- 修复了 Helm Chart 中模板文件路径的错误。[#10229](https://github.com/emqx/emqx/pull/10229) \ No newline at end of file +- 修复了 Helm Chart 中模板文件路径的错误。[#10229](https://github.com/emqx/emqx/pull/10229) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 96e8a8a17..47de7747d 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,19 +2,22 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.16", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.15", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.14", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -28,7 +31,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -43,7 +47,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -522,19 +527,22 @@ [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {<<".*">>,[]}], [{"4.4.16", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.15", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.14", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -548,7 +556,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, @@ -563,7 +572,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_pool,brutal_purge,soft_purge,[]}, + [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_pool,brutal_purge,soft_purge,[]}, {load_module,emqx_vm,brutal_purge,soft_purge,[]}, {load_module,emqx_keepalive,brutal_purge,soft_purge,[]}, {load_module,emqx_broker,brutal_purge,soft_purge,[]},