test(dsrepl): test unresponsive nodes removal / node restarts

This commit is contained in:
Andrew Mayorov 2024-04-08 19:58:59 +02:00
parent 7a836317ac
commit 1e95bd4da6
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 77 additions and 5 deletions

View File

@ -50,7 +50,7 @@
-undef(TRANS_RETRY_TIMEOUT).
-undef(REMOVE_REPLICA_DELAY).
-define(TRANS_RETRY_TIMEOUT, 1_000).
-define(REMOVE_REPLICA_DELAY, {4_000, 2_000}).
-define(REMOVE_REPLICA_DELAY, {3_000, 2_000}).
-endif.
%%
@ -213,7 +213,7 @@ transition_handler(Shard, Trans, _State = #{db := DB}) ->
%% Putting this transition handler on separate "track" so that it
%% won't block any changes with higher priority (e.g. managing
%% local replicas).
{_Track = unresp, Handler};
{{unresp, Shard}, Handler};
false ->
undefined
end;

View File

@ -143,6 +143,12 @@ t_rebalance('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_rebalance(Config) ->
%% This testcase verifies that the storage rebalancing works correctly:
%% 1. Join/leave operations are applied successfully.
%% 2. Message data survives the rebalancing.
%% 3. Shard cluster membership converges to the target replica allocation.
%% 4. Replication factor is respected.
NMsgs = 800,
NClients = 5,
Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
@ -257,6 +263,9 @@ t_join_leave_errors('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_join_leave_errors(Config) ->
%% This testcase verifies that logical errors arising during handling of
%% join/leave operations are reported correctly.
[N1, N2] = ?config(nodes, Config),
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
@ -317,6 +326,10 @@ t_rebalance_chaotic_converges('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_rebalance_chaotic_converges(Config) ->
%% This testcase verifies that even a very chaotic sequence of join/leave
%% operations will still be handled consistently, and that the shard
%% allocation will converge to the target state.
NMsgs = 500,
Nodes = [N1, N2, N3] = ?config(nodes, Config),
@ -365,12 +378,12 @@ t_rebalance_chaotic_converges(Config) ->
%% Apply the sequence while also filling the storage with messages.
TransitionMessages = lists:map(
fun({N, Transition, Site}) ->
fun({N, Operation, Site}) ->
%% Apply the transition.
?assertEqual(ok, ds_repl_meta(N, Transition, [?DB, Site])),
?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])),
%% Give some time for at least one transition to complete.
Transitions = transitions(N, ?DB),
ct:pal("Transitions after ~p: ~p", [N, Transitions]),
ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))),
%% Fill the storage with messages.
CID = integer_to_binary(erlang:system_time()),
@ -393,6 +406,65 @@ t_rebalance_chaotic_converges(Config) ->
?assertEqual(sample(20, Messages), sample(20, MessagesDB)),
?assertEqual(Messages, MessagesDB).
t_rebalance_offline_restarts(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Specs = emqx_cth_cluster:mk_nodespecs(
[
{t_rebalance_offline_restarts1, #{apps => Apps}},
{t_rebalance_offline_restarts2, #{apps => Apps}},
{t_rebalance_offline_restarts3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
Nodes = emqx_cth_cluster:start(Specs),
[{nodes, Nodes}, {nodespecs, Specs} | Config];
t_rebalance_offline_restarts('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_rebalance_offline_restarts(Config) ->
%% This testcase verifies that rebalancing progresses if nodes restart or
%% go offline and never come back.
Nodes = [N1, N2, N3] = ?config(nodes, Config),
_Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
%% Initialize DB on all 3 nodes.
Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
),
?retry(
500,
10,
?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes])
),
%% Find out which sites are there.
Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
ct:pal("Sites: ~p~n", [Sites]),
%% Shut down N3 and then remove it from the DB.
ok = emqx_cth_cluster:stop_node(N3),
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
Transitions = transitions(N1, ?DB),
ct:pal("Transitions: ~p~n", [Transitions]),
%% Wait until at least one transition completes.
?block_until(#{?snk_kind := dsrepl_shard_transition_end}),
%% Restart N1 and N2.
[N1] = emqx_cth_cluster:restart(NS1),
[N2] = emqx_cth_cluster:restart(NS2),
?assertEqual(
[{ok, ok}, {ok, ok}],
erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
),
%% Target state should still be reached eventually.
?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))),
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
%%
shard_server_info(Node, DB, Shard, Site, Info) ->