diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index 64551be13..ef1f81b2b 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -39,6 +39,7 @@ translate_builtin(#{ backend := builtin, n_shards := NShards, + n_sites := NSites, replication_factor := ReplFactor, layout := Layout }) -> @@ -61,6 +62,7 @@ translate_builtin(#{ #{ backend => builtin, n_shards => NShards, + n_sites => NSites, replication_factor => ReplFactor, storage => Storage }. @@ -126,6 +128,14 @@ fields(builtin) -> desc => ?DESC(builtin_n_shards) } )}, + %% TODO: Minimum number of sites that will be responsible for the shards + {"n_sites", + sc( + pos_integer(), + #{ + default => 1 + } + )}, {replication_factor, sc( pos_integer(), diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 5c9637e2e..530be9a4b 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -520,7 +520,8 @@ app_specs(Opts) -> ]. cluster() -> - Spec = #{role => core, apps => app_specs()}, + ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2", + Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})}, [ {persistent_messages_SUITE1, Spec}, {persistent_messages_SUITE2, Spec} diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index cb2001d6e..fcd3e81a0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -94,6 +94,7 @@ backend := builtin, storage := emqx_ds_storage_layer:prototype(), n_shards => pos_integer(), + n_sites => pos_integer(), replication_factor => pos_integer() }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 6b157ad6b..b0ef4903f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -251,11 +251,12 @@ open_db(DB, DefaultOpts) -> case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of {atomic, Opts} -> Opts; - {aborted, {siteless_nodes, Nodes}} -> - %% TODO - %% This is ugly. We need a good story of how to fairly allocate shards in a - %% fresh cluster. - logger:notice("Aborting shard allocation, siteless nodes found: ~p", [Nodes]), + {aborted, {insufficient_sites_online, NNeeded, Sites}} -> + %% TODO: Still ugly, it blocks the whole node startup. + logger:notice( + "Shard allocation still in progress, not enough sites: ~p, need: ~p", + [Sites, NNeeded] + ), ok = timer:sleep(1000), open_db(DB, DefaultOpts) end. @@ -337,9 +338,10 @@ open_db_trans(DB, CreateOpts) -> case mnesia:wread({?META_TAB, DB}) of [] when is_map(CreateOpts) -> NShards = maps:get(n_shards, CreateOpts), + NSites = maps:get(n_sites, CreateOpts), ReplicationFactor = maps:get(replication_factor, CreateOpts), mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), - create_shards(DB, NShards, ReplicationFactor), + create_shards(DB, NSites, NShards, ReplicationFactor), CreateOpts; [#?META_TAB{db_props = Opts}] -> Opts @@ -465,16 +467,15 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. --spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. -create_shards(DB, NShards, ReplicationFactor) -> +-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer(), pos_integer()) -> ok. +create_shards(DB, NSites, NShards, ReplicationFactor) -> Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), - Nodes = mria_mnesia:running_nodes(), - case Nodes -- [N || #?NODE_TAB{node = N} <- AllSites] of - [] -> + case length(AllSites) of + N when N >= NSites -> ok; - NodesSiteless -> - mnesia:abort({siteless_nodes, NodesSiteless}) + _ -> + mnesia:abort({insufficient_sites_online, NSites, AllSites}) end, lists:foreach( fun(Shard) ->