feat(dsrepl): allocate shards predictably

This ensures strictly optimal and fair shard allocation across
the cluster. Before this commit, it was quite easy to end up with
an allocation significantly skewed towards some node, because
of the nature of randomness and the relatively small number of
shards.
This commit is contained in:
Andrew Mayorov 2024-02-27 21:17:05 +01:00
parent d30c99512a
commit 54b5adf868
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 24 additions and 23 deletions

View File

@ -41,6 +41,7 @@ end_per_suite(_Config) ->
init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
Cluster = cluster(),
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
_ = wait_shards_online(Nodes),
[{nodes, Nodes} | Config];
init_per_testcase(t_message_gc = TestCase, Config) ->
Opts = #{
@ -53,7 +54,6 @@ init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config, _Opts = #{}).
common_init_per_testcase(TestCase, Config, Opts) ->
ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
Apps = emqx_cth_suite:start(
app_specs(Opts),
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
@ -63,14 +63,11 @@ common_init_per_testcase(TestCase, Config, Opts) ->
end_per_testcase(t_session_subscription_iterators, Config) ->
Nodes = ?config(nodes, Config),
emqx_common_test_helpers:call_janitor(60_000),
ok = emqx_cth_cluster:stop(Nodes),
end_per_testcase(common, Config);
ok = emqx_cth_cluster:stop(Nodes);
end_per_testcase(_TestCase, Config) ->
Apps = proplists:get_value(apps, Config, []),
emqx_common_test_helpers:call_janitor(60_000),
clear_db(),
emqx_cth_suite:stop(Apps),
ok.
ok = emqx_cth_suite:stop(Apps).
t_messages_persisted(_Config) ->
C1 = connect(<<?MODULE_STRING "1">>, true, 30),
@ -520,23 +517,24 @@ app_specs(Opts) ->
].
cluster() ->
ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2",
ExtraConf = "\n durable_storage.messages.n_sites = 2",
Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})},
[
{persistent_messages_SUITE1, Spec},
{persistent_messages_SUITE2, Spec}
].
%% Assert that every node in `Nodes' runs as many shards of
%% ?PERSISTENT_MESSAGE_DB as the first node reports through
%% emqx_ds_replication_layer_meta:n_shards/1.
%% The per-node assertions are wrapped in ?retry(500, 10, ...); presumably this
%% re-evaluates them up to 10 times, 500 ms apart -- confirm against the macro
%% definition.
wait_shards_online([FirstNode | _] = Nodes) ->
    Expected = erpc:call(
        FirstNode,
        emqx_ds_replication_layer_meta,
        n_shards,
        [?PERSISTENT_MESSAGE_DB]
    ),
    ?retry(500, 10, [?assertEqual(Expected, shards_online(Node)) || Node <- Nodes]).
%% Number of ?PERSISTENT_MESSAGE_DB shards listed by
%% emqx_ds_builtin_db_sup:which_shards/1 on `Node' (queried over erpc).
shards_online(Node) ->
    Shards = erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [?PERSISTENT_MESSAGE_DB]),
    length(Shards).
%% Read the `bind' setting of the `default' listener of the given `Type' from
%% emqx_config on `Node' and return its port component.
get_mqtt_port(Node, Type) ->
    ConfPath = [listeners, Type, default, bind],
    {_Addr, Port} = erpc:call(Node, emqx_config, get, [ConfPath]),
    Port.
%% Drop ?PERSISTENT_MESSAGE_DB, stop mria, then delete the mnesia schema of the
%% local node. Crashes with badmatch unless both the drop and the schema
%% deletion return `ok'.
clear_db() ->
    ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
    mria:stop(),
    %% The successful match evaluates to `ok', which is also the return value.
    ok = mnesia:delete_schema([node()]).
message(Topic, Payload, PublishedAt) ->
#message{
topic = Topic,

View File

@ -276,6 +276,7 @@ allocate_shards_trans(DB, Opts) ->
NShards = maps:get(n_shards, Opts),
NSites = maps:get(n_sites, Opts),
ReplicationFactor = maps:get(replication_factor, Opts),
NReplicas = min(NSites, ReplicationFactor),
Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
case length(AllSites) of
@ -291,12 +292,18 @@ allocate_shards_trans(DB, Opts) ->
ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records],
mnesia:abort({shards_already_allocated, ShardsAllocated})
end,
{Allocation, _} = lists:mapfoldl(
fun(Shard, SSites) ->
{Sites, _} = emqx_utils_stream:consume(NReplicas, SSites),
{_, SRest} = emqx_utils_stream:consume(1, SSites),
{{Shard, Sites}, SRest}
end,
emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)),
Shards
),
lists:map(
fun(Shard) ->
Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites],
Hashes = lists:sort(Hashes0),
{_, Sites} = lists:unzip(Hashes),
ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
fun({Shard, Sites}) ->
ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites],
Record = #?SHARD_TAB{
shard = {DB, Shard},
replica_set = ReplicaSet
@ -304,7 +311,7 @@ allocate_shards_trans(DB, Opts) ->
ok = mnesia:write(Record),
Shard
end,
Shards
Allocation
).
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
@ -387,10 +394,6 @@ ensure_site() ->
persistent_term:put(?emqx_ds_builtin_site, Site),
ok.
%% Hash of the `{Shard, Site}' pair, computed with erlang:phash2/1.
-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
hash(Shard, Site) ->
    Pair = {Shard, Site},
    erlang:phash2(Pair).
eval_qlc(Q) ->
case mnesia:is_transaction() of
true ->