diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index fbc55fb2d..a8bc0814a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -292,6 +292,7 @@ allocate_shards_trans(DB, Opts) -> NShards = maps:get(n_shards, Opts), NSites = maps:get(n_sites, Opts), ReplicationFactor = maps:get(replication_factor, Opts), + NReplicas = min(NSites, ReplicationFactor), Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), case length(AllSites) of @@ -307,12 +308,18 @@ allocate_shards_trans(DB, Opts) -> ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], mnesia:abort({shards_already_allocated, ShardsAllocated}) end, + {Allocation, _} = lists:mapfoldl( + fun(Shard, SSites) -> + {Sites, _} = emqx_utils_stream:consume(NReplicas, SSites), + {_, SRest} = emqx_utils_stream:consume(1, SSites), + {{Shard, Sites}, SRest} + end, + emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)), + Shards + ), lists:map( - fun(Shard) -> - Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites], - Hashes = lists:sort(Hashes0), - {_, Sites} = lists:unzip(Hashes), - ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + fun({Shard, Sites}) -> + ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites], Record = #?SHARD_TAB{ shard = {DB, Shard}, replica_set = ReplicaSet @@ -320,7 +327,7 @@ allocate_shards_trans(DB, Opts) -> ok = mnesia:write(Record), Shard end, - Shards + Allocation ). -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> @@ -413,10 +420,6 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. --spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). -hash(Shard, Site) -> - erlang:phash2({Shard, Site}). - eval_qlc(Q) -> case mnesia:is_transaction() of true ->