wip: allocate shards only when predefined number of sites online

This commit is contained in:
Andrew Mayorov 2024-02-01 14:01:04 +01:00
parent fcb5ed346f
commit 46e8118e1c
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 29 additions and 15 deletions

View File

@ -64,6 +64,7 @@ storage_backend(#{
builtin := #{ builtin := #{
enable := true, enable := true,
n_shards := NShards, n_shards := NShards,
n_sites := NSites,
replication_factor := ReplicationFactor replication_factor := ReplicationFactor
} }
}) -> }) ->
@ -71,6 +72,7 @@ storage_backend(#{
backend => builtin, backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}}, storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => NShards, n_shards => NShards,
n_sites => NSites,
replication_factor => ReplicationFactor replication_factor => ReplicationFactor
}; };
storage_backend(#{ storage_backend(#{

View File

@ -1933,7 +1933,16 @@ fields("session_storage_backend_builtin") ->
pos_integer(), pos_integer(),
#{ #{
desc => ?DESC(session_builtin_n_shards), desc => ?DESC(session_builtin_n_shards),
default => 16 %% FIXME
default => 4
}
)},
%% TODO: Minimum number of sites that will be responsible for the shards
{"n_sites",
sc(
pos_integer(),
#{
default => 1
} }
)}, )},
{"replication_factor", {"replication_factor",

View File

@ -520,7 +520,8 @@ app_specs(Opts) ->
]. ].
cluster() -> cluster() ->
Spec = #{role => core, apps => app_specs()}, ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2",
Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
[ [
{persistent_messages_SUITE1, Spec}, {persistent_messages_SUITE1, Spec},
{persistent_messages_SUITE2, Spec} {persistent_messages_SUITE2, Spec}

View File

@ -80,6 +80,7 @@
backend := builtin, backend := builtin,
storage := emqx_ds_storage_layer:prototype(), storage := emqx_ds_storage_layer:prototype(),
n_shards => pos_integer(), n_shards => pos_integer(),
n_sites => pos_integer(),
replication_factor => pos_integer() replication_factor => pos_integer()
}. }.

View File

@ -251,11 +251,12 @@ open_db(DB, DefaultOpts) ->
case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of
{atomic, Opts} -> {atomic, Opts} ->
Opts; Opts;
{aborted, {siteless_nodes, Nodes}} -> {aborted, {insufficient_sites_online, NNeeded, Sites}} ->
%% TODO %% TODO: Still ugly, it blocks the whole node startup.
%% This is ugly. We need a good story of how to fairly allocate shards in a logger:notice(
%% fresh cluster. "Shard allocation still in progress, not enough sites: ~p, need: ~p",
logger:notice("Aborting shard allocation, siteless nodes found: ~p", [Nodes]), [Sites, NNeeded]
),
ok = timer:sleep(1000), ok = timer:sleep(1000),
open_db(DB, DefaultOpts) open_db(DB, DefaultOpts)
end. end.
@ -337,9 +338,10 @@ open_db_trans(DB, CreateOpts) ->
case mnesia:wread({?META_TAB, DB}) of case mnesia:wread({?META_TAB, DB}) of
[] when is_map(CreateOpts) -> [] when is_map(CreateOpts) ->
NShards = maps:get(n_shards, CreateOpts), NShards = maps:get(n_shards, CreateOpts),
NSites = maps:get(n_sites, CreateOpts),
ReplicationFactor = maps:get(replication_factor, CreateOpts), ReplicationFactor = maps:get(replication_factor, CreateOpts),
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
create_shards(DB, NShards, ReplicationFactor), create_shards(DB, NSites, NShards, ReplicationFactor),
CreateOpts; CreateOpts;
[#?META_TAB{db_props = Opts}] -> [#?META_TAB{db_props = Opts}] ->
Opts Opts
@ -465,16 +467,15 @@ ensure_site() ->
persistent_term:put(?emqx_ds_builtin_site, Site), persistent_term:put(?emqx_ds_builtin_site, Site),
ok. ok.
-spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer(), pos_integer()) -> ok.
create_shards(DB, NShards, ReplicationFactor) -> create_shards(DB, NSites, NShards, ReplicationFactor) ->
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
Nodes = mria_mnesia:running_nodes(), case length(AllSites) of
case Nodes -- [N || #?NODE_TAB{node = N} <- AllSites] of N when N >= NSites ->
[] ->
ok; ok;
NodesSiteless -> _ ->
mnesia:abort({siteless_nodes, NodesSiteless}) mnesia:abort({insufficient_sites_online, NSites, AllSites})
end, end,
lists:foreach( lists:foreach(
fun(Shard) -> fun(Shard) ->