Merge pull request #5124 from k32/dev/rule-engine-shard

chore(rule_engine): Add an RLOG shard
This commit is contained in:
k32 2021-06-28 21:27:18 +02:00 committed by GitHub
commit 2bea95ff6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 4 deletions

View File

@ -89,6 +89,9 @@
-define(ROUTE_SHARD, route_shard). -define(ROUTE_SHARD, route_shard).
-define(RULE_ENGINE_SHARD, emqx_rule_engine_shard).
-record(route, { -record(route, {
topic :: binary(), topic :: binary(),
dest :: node() | {binary(), node()} dest :: node() | {binary(), node()}

View File

@ -28,7 +28,11 @@
-define(APP, emqx). -define(APP, emqx).
-define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD, ?SHARED_SUB_SHARD]). -define(EMQX_SHARDS, [ ?ROUTE_SHARD
, ?COMMON_SHARD
, ?SHARED_SUB_SHARD
, ?RULE_ENGINE_SHARD
]).
-include("emqx_release.hrl"). -include("emqx_release.hrl").

View File

@ -19,6 +19,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-include("rule_engine.hrl"). -include("rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/qlc.hrl").
@ -95,6 +96,11 @@
-define(T_CALL, 10000). -define(T_CALL, 10000).
-rlog_shard({?RULE_ENGINE_SHARD, ?RULE_TAB}).
-rlog_shard({?RULE_ENGINE_SHARD, ?ACTION_TAB}).
-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TAB}).
-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TYPE_TAB}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -174,7 +180,7 @@ get_rules_ordered_by_ts() ->
Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]), Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]),
qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}])) qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
end, end,
{atomic, List} = mnesia:transaction(F), {atomic, List} = ekka_mnesia:transaction(?RULE_ENGINE_SHARD, F),
List. List.
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
@ -471,11 +477,18 @@ code_change(_OldVsn, State, _Extra) ->
get_all_records(Tab) -> get_all_records(Tab) ->
%mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)). %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
ets:tab2list(Tab). %% Wrapping ets to a r/o transaction to avoid reading inconsistent
%% data during shard bootstrap
{atomic, Ret} =
ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD,
fun() ->
ets:tab2list(Tab)
end),
Ret.
trans(Fun) -> trans(Fun, []). trans(Fun) -> trans(Fun, []).
trans(Fun, Args) -> trans(Fun, Args) ->
case mnesia:transaction(Fun, Args) of case ekka_mnesia:transaction(?RULE_ENGINE_SHARD, Fun, Args) of
{atomic, Result} -> Result; {atomic, Result} -> Result;
{aborted, Reason} -> error(Reason) {aborted, Reason} -> error(Reason)
end. end.