From dcde30c38a98ae8112b4e8f71b41e0d9a94508f4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 13:22:31 +0200 Subject: [PATCH] test(dsrepl): add two more testcases for rebalancing --- .../src/emqx_ds_replication_layer_meta.erl | 3 + .../emqx_ds_replication_shard_allocator.erl | 4 + .../test/emqx_ds_replication_SUITE.erl | 164 +++++++++++++++++- 3 files changed, 164 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 31ed62fcb..dca2442b8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -134,6 +134,7 @@ ok | {error, {nonexistent_db, emqx_ds:db()}} | {error, {nonexistent_sites, [site()]}} + | {error, {too_few_sites, [site()]}} | {error, _}. %% Subject of the subscription: @@ -452,6 +453,8 @@ allocate_shards_trans(DB) -> assign_db_sites_trans(DB, Sites) -> Opts = db_config_trans(DB), case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of + [] when length(Sites) == 0 -> + mnesia:abort({too_few_sites, Sites}); [] -> ok; NonexistentSites -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 363a453d6..2c9cc44fa 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -323,6 +323,10 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) -> State = State0#{transitions := maps:remove(Track, Ts)}, handle_transition_exit(Shard, Trans, Reason, State); [] -> + %% NOTE + %% Actually, it's sort of expected to have a portion of exit signals here, + %% because of `mria:with_middleman/3`. But it's impossible to tell them apart + %% from other singals. logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), State0 end. diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 872169765..6a2c36b30 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -127,22 +127,22 @@ t_replication_transfers_snapshots(Config) -> MessagesOffline ). -t_replication_rebalance(init, Config) -> +t_rebalance(init, Config) -> Apps = [appspec(emqx_durable_storage)], Nodes = emqx_cth_cluster:start( [ - {t_replication_rebalance1, #{apps => Apps}}, - {t_replication_rebalance2, #{apps => Apps}}, - {t_replication_rebalance3, #{apps => Apps}}, - {t_replication_rebalance4, #{apps => Apps}} + {t_rebalance1, #{apps => Apps}}, + {t_rebalance2, #{apps => Apps}}, + {t_rebalance3, #{apps => Apps}}, + {t_rebalance4, #{apps => Apps}} ], #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} ), [{nodes, Nodes} | Config]; -t_replication_rebalance('end', Config) -> +t_rebalance('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). -t_replication_rebalance(Config) -> +t_rebalance(Config) -> NMsgs = 800, NClients = 5, Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), @@ -243,6 +243,156 @@ t_replication_rebalance(Config) -> ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), ?assertEqual(Messages, MessagesN3). +t_join_leave_errors(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_join_leave_errors1, #{apps => Apps}}, + {t_join_leave_errors2, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_join_leave_errors('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_join_leave_errors(Config) -> + [N1, N2] = ?config(nodes, Config), + + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), + ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?DB, Opts])), + + [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]], + + ?assertEqual([S1], ds_repl_meta(N1, db_sites, [?DB])), + + %% Attempts to join a nonexistent DB / site. + ?assertEqual( + {error, {nonexistent_db, boo}}, + ds_repl_meta(N1, join_db_site, [_DB = boo, S1]) + ), + ?assertEqual( + {error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}}, + ds_repl_meta(N1, join_db_site, [?DB, <<"NO-MANS-SITE">>]) + ), + %% NOTE: Leaving a non-existent site is not an error. + ?assertEqual( + ok, + ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>]) + ), + + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), + ?assertEqual([], transitions(N1, ?DB)), + + %% Impossible to leave the last site. + ?assertEqual( + {error, {too_few_sites, []}}, + ds_repl_meta(N1, leave_db_site, [?DB, S1]) + ), + + %% "Move" the DB to the other node. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), + ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertMatch([_ | _], transitions(N1, ?DB)), + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertEqual([], transitions(N1, ?DB)). + +t_rebalance_chaotic_converges(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_rebalance_chaotic_converges1, #{apps => Apps}}, + {t_rebalance_chaotic_converges2, #{apps => Apps}}, + {t_rebalance_chaotic_converges3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_rebalance_chaotic_converges('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_rebalance_chaotic_converges(Config) -> + NMsgs = 500, + Nodes = [N1, N2, N3] = ?config(nodes, Config), + + %% Initialize DB on first two nodes. + Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), + + %% Open DB on the last node. + ?assertEqual( + ok, + erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) + ), + + %% Find out which sites there are. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Initially, the DB is assigned to [S1, S2]. + ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), + ?assertEqual( + lists:sort([S1, S2]), + ds_repl_meta(N1, db_sites, [?DB]) + ), + + %% Fill the storage with messages and few additional generations. + Messages0 = lists:append([ + fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}), + fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}), + fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>}) + ]), + + %% Construct a chaotic transition sequence that changes assignment to [S2, S3]. + Sequence = [ + {N1, join_db_site, S3}, + {N2, leave_db_site, S2}, + {N3, leave_db_site, S1}, + {N1, join_db_site, S2}, + {N2, join_db_site, S1}, + {N3, leave_db_site, S3}, + {N1, leave_db_site, S1}, + {N2, join_db_site, S3} + ], + + %% Apply the sequence while also filling the storage with messages. + TransitionMessages = lists:map( + fun({N, Transition, Site}) -> + %% Apply the transition. + ?assertEqual(ok, ds_repl_meta(N, Transition, [?DB, Site])), + %% Give some time for at least one transition to complete. + Transitions = transitions(N, ?DB), + ct:pal("Transitions after ~p: ~p", [N, Transitions]), + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))), + %% Fill the storage with messages. + CID = integer_to_binary(erlang:system_time()), + fill_storage(N, ?DB, NMsgs, #{client_id => CID}) + end, + Sequence + ), + + %% Wait for the last transition to complete. + ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + + ?assertEqual( + lists:sort([S2, S3]), + ds_repl_meta(N1, db_sites, [?DB]) + ), + + %% Check that all messages are still there. + Messages = lists:append(TransitionMessages) ++ Messages0, + MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesDB)), + ?assertEqual(Messages, MessagesDB). + %% shard_server_info(Node, DB, Shard, Site, Info) ->