From 54b5adf8682fe722d771a47f93ac2aad9d860ed3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 27 Feb 2024 21:17:05 +0100 Subject: [PATCH] feat(dsrepl): allocate shards predictably This ensures strictly optimal and fair shard allocation across the cluster. Before this commit, it was quite easy to end up with an allocation significantly skewed towards some node, because of the nature of randomness and the relatively small number of shards. --- .../test/emqx_persistent_messages_SUITE.erl | 24 +++++++++---------- .../src/emqx_ds_replication_layer_meta.erl | 23 ++++++++++-------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 530be9a4b..b2a41b0d2 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -41,6 +41,7 @@ end_per_suite(_Config) -> init_per_testcase(t_session_subscription_iterators = TestCase, Config) -> Cluster = cluster(), Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), + _ = wait_shards_online(Nodes), [{nodes, Nodes} | Config]; init_per_testcase(t_message_gc = TestCase, Config) -> Opts = #{ @@ -53,7 +54,6 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config, _Opts = #{}). 
common_init_per_testcase(TestCase, Config, Opts) -> - ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), Apps = emqx_cth_suite:start( app_specs(Opts), #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} @@ -63,14 +63,11 @@ common_init_per_testcase(TestCase, Config, Opts) -> end_per_testcase(t_session_subscription_iterators, Config) -> Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), - ok = emqx_cth_cluster:stop(Nodes), - end_per_testcase(common, Config); + ok = emqx_cth_cluster:stop(Nodes); end_per_testcase(_TestCase, Config) -> Apps = proplists:get_value(apps, Config, []), emqx_common_test_helpers:call_janitor(60_000), - clear_db(), - emqx_cth_suite:stop(Apps), - ok. + ok = emqx_cth_suite:stop(Apps). t_messages_persisted(_Config) -> C1 = connect(<>, true, 30), @@ -520,23 +517,24 @@ app_specs(Opts) -> ]. cluster() -> - ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2", + ExtraConf = "\n durable_storage.messages.n_sites = 2", Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})}, [ {persistent_messages_SUITE1, Spec}, {persistent_messages_SUITE2, Spec} ]. +wait_shards_online(Nodes = [Node | _]) -> + NShards = erpc:call(Node, emqx_ds_replication_layer_meta, n_shards, [?PERSISTENT_MESSAGE_DB]), + ?retry(500, 10, [?assertEqual(NShards, shards_online(N)) || N <- Nodes]). + +shards_online(Node) -> + length(erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [?PERSISTENT_MESSAGE_DB])). + get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. -clear_db() -> - ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), - mria:stop(), - ok = mnesia:delete_schema([node()]), - ok. 
- message(Topic, Payload, PublishedAt) -> #message{ topic = Topic, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 726c81ff3..a4331cdea 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -276,6 +276,7 @@ allocate_shards_trans(DB, Opts) -> NShards = maps:get(n_shards, Opts), NSites = maps:get(n_sites, Opts), ReplicationFactor = maps:get(replication_factor, Opts), + NReplicas = min(NSites, ReplicationFactor), Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), case length(AllSites) of @@ -291,12 +292,18 @@ allocate_shards_trans(DB, Opts) -> ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], mnesia:abort({shards_already_allocated, ShardsAllocated}) end, + {Allocation, _} = lists:mapfoldl( + fun(Shard, SSites) -> + {Sites, _} = emqx_utils_stream:consume(NReplicas, SSites), + {_, SRest} = emqx_utils_stream:consume(1, SSites), + {{Shard, Sites}, SRest} + end, + emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)), + Shards + ), lists:map( - fun(Shard) -> - Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites], - Hashes = lists:sort(Hashes0), - {_, Sites} = lists:unzip(Hashes), - ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + fun({Shard, Sites}) -> + ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites], Record = #?SHARD_TAB{ shard = {DB, Shard}, replica_set = ReplicaSet @@ -304,7 +311,7 @@ allocate_shards_trans(DB, Opts) -> ok = mnesia:write(Record), Shard end, - Shards + Allocation ). -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> @@ -387,10 +394,6 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. 
--spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). -hash(Shard, Site) -> - erlang:phash2({Shard, Site}). - eval_qlc(Q) -> case mnesia:is_transaction() of true ->