From 73ec8c47cc3200f7733d9d71062a3fa7a937e94e Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 28 Jun 2021 18:56:13 +0200 Subject: [PATCH] chore(rule_engine): Add an RLOG shard --- apps/emqx/include/emqx.hrl | 3 +++ apps/emqx/src/emqx_app.erl | 6 +++++- .../src/emqx_rule_registry.erl | 19 ++++++++++++++++--- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 9fe69fd30..67744306e 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -89,6 +89,9 @@ -define(ROUTE_SHARD, route_shard). + +-define(RULE_ENGINE_SHARD, emqx_rule_engine_shard). + -record(route, { topic :: binary(), dest :: node() | {binary(), node()} diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index dfdc9c0f8..06bebe465 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -28,7 +28,11 @@ -define(APP, emqx). --define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD, ?SHARED_SUB_SHARD]). +-define(EMQX_SHARDS, [ ?ROUTE_SHARD + , ?COMMON_SHARD + , ?SHARED_SUB_SHARD + , ?RULE_ENGINE_SHARD + ]). -include("emqx_release.hrl"). diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index 2d029f8e3..f2d717dba 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("rule_engine.hrl"). +-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -95,6 +96,11 @@ -define(T_CALL, 10000). +-rlog_shard({?RULE_ENGINE_SHARD, ?RULE_TAB}). +-rlog_shard({?RULE_ENGINE_SHARD, ?ACTION_TAB}). +-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TAB}). +-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TYPE_TAB}). + %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -174,7 +180,7 @@ get_rules_ordered_by_ts() -> Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]), qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}])) end, - {atomic, List} = mnesia:transaction(F), + {atomic, List} = ekka_mnesia:transaction(?RULE_ENGINE_SHARD, F), List. -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). @@ -471,11 +477,18 @@ code_change(_OldVsn, State, _Extra) -> get_all_records(Tab) -> %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). - ets:tab2list(Tab). + %% Wrapping ets to a r/o transaction to avoid reading inconsistent + %% data during shard bootstrap + {atomic, Ret} = + ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD, + fun() -> + ets:tab2list(Tab) + end), + Ret. trans(Fun) -> trans(Fun, []). trans(Fun, Args) -> - case mnesia:transaction(Fun, Args) of + case ekka_mnesia:transaction(?RULE_ENGINE_SHARD, Fun, Args) of {atomic, Result} -> Result; {aborted, Reason} -> error(Reason) end.