From 8d88d14f0acb29a8bd0961919d8fbba37def684a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 7 Aug 2024 10:38:22 +0200 Subject: [PATCH] test(dsraft): use bootstrap as readiness criterion In another attempt to stabilize the rest of flaky testcases. --- .../test/emqx_ds_replication_SUITE.erl | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index 96edd1043..66d94225e 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -131,7 +131,6 @@ t_replication_transfers_snapshots(Config) -> %% Initialize DB on all nodes and wait for it to be online. Opts = opts(Config, #{n_shards => 1, n_sites => 3}), assert_db_open(Nodes, ?DB, Opts), - assert_db_stable(Nodes, ?DB), %% Stop the DB on the "offline" node. ?wait_async_action( @@ -207,7 +206,6 @@ t_rebalance(Config) -> %% 1. Initialize DB on the first node. Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}), assert_db_open(Nodes, ?DB, Opts), - assert_db_stable(Nodes, ?DB), %% 1.1 Kick all sites except S1 from the replica set as %% the initial condition: @@ -419,7 +417,6 @@ t_rebalance_chaotic_converges(Config) -> %% Open DB: assert_db_open(Nodes, ?DB, Opts), - assert_db_stable(Nodes, ?DB), %% Kick N3 from the replica set as the initial condition: ?assertMatch( @@ -503,7 +500,6 @@ t_rebalance_offline_restarts(Config) -> %% Initialize DB on all 3 nodes. Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}), assert_db_open(Nodes, ?DB, Opts), - assert_db_stable(Nodes, ?DB), ?retry( 1000, @@ -845,13 +841,11 @@ t_crash_restart_recover(Config) -> ?check_trace( begin %% Initialize DB on all nodes. - ?assertEqual( - [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts]) - ), + assert_db_open(Nodes, ?DB, DBOpts), %% Apply the test events, including simulated node crashes. NodeStream = emqx_utils_stream:const(N1), + StartedAt = erlang:monotonic_time(millisecond), emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0), %% It's expected to lose few messages when leaders are abruptly killed. @@ -865,6 +859,10 @@ t_crash_restart_recover(Config) -> ct:pal("Some messages were lost: ~p", [LostMessages]), ?assert(length(LostMessages) < NMsgs div 20), + %% Wait until crashed nodes are ready. + SinceStarted = erlang:monotonic_time(millisecond) - StartedAt, + wait_db_bootstrapped([N2, N3], ?DB, infinity, SinceStarted), + %% Verify that all the successfully persisted messages are there. VerifyClient = fun({ClientId, ExpectedStream}) -> Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId), @@ -926,7 +924,8 @@ assert_db_open(Nodes, DB, Opts) -> ?assertEqual( [{ok, ok} || _ <- Nodes], erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts]) - ). + ), + wait_db_bootstrapped(Nodes, ?DB). assert_db_stable([Node | _], DB) -> Shards = ds_repl_meta(Node, shards, [DB]), @@ -935,6 +934,32 @@ assert_db_stable([Node | _], DB) -> db_leadership(Node, DB, Shards) ). +wait_db_bootstrapped(Nodes, DB) -> + wait_db_bootstrapped(Nodes, DB, infinity, infinity). + +wait_db_bootstrapped(Nodes, DB, Timeout, BackInTime) -> + SRefs = [ + snabbkaffe:subscribe( + ?match_event(#{ + ?snk_kind := emqx_ds_replshard_bootstrapped, + ?snk_meta := #{node := Node}, + db := DB, + shard := Shard + }), + 1, + Timeout, + BackInTime + ) + || Node <- Nodes, + Shard <- ds_repl_meta(Node, my_shards, [DB]) + ], + lists:foreach( + fun({ok, SRef}) -> + ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)) + end, + SRefs + ). + %% db_leadership(Node, DB, Shards) ->