diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 530be9a4b..b2a41b0d2 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -41,6 +41,7 @@ end_per_suite(_Config) -> init_per_testcase(t_session_subscription_iterators = TestCase, Config) -> Cluster = cluster(), Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), + _ = wait_shards_online(Nodes), [{nodes, Nodes} | Config]; init_per_testcase(t_message_gc = TestCase, Config) -> Opts = #{ @@ -53,7 +54,6 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config, _Opts = #{}). common_init_per_testcase(TestCase, Config, Opts) -> - ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), Apps = emqx_cth_suite:start( app_specs(Opts), #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} @@ -63,14 +63,11 @@ common_init_per_testcase(TestCase, Config, Opts) -> end_per_testcase(t_session_subscription_iterators, Config) -> Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), - ok = emqx_cth_cluster:stop(Nodes), - end_per_testcase(common, Config); + ok = emqx_cth_cluster:stop(Nodes); end_per_testcase(_TestCase, Config) -> Apps = proplists:get_value(apps, Config, []), emqx_common_test_helpers:call_janitor(60_000), - clear_db(), - emqx_cth_suite:stop(Apps), - ok. + ok = emqx_cth_suite:stop(Apps). t_messages_persisted(_Config) -> C1 = connect(<>, true, 30), @@ -520,23 +517,24 @@ app_specs(Opts) -> ]. cluster() -> - ExtraConf = "\n session_persistence.storage.builtin.n_sites = 2", + ExtraConf = "\n durable_storage.messages.n_sites = 2", Spec = #{role => core, apps => app_specs(#{extra_emqx_conf => ExtraConf})}, [ {persistent_messages_SUITE1, Spec}, {persistent_messages_SUITE2, Spec} ]. +wait_shards_online(Nodes = [Node | _]) -> + NShards = erpc:call(Node, emqx_ds_replication_layer_meta, n_shards, [?PERSISTENT_MESSAGE_DB]), + ?retry(500, 10, [?assertEqual(NShards, shards_online(N)) || N <- Nodes]). + +shards_online(Node) -> + length(erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [?PERSISTENT_MESSAGE_DB])). + get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. -clear_db() -> - ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), - mria:stop(), - ok = mnesia:delete_schema([node()]), - ok. - message(Topic, Payload, PublishedAt) -> #message{ topic = Topic, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 726c81ff3..a4331cdea 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -276,6 +276,7 @@ allocate_shards_trans(DB, Opts) -> NShards = maps:get(n_shards, Opts), NSites = maps:get(n_sites, Opts), ReplicationFactor = maps:get(replication_factor, Opts), + NReplicas = min(NSites, ReplicationFactor), Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read), case length(AllSites) of @@ -291,12 +292,18 @@ allocate_shards_trans(DB, Opts) -> ShardsAllocated = [Shard || #?SHARD_TAB{shard = {_DB, Shard}} <- Records], mnesia:abort({shards_already_allocated, ShardsAllocated}) end, + {Allocation, _} = lists:mapfoldl( + fun(Shard, SSites) -> + {Sites, _} = emqx_utils_stream:consume(NReplicas, SSites), + {_, SRest} = emqx_utils_stream:consume(1, SSites), + {{Shard, Sites}, SRest} + end, + emqx_utils_stream:repeat(emqx_utils_stream:list(AllSites)), + Shards + ), lists:map( - fun(Shard) -> - Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites], - Hashes = lists:sort(Hashes0), - {_, Sites} = lists:unzip(Hashes), - ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + fun({Shard, Sites}) -> + ReplicaSet = [Site || #?NODE_TAB{site = Site} <- Sites], Record = #?SHARD_TAB{ shard = {DB, Shard}, replica_set = ReplicaSet @@ -304,7 +311,7 @@ allocate_shards_trans(DB, Opts) -> ok = mnesia:write(Record), Shard end, - Shards + Allocation ). -spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> @@ -387,10 +394,6 @@ ensure_site() -> persistent_term:put(?emqx_ds_builtin_site, Site), ok. --spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). -hash(Shard, Site) -> - erlang:phash2({Shard, Site}). - eval_qlc(Q) -> case mnesia:is_transaction() of true ->