diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index 140b4c2d6..fc18976e3 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -209,10 +209,8 @@ t_rebalance(Config) -> Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes], %% 1. Initialize DB on the first node. Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}), - [ - ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts))) - || Node <- Nodes - ], + assert_db_open(Nodes, ?DB, Opts), + assert_db_stable(Nodes, ?DB), %% 1.1 Kick all sites except S1 from the replica set as %% the initial condition: @@ -506,10 +504,9 @@ t_rebalance_offline_restarts(Config) -> %% Initialize DB on all 3 nodes. Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}), - ?assertEqual( - [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) - ), + assert_db_open(Nodes, ?DB, Opts), + assert_db_stable(Nodes, ?DB), + ?retry( 1000, 5, @@ -933,17 +930,17 @@ assert_db_open(Nodes, DB, Opts) -> erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts]) ). -assert_db_stable(Nodes = [N1 | _], DB) -> - Shards = shards(N1, DB), +assert_db_stable([Node | _], DB) -> + Shards = ds_repl_meta(Node, shards, [DB]), ?assertMatch( _Leadership = [_ | _], - db_leadership(Nodes, DB, Shards) + db_leadership(Node, DB, Shards) ). %% -db_leadership(Nodes, DB, Shards) -> - Leadership = [{S, shard_leadership(Nodes, DB, S)} || S <- Shards], +db_leadership(Node, DB, Shards) -> + Leadership = [{S, shard_leadership(Node, DB, S)} || S <- Shards], Inconsistent = [SL || SL = {_, Leaders} <- Leadership, map_size(Leaders) > 1], case Inconsistent of [] -> @@ -952,15 +949,17 @@ db_leadership(Nodes, DB, Shards) -> {error, inconsistent, Inconsistent} end. -shard_leadership(Nodes, DB, Shard) -> +shard_leadership(Node, DB, Shard) -> + ReplicaSet = ds_repl_meta(Node, replica_set, [DB, Shard]), + Nodes = [ds_repl_meta(Node, node, [Site]) || Site <- ReplicaSet], lists:foldl( - fun(N, Acc) -> Acc#{shard_leader(N, DB, Shard) => N} end, + fun({Site, SN}, Acc) -> Acc#{shard_leader(SN, DB, Shard, Site) => SN} end, #{}, - Nodes + lists:zip(ReplicaSet, Nodes) ). -shard_leader(Node, DB, Shard) -> - shard_server_info(Node, DB, Shard, ds_repl_meta(Node, this_site), leader). +shard_leader(Node, DB, Shard, Site) -> + shard_server_info(Node, DB, Shard, Site, leader). shard_server_info(Node, DB, Shard, Site, Info) -> ?ON( @@ -985,9 +984,6 @@ ds_repl_meta(Node, Fun, Args) -> error(meta_op_failed) end. -shards(Node, DB) -> - erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). - shards_online(Node, DB) -> erpc:call(Node, emqx_ds_builtin_raft_db_sup, which_shards, [DB]).