diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index b178a742c..7f1e60723 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -64,6 +64,7 @@ storage_backend(#{ builtin := #{ enable := true, n_shards := NShards, + n_sites := NSites, replication_factor := ReplicationFactor } }) -> @@ -71,6 +72,7 @@ storage_backend(#{ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, n_shards => NShards, + n_sites => NSites, replication_factor => ReplicationFactor }; storage_backend(#{ diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d5989687d..9417c8b5e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1933,7 +1933,16 @@ fields("session_storage_backend_builtin") -> pos_integer(), #{ desc => ?DESC(session_builtin_n_shards), - default => 16 + %% FIXME + default => 4 + } + )}, + %% TODO: Minimum number of sites that will be responsible for the shards + {"n_sites", + sc( + pos_integer(), + #{ + default => 1 } )}, {"replication_factor", diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index ad5191793..5e52fd78e 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -520,7 +520,8 @@ app_specs(Opts) -> ]. cluster() -> - Spec = #{role => core, apps => app_specs()}, + ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2", + Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})}, [ {persistent_messages_SUITE1, Spec}, {persistent_messages_SUITE2, Spec} diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 2062982b8..280094576 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -80,6 +80,7 @@ backend := builtin, storage := emqx_ds_storage_layer:prototype(), n_shards => pos_integer(), + n_sites => pos_integer(), replication_factor => pos_integer() }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 6b157ad6b..b0ef4903f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -251,11 +251,12 @@ open_db(DB, DefaultOpts) -> case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of {atomic, Opts} -> Opts; - {aborted, {siteless_nodes, Nodes}} -> - %% TODO - %% This is ugly. We need a good story of how to fairly allocate shards in a - %% fresh cluster. - logger:notice("Aborting shard allocation, siteless nodes found: ~p", [Nodes]), + {aborted, {insufficient_sites_online, NNeeded, Sites}} -> + %% TODO: Still ugly, it blocks the whole node startup. + logger:notice( + "Shard allocation still in progress, not enough sites: ~p, need: ~p", + [Sites, NNeeded] + ), ok = timer:sleep(1000), open_db(DB, DefaultOpts) end. @@ -337,9 +338,10 @@ open_db_trans(DB, CreateOpts) -> case mnesia:wread({?META_TAB, DB}) of [] when is_map(CreateOpts) -> NShards = maps:get(n_shards, CreateOpts), + NSites = maps:get(n_sites, CreateOpts), ReplicationFactor = maps:get(replication_factor, CreateOpts), mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), - create_shards(DB, NShards, ReplicationFactor), + create_shards(DB, NSites, NShards, ReplicationFactor), CreateOpts; [#?META_TAB{db_props = Opts}] -> Opts @@ -465,16 +467,15 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. --spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. -create_shards(DB, NShards, ReplicationFactor) -> +-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer(), pos_integer()) -> ok. +create_shards(DB, NSites, NShards, ReplicationFactor) -> Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), - Nodes = mria_mnesia:running_nodes(), - case Nodes -- [N || #?NODE_TAB{node = N} <- AllSites] of - [] -> + case length(AllSites) of + N when N >= NSites -> ok; - NodesSiteless -> - mnesia:abort({siteless_nodes, NodesSiteless}) + _ -> + mnesia:abort({insufficient_sites_online, NSites, AllSites}) end, lists:foreach( fun(Shard) ->