chore(mria): ekka_mnesia:create_table -> mria:create_table

This commit is contained in:
k32 2021-10-14 20:51:36 +02:00 committed by x1001100011
parent 75c9267473
commit 814623edae
23 changed files with 67 additions and 115 deletions

View File

@ -95,21 +95,18 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ACTIVATED_ALARM, ok = mria:create_table(?ACTIVATED_ALARM,
[{type, set}, [{type, set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, activated_alarm}, {record_name, activated_alarm},
{attributes, record_info(fields, activated_alarm)}]), {attributes, record_info(fields, activated_alarm)}]),
ok = ekka_mnesia:create_table(?DEACTIVATED_ALARM, ok = mria:create_table(?DEACTIVATED_ALARM,
[{type, ordered_set}, [{type, ordered_set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, deactivated_alarm}, {record_name, deactivated_alarm},
{attributes, record_info(fields, deactivated_alarm)}]); {attributes, record_info(fields, deactivated_alarm)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ACTIVATED_ALARM, disc_copies),
ok = ekka_mnesia:copy_table(?DEACTIVATED_ALARM, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API

View File

@ -58,16 +58,13 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?BANNED_TAB, [ ok = mria:create_table(?BANNED_TAB, [
{type, set}, {type, set},
{rlog_shard, ?COMMON_SHARD}, {rlog_shard, ?COMMON_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, banned}, {record_name, banned},
{attributes, record_info(fields, banned)}, {attributes, record_info(fields, banned)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?BANNED_TAB, disc_copies).
%% @doc Start the banned server. %% @doc Start the banned server.
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).

View File

@ -100,15 +100,14 @@ record(ClientId, ChanPid) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, bag}, {type, bag},
{rlog_shard, ?CM_SHARD}, {rlog_shard, ?CM_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, channel}, {record_name, channel},
{attributes, record_info(fields, channel)}, {attributes, record_info(fields, channel)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]), {write_concurrency, true}]}]}]),
ok = ekka_mnesia:copy_table(?TAB, ram_copies),
ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
ok = ekka:monitor(membership), ok = ekka:monitor(membership),
{ok, #{}}. {ok, #{}}.

View File

@ -74,16 +74,14 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ROUTE_TAB, [ ok = mria:create_table(?ROUTE_TAB, [
{type, bag}, {type, bag},
{rlog_shard, ?ROUTE_SHARD}, {rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, route}, {record_name, route},
{attributes, record_info(fields, route)}, {attributes, record_info(fields, route)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]); {write_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTE_TAB, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Start a router %% Start a router

View File

@ -59,16 +59,13 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ROUTING_NODE, [ ok = mria:create_table(?ROUTING_NODE, [
{type, set}, {type, set},
{rlog_shard, ?ROUTE_SHARD}, {rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, routing_node}, {record_name, routing_node},
{attributes, record_info(fields, routing_node)}, {attributes, record_info(fields, routing_node)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTING_NODE, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API

View File

@ -85,15 +85,12 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, bag}, {type, bag},
{rlog_shard, ?SHARED_SUB_SHARD}, {rlog_shard, ?SHARED_SUB_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, emqx_shared_subscription}, {record_name, emqx_shared_subscription},
{attributes, record_info(fields, emqx_shared_subscription)}]); {attributes, record_info(fields, emqx_shared_subscription)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API

View File

@ -61,16 +61,13 @@ mnesia(boot) ->
StoreProps = [{ets, [{read_concurrency, true}, StoreProps = [{ets, [{read_concurrency, true},
{write_concurrency, true} {write_concurrency, true}
]}], ]}],
ok = ekka_mnesia:create_table(?TRIE, [ ok = mria:create_table(?TRIE, [
{rlog_shard, ?ROUTE_SHARD}, {rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, ?TRIE}, {record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)}, {attributes, record_info(fields, ?TRIE)},
{type, ordered_set}, {type, ordered_set},
{storage_properties, StoreProps}]); {storage_properties, StoreProps}]).
mnesia(copy) ->
%% Copy topics table
ok = ekka_mnesia:copy_table(?TRIE, ram_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Topics APIs %% Topics APIs

View File

@ -63,15 +63,12 @@
%% @doc Create or replicate tables. %% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{rlog_shard, ?AUTH_SHARD}, {rlog_shard, ?AUTH_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, user_info}, {record_name, user_info},
{attributes, record_info(fields, user_info)}, {attributes, record_info(fields, user_info)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Hocon Schema %% Hocon Schema

View File

@ -70,15 +70,12 @@
%% @doc Create or replicate tables. %% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{rlog_shard, ?AUTH_SHARD}, {rlog_shard, ?AUTH_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, user_info}, {record_name, user_info},
{attributes, record_info(fields, user_info)}, {attributes, record_info(fields, user_info)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Hocon Schema %% Hocon Schema

View File

@ -36,14 +36,12 @@
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?ACL_TABLE, [ ok = mria:create_table(?ACL_TABLE, [
{type, ordered_set}, {type, ordered_set},
{rlog_shard, ?ACL_SHARDED}, {rlog_shard, ?ACL_SHARDED},
{disc_copies, [node()]}, {storage, disc_copies},
{attributes, record_info(fields, ?ACL_TABLE)}, {attributes, record_info(fields, ?ACL_TABLE)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ACL_TABLE, disc_copies).
description() -> description() ->
"AuthZ with Mnesia". "AuthZ with Mnesia".

View File

@ -50,16 +50,14 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(mqtt_admin, [ ok = mria:create_table(mqtt_admin, [
{type, set}, {type, set},
{rlog_shard, ?DASHBOARD_SHARD}, {rlog_shard, ?DASHBOARD_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, mqtt_admin}, {record_name, mqtt_admin},
{attributes, record_info(fields, mqtt_admin)}, {attributes, record_info(fields, mqtt_admin)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]); {write_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(mqtt_admin, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API

View File

@ -41,14 +41,12 @@
-define(EXPIRE_INTERVAL, 86400000 * 7). -define(EXPIRE_INTERVAL, 86400000 * 7).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(emqx_collect, [ ok = mria:create_table(emqx_collect, [
{type, set}, {type, set},
{local_content, true}, {local_content, true},
{disc_only_copies, [node()]}, {storage, disc_only_copies},
{record_name, mqtt_collect}, {record_name, mqtt_collect},
{attributes, record_info(fields, mqtt_collect)}]); {attributes, record_info(fields, mqtt_collect)}]).
mnesia(copy) ->
mnesia:add_table_copy(emqx_collect, node(), disc_only_copies).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

View File

@ -77,16 +77,14 @@ destroy_by_username(Username) ->
do_destroy_by_username(Username). do_destroy_by_username(Username).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, set}, {type, set},
{rlog_shard, ?DASHBOARD_SHARD}, {rlog_shard, ?DASHBOARD_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, mqtt_admin_jwt}, {record_name, mqtt_admin_jwt},
{attributes, record_info(fields, mqtt_admin_jwt)}, {attributes, record_info(fields, mqtt_admin_jwt)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]); {write_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% jwt apply %% jwt apply

View File

@ -89,16 +89,15 @@ record(ClientId, ChanPid) ->
init([Type]) -> init([Type]) ->
Tab = tabname(Type), Tab = tabname(Type),
ok = ekka_mnesia:create_table(Tab, [ ok = mria:create_table(Tab, [
{type, bag}, {type, bag},
{rlog_shard, ?CM_SHARD}, {rlog_shard, ?CM_SHARD},
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, channel}, {record_name, channel},
{attributes, record_info(fields, channel)}, {attributes, record_info(fields, channel)},
{storage_properties, [{ets, [{read_concurrency, true}, {storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]), {write_concurrency, true}]}]}]),
ok = ekka_mnesia:copy_table(Tab, ram_copies), ok = mria:wait_for_tables([Tab]),
%%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
ok = ekka:monitor(membership), ok = ekka:monitor(membership),
{ok, #{type => Type}}. {ok, #{type => Type}}.

View File

@ -63,13 +63,10 @@
%mnesia(boot) -> %mnesia(boot) ->
% %% Optimize storage % %% Optimize storage
% StoreProps = [{ets, [{read_concurrency, true}]}], % StoreProps = [{ets, [{read_concurrency, true}]}],
% ok = ekka_mnesia:create_table(?MODULE, [ % ok = mria:create_table(?MODULE, [
% {attributes, record_info(fields, emqx_sn_registry)}, % {attributes, record_info(fields, emqx_sn_registry)},
% {ram_copies, [node()]}, % {ram_copies, [node()]},
% {storage_properties, StoreProps}]); % {storage_properties, StoreProps}]).
%
%mnesia(copy) ->
% ok = ekka_mnesia:copy_table(?MODULE, ram_copies).
-type registry() :: {Tab :: atom(), -type registry() :: {Tab :: atom(),
RegistryPid :: pid()}. RegistryPid :: pid()}.
@ -141,15 +138,14 @@ init([InstaId, PredefTopics]) ->
%% {ClientId, TopicId} -> TopicName %% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId %% {ClientId, TopicName} -> TopicId
Tab = name(InstaId), Tab = name(InstaId),
ok = ekka_mnesia:create_table(Tab, [ ok = mria:create_table(Tab, [
{ram_copies, [node()]}, {storage, ram_copies},
{record_name, emqx_sn_registry}, {record_name, emqx_sn_registry},
{attributes, record_info(fields, emqx_sn_registry)}, {attributes, record_info(fields, emqx_sn_registry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}, {storage_properties, [{ets, [{read_concurrency, true}]}]},
{rlog_shard, ?SN_SHARD} {rlog_shard, ?SN_SHARD}
]), ]),
ok = ekka_mnesia:copy_table(Tab, ram_copies), ok = mria:wait_for_tables([Tab]),
ok = ekka_rlog:wait_for_shards([?SN_SHARD], infinity),
% FIXME: % FIXME:
%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
MaxPredefId = lists:foldl( MaxPredefId = lists:foldl(

View File

@ -45,21 +45,18 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ ok = mria:create_table(?CLUSTER_MFA, [
{type, ordered_set}, {type, ordered_set},
{rlog_shard, ?EMQX_MACHINE_SHARD}, {rlog_shard, ?EMQX_MACHINE_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, cluster_rpc_mfa}, {record_name, cluster_rpc_mfa},
{attributes, record_info(fields, cluster_rpc_mfa)}]), {attributes, record_info(fields, cluster_rpc_mfa)}]),
ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ ok = mria:create_table(?CLUSTER_COMMIT, [
{type, set}, {type, set},
{rlog_shard, ?EMQX_MACHINE_SHARD}, {rlog_shard, ?EMQX_MACHINE_SHARD},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, cluster_rpc_commit}, {record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}]); {attributes, record_info(fields, cluster_rpc_commit)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(cluster_rpc_mfa, disc_copies),
ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies).
start_link() -> start_link() ->
start_link(node(), ?MODULE, get_retry_ms()). start_link(node(), ?MODULE, get_retry_ms()).

View File

@ -64,14 +64,12 @@
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, ordered_set}, {type, ordered_set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, delayed_message}, {record_name, delayed_message},
{attributes, record_info(fields, delayed_message)}]); {attributes, record_info(fields, delayed_message)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Hooks %% Hooks

View File

@ -91,14 +91,12 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TELEMETRY, ok = mria:create_table(?TELEMETRY,
[{type, set}, [{type, set},
{disc_copies, [node()]}, {storage, disc_copies},
{local_content, true}, {local_content, true},
{record_name, telemetry}, {record_name, telemetry},
{attributes, record_info(fields, telemetry)}]); {attributes, record_info(fields, telemetry)}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TELEMETRY, disc_copies).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API

View File

@ -35,7 +35,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ekka_mnesia:start(), mria:start(),
ok = emqx_delayed:mnesia(boot), ok = emqx_delayed:mnesia(boot),
emqx_common_test_helpers:start_apps([emqx_modules]), emqx_common_test_helpers:start_apps([emqx_modules]),
Config. Config.

View File

@ -28,7 +28,7 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
ok = ekka_mnesia:start(), ok = mria:start(),
ok = emqx_telemetry:mnesia(boot), ok = emqx_telemetry:mnesia(boot),
emqx_common_test_helpers:start_apps([emqx_modules]), emqx_common_test_helpers:start_apps([emqx_modules]),
Config. Config.

View File

@ -590,6 +590,6 @@ emqx_cluster() ->
]. ].
emqx_cluster_data() -> emqx_cluster_data() ->
#{running_nodes := Running, stopped_nodes := Stopped} = ekka_mnesia:cluster_info(), #{running_nodes := Running, stopped_nodes := Stopped} = mria:cluster_info(),
[{nodes_running, length(Running)}, [{nodes_running, length(Running)},
{nodes_stopped, length(Stopped)}]. {nodes_stopped, length(Stopped)}].

View File

@ -64,16 +64,13 @@
%% @doc Create or replicate tables. %% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{rlog_shard, ?PSK_SHARD}, {rlog_shard, ?PSK_SHARD},
{type, ordered_set}, {type, ordered_set},
{disc_copies, [node()]}, {storage, disc_copies},
{record_name, psk_entry}, {record_name, psk_entry},
{attributes, record_info(fields, psk_entry)}, {attributes, record_info(fields, psk_entry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}]); {storage_properties, [{ets, [{read_concurrency, true}]}]}]).
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% APIs %% APIs

View File

@ -52,14 +52,13 @@ create_resource(#{storage_type := StorageType}) ->
{read_concurrency, true}, {read_concurrency, true},
{write_concurrency, true}]}, {write_concurrency, true}]},
{dets, [{auto_save, 1000}]}], {dets, [{auto_save, 1000}]}],
ok = ekka_mnesia:create_table(?TAB, [ ok = mria:create_table(?TAB, [
{type, set}, {type, set},
{rlog_shard, ?RETAINER_SHARD}, {rlog_shard, ?RETAINER_SHARD},
{Copies, [node()]}, {storage, Copies},
{record_name, retained}, {record_name, retained},
{attributes, record_info(fields, retained)}, {attributes, record_info(fields, retained)},
{storage_properties, StoreProps}]), {storage_properties, StoreProps}]),
ok = ekka_mnesia:copy_table(?TAB, Copies),
ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity), ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
case mnesia:table_info(?TAB, storage_type) of case mnesia:table_info(?TAB, storage_type) of
Copies -> ok; Copies -> ok;