diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index 9b53bddff..146b69e2b 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -281,7 +281,7 @@ t_rebalance(Config) -> %% Verify that the set of shard servers matches the target allocation. Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], ShardServers = [ - shard_server_info(N, ?DB, Shard, Site, readiness) + {{Shard, N}, shard_server_info(N, ?DB, Shard, Site, readiness)} || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), Shard <- Shards ], @@ -416,10 +416,8 @@ t_rebalance_chaotic_converges(Config) -> Opts = opts(Config, #{n_shards => 16, n_sites => 2, replication_factor => 3}), %% Open DB: - ?assertEqual( - [{ok, ok}, {ok, ok}, {ok, ok}], - erpc:multicall([N1, N2, N3], emqx_ds, open_db, [?DB, Opts]) - ), + assert_db_open(Nodes, ?DB, Opts), + assert_db_stable(Nodes, ?DB), %% Kick N3 from the replica set as the initial condition: ?assertMatch( @@ -922,12 +920,47 @@ kill_restart_node(Node, Spec, DBOpts) -> %% +assert_db_open(Nodes, DB, Opts) -> + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts]) + ). + +assert_db_stable(Nodes = [N1 | _], DB) -> + Shards = shards(N1, DB), + ?assertMatch( + _Leadership = [_ | _], + db_leadership(Nodes, DB, Shards) + ). + +%% + +db_leadership(Nodes, DB, Shards) -> + Leadership = [{S, shard_leadership(Nodes, DB, S)} || S <- Shards], + Inconsistent = [SL || SL = {_, Leaders} <- Leadership, map_size(Leaders) > 1], + case Inconsistent of + [] -> + Leadership; + [_ | _] -> + {error, inconsistent, Inconsistent} + end. + +shard_leadership(Nodes, DB, Shard) -> + lists:foldl( + fun(N, Acc) -> Acc#{shard_leader(N, DB, Shard) => N} end, + #{}, + Nodes + ). + +shard_leader(Node, DB, Shard) -> + shard_server_info(Node, DB, Shard, ds_repl_meta(Node, this_site), leader). + shard_server_info(Node, DB, Shard, Site, Info) -> ?ON( Node, begin Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), - {Server, emqx_ds_replication_layer_shard:server_info(Info, Server)} + emqx_ds_replication_layer_shard:server_info(Info, Server) end ).