wip: allocate shards predictably

This commit is contained in:
Andrew Mayorov 2024-02-27 21:17:05 +01:00
parent 41f1fd8ebc
commit da6844957e
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 13 additions and 10 deletions

View File

@ -292,6 +292,7 @@ allocate_shards_trans(DB, Opts) ->
NShards = maps:get(n_shards, Opts), NShards = maps:get(n_shards, Opts),
NSites = maps:get(n_sites, Opts), NSites = maps:get(n_sites, Opts),
ReplicationFactor = maps:get(replication_factor, Opts), ReplicationFactor = maps:get(replication_factor, Opts),
NReplicas = min(NSites, ReplicationFactor),
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
case length(AllSites) of case length(AllSites) of
@ -307,12 +308,18 @@ allocate_shards_trans(DB, Opts) ->
ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
mnesia:abort({shards_already_allocated, ShardsAllocated}) mnesia:abort({shards_already_allocated, ShardsAllocated})
end, end,
{Allocation, _} = lists:mapfoldl(
fun(Shard, SSites) ->
{Sites, _} = emqx_utils_stream:consume(NReplicas, SSites),
{_, SRest} = emqx_utils_stream:consume(1, SSites),
{{Shard, Sites}, SRest}
end,
emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)),
Shards
),
lists:map( lists:map(
fun(Shard) -> fun({Shard, Sites}) ->
Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites], ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites],
Hashes = lists:sort(Hashes0),
{_, Sites} = lists:unzip(Hashes),
ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
Record = #?SHARD_TAB{ Record = #?SHARD_TAB{
shard = {DB, Shard}, shard = {DB, Shard},
replica_set = ReplicaSet replica_set = ReplicaSet
@ -320,7 +327,7 @@ allocate_shards_trans(DB, Opts) ->
ok = mnesia:write(Record), ok = mnesia:write(Record),
Shard Shard
end, end,
Shards Allocation
). ).
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
@ -413,10 +420,6 @@ ensure_site() ->
persistent_term:put(?emqx_ds_builtin_site, Site), persistent_term:put(?emqx_ds_builtin_site, Site),
ok. ok.
-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
hash(Shard, Site) ->
erlang:phash2({Shard, Site}).
eval_qlc(Q) -> eval_qlc(Q) ->
case mnesia:is_transaction() of case mnesia:is_transaction() of
true -> true ->