test(dsrepl): add two more testcases for rebalancing

This commit is contained in:
Andrew Mayorov 2024-04-08 13:22:31 +02:00
parent 2ace9bb893
commit dcde30c38a
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 164 additions and 7 deletions

View File

@ -134,6 +134,7 @@
ok
| {error, {nonexistent_db, emqx_ds:db()}}
| {error, {nonexistent_sites, [site()]}}
| {error, {too_few_sites, [site()]}}
| {error, _}.
%% Subject of the subscription:
@ -452,6 +453,8 @@ allocate_shards_trans(DB) ->
assign_db_sites_trans(DB, Sites) ->
Opts = db_config_trans(DB),
case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of
[] when length(Sites) == 0 ->
mnesia:abort({too_few_sites, Sites});
[] ->
ok;
NonexistentSites ->

View File

@ -323,6 +323,10 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
State = State0#{transitions := maps:remove(Track, Ts)},
handle_transition_exit(Shard, Trans, Reason, State);
[] ->
%% NOTE
%% Actually, it's sort of expected to have a portion of exit signals here,
%% because of `mria:with_middleman/3`. But it's impossible to tell them apart
%% from other singals.
logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}),
State0
end.

View File

@ -127,22 +127,22 @@ t_replication_transfers_snapshots(Config) ->
MessagesOffline
).
t_replication_rebalance(init, Config) ->
t_rebalance(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Nodes = emqx_cth_cluster:start(
[
{t_replication_rebalance1, #{apps => Apps}},
{t_replication_rebalance2, #{apps => Apps}},
{t_replication_rebalance3, #{apps => Apps}},
{t_replication_rebalance4, #{apps => Apps}}
{t_rebalance1, #{apps => Apps}},
{t_rebalance2, #{apps => Apps}},
{t_rebalance3, #{apps => Apps}},
{t_rebalance4, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
[{nodes, Nodes} | Config];
t_replication_rebalance('end', Config) ->
t_rebalance('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_replication_rebalance(Config) ->
t_rebalance(Config) ->
NMsgs = 800,
NClients = 5,
Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
@ -243,6 +243,156 @@ t_replication_rebalance(Config) ->
?assertEqual(sample(20, Messages), sample(20, MessagesN3)),
?assertEqual(Messages, MessagesN3).
t_join_leave_errors(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Nodes = emqx_cth_cluster:start(
[
{t_join_leave_errors1, #{apps => Apps}},
{t_join_leave_errors2, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
[{nodes, Nodes} | Config];
t_join_leave_errors('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_join_leave_errors(Config) ->
[N1, N2] = ?config(nodes, Config),
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])),
?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?DB, Opts])),
[S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]],
?assertEqual([S1], ds_repl_meta(N1, db_sites, [?DB])),
%% Attempts to join a nonexistent DB / site.
?assertEqual(
{error, {nonexistent_db, boo}},
ds_repl_meta(N1, join_db_site, [_DB = boo, S1])
),
?assertEqual(
{error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}},
ds_repl_meta(N1, join_db_site, [?DB, <<"NO-MANS-SITE">>])
),
%% NOTE: Leaving a non-existent site is not an error.
?assertEqual(
ok,
ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>])
),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)),
%% Impossible to leave the last site.
?assertEqual(
{error, {too_few_sites, []}},
ds_repl_meta(N1, leave_db_site, [?DB, S1])
),
%% "Move" the DB to the other node.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch([_ | _], transitions(N1, ?DB)),
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)).
t_rebalance_chaotic_converges(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Nodes = emqx_cth_cluster:start(
[
{t_rebalance_chaotic_converges1, #{apps => Apps}},
{t_rebalance_chaotic_converges2, #{apps => Apps}},
{t_rebalance_chaotic_converges3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
[{nodes, Nodes} | Config];
t_rebalance_chaotic_converges('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_rebalance_chaotic_converges(Config) ->
NMsgs = 500,
Nodes = [N1, N2, N3] = ?config(nodes, Config),
%% Initialize DB on first two nodes.
Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
?assertEqual(
[{ok, ok}, {ok, ok}],
erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
),
%% Open DB on the last node.
?assertEqual(
ok,
erpc:call(N3, emqx_ds, open_db, [?DB, Opts])
),
%% Find out which sites there are.
Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
ct:pal("Sites: ~p~n", [Sites]),
%% Initially, the DB is assigned to [S1, S2].
?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])),
?assertEqual(
lists:sort([S1, S2]),
ds_repl_meta(N1, db_sites, [?DB])
),
%% Fill the storage with messages and few additional generations.
Messages0 = lists:append([
fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}),
fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}),
fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>})
]),
%% Construct a chaotic transition sequence that changes assignment to [S2, S3].
Sequence = [
{N1, join_db_site, S3},
{N2, leave_db_site, S2},
{N3, leave_db_site, S1},
{N1, join_db_site, S2},
{N2, join_db_site, S1},
{N3, leave_db_site, S3},
{N1, leave_db_site, S1},
{N2, join_db_site, S3}
],
%% Apply the sequence while also filling the storage with messages.
TransitionMessages = lists:map(
fun({N, Transition, Site}) ->
%% Apply the transition.
?assertEqual(ok, ds_repl_meta(N, Transition, [?DB, Site])),
%% Give some time for at least one transition to complete.
Transitions = transitions(N, ?DB),
ct:pal("Transitions after ~p: ~p", [N, Transitions]),
?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))),
%% Fill the storage with messages.
CID = integer_to_binary(erlang:system_time()),
fill_storage(N, ?DB, NMsgs, #{client_id => CID})
end,
Sequence
),
%% Wait for the last transition to complete.
?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))),
?assertEqual(
lists:sort([S2, S3]),
ds_repl_meta(N1, db_sites, [?DB])
),
%% Check that all messages are still there.
Messages = lists:append(TransitionMessages) ++ Messages0,
MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesDB)),
?assertEqual(Messages, MessagesDB).
%%
shard_server_info(Node, DB, Shard, Site, Info) ->