From 1e95bd4da6d04d7426930b6225c0a6c569c67c54 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Mon, 8 Apr 2024 19:58:59 +0200
Subject: [PATCH] test(dsrepl): test unresponsive nodes removal / node restarts

---
 .../emqx_ds_replication_shard_allocator.erl  |  4 +-
 .../test/emqx_ds_replication_SUITE.erl       | 78 ++++++++++++++++++-
 2 files changed, 77 insertions(+), 5 deletions(-)

diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
index cfc2b7c81..f02335a10 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
@@ -50,7 +50,7 @@
 -undef(TRANS_RETRY_TIMEOUT).
 -undef(REMOVE_REPLICA_DELAY).
 -define(TRANS_RETRY_TIMEOUT, 1_000).
--define(REMOVE_REPLICA_DELAY, {4_000, 2_000}).
+-define(REMOVE_REPLICA_DELAY, {3_000, 2_000}).
 -endif.
 
 %%
diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl
index 6a2c36b30..9fc55d170 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl
@@ -143,6 +143,12 @@ t_rebalance('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
 t_rebalance(Config) ->
+    %% This testcase verifies that the storage rebalancing works correctly:
+    %% 1. Join/leave operations are applied successfully.
+    %% 2. Message data survives the rebalancing.
+    %% 3. Shard cluster membership converges to the target replica allocation.
+    %% 4. Replication factor is respected.
+
     NMsgs = 800,
     NClients = 5,
     Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
@@ -257,6 +263,9 @@ t_join_leave_errors('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
 t_join_leave_errors(Config) ->
+    %% This testcase verifies that logical errors arising during handling of
+    %% join/leave operations are reported correctly.
+
     [N1, N2] = ?config(nodes, Config),
     Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
 
@@ -317,6 +326,10 @@ t_rebalance_chaotic_converges('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
 t_rebalance_chaotic_converges(Config) ->
+    %% This testcase verifies that even a very chaotic sequence of join/leave
+    %% operations will still be handled consistently, and that the shard
+    %% allocation will converge to the target state.
+
     NMsgs = 500,
     Nodes = [N1, N2, N3] = ?config(nodes, Config),
 
@@ -365,12 +378,12 @@ t_rebalance_chaotic_converges(Config) ->
 
     %% Apply the sequence while also filling the storage with messages.
     TransitionMessages = lists:map(
-        fun({N, Transition, Site}) ->
+        fun({N, Operation, Site}) ->
            %% Apply the transition.
-            ?assertEqual(ok, ds_repl_meta(N, Transition, [?DB, Site])),
+            ?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])),
            %% Give some time for at least one transition to complete.
            Transitions = transitions(N, ?DB),
-            ct:pal("Transitions after ~p: ~p", [N, Transitions]),
+            ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
            ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))),
            %% Fill the storage with messages.
            CID = integer_to_binary(erlang:system_time()),
@@ -393,6 +406,65 @@ t_rebalance_chaotic_converges(Config) ->
     ?assertEqual(sample(20, Messages), sample(20, MessagesDB)),
     ?assertEqual(Messages, MessagesDB).
 
+t_rebalance_offline_restarts(init, Config) ->
+    Apps = [appspec(emqx_durable_storage)],
+    Specs = emqx_cth_cluster:mk_nodespecs(
+        [
+            {t_rebalance_offline_restarts1, #{apps => Apps}},
+            {t_rebalance_offline_restarts2, #{apps => Apps}},
+            {t_rebalance_offline_restarts3, #{apps => Apps}}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
+    ),
+    Nodes = emqx_cth_cluster:start(Specs),
+    [{nodes, Nodes}, {nodespecs, Specs} | Config];
+t_rebalance_offline_restarts('end', Config) ->
+    ok = emqx_cth_cluster:stop(?config(nodes, Config)).
+
+t_rebalance_offline_restarts(Config) ->
+    %% This testcase verifies that rebalancing progresses if nodes restart or
+    %% go offline and never come back.
+
+    Nodes = [N1, N2, N3] = ?config(nodes, Config),
+    _Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
+
+    %% Initialize DB on all 3 nodes.
+    Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}),
+    ?assertEqual(
+        [{ok, ok} || _ <- Nodes],
+        erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
+    ),
+    ?retry(
+        500,
+        10,
+        ?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes])
+    ),
+
+    %% Find out which sites are there.
+    Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
+    ct:pal("Sites: ~p~n", [Sites]),
+
+    %% Shut down N3 and then remove it from the DB.
+    ok = emqx_cth_cluster:stop_node(N3),
+    ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
+    Transitions = transitions(N1, ?DB),
+    ct:pal("Transitions: ~p~n", [Transitions]),
+
+    %% Wait until at least one transition completes.
+    ?block_until(#{?snk_kind := dsrepl_shard_transition_end}),
+
+    %% Restart N1 and N2.
+    [N1] = emqx_cth_cluster:restart(NS1),
+    [N2] = emqx_cth_cluster:restart(NS2),
+    ?assertEqual(
+        [{ok, ok}, {ok, ok}],
+        erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
+    ),
+
+    %% Target state should still be reached eventually.
+    ?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))),
+    ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
+
 %%
 
 shard_server_info(Node, DB, Shard, Site, Info) ->
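
For context on the ?REMOVE_REPLICA_DELAY tweak above: the macro holds a {Base, Jitter} pair, and the test override merely shortens the base so unresponsive-replica removal kicks in sooner. A minimal sketch of the usual base-plus-jitter pattern such a pair implies is shown below; the helper name is hypothetical and the actual allocator code may derive the delay differently.

%% Hypothetical sketch (not taken from the patched module): turning the
%% {Base, Jitter} pair from ?REMOVE_REPLICA_DELAY into a concrete timeout
%% in the range [Base + 1, Base + Jitter] milliseconds.
remove_replica_delay() ->
    {Base, Jitter} = ?REMOVE_REPLICA_DELAY,
    Base + rand:uniform(Jitter).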