From 205ad507ea90d62dfb5b0e314d824f5a237da0ba Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Fri, 12 Jul 2024 15:26:00 +0200
Subject: [PATCH 1/8] test(dsraft): attempt to ensure testcases start from stable state

Where "stable state" is currently defined as "everyone knows and agrees on the
current leader".
---
 .../test/emqx_ds_replication_SUITE.erl | 45 ++++++++++++++++---
 1 file changed, 39 insertions(+), 6 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
index 9b53bddff..146b69e2b 100644
--- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
@@ -281,7 +281,7 @@ t_rebalance(Config) ->
     %% Verify that the set of shard servers matches the target allocation.
     Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
     ShardServers = [
-        shard_server_info(N, ?DB, Shard, Site, readiness)
+        {{Shard, N}, shard_server_info(N, ?DB, Shard, Site, readiness)}
         || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
            Shard <- Shards
     ],
@@ -416,10 +416,8 @@ t_rebalance_chaotic_converges(Config) ->
     Opts = opts(Config, #{n_shards => 16, n_sites => 2, replication_factor => 3}),
 
     %% Open DB:
-    ?assertEqual(
-        [{ok, ok}, {ok, ok}, {ok, ok}],
-        erpc:multicall([N1, N2, N3], emqx_ds, open_db, [?DB, Opts])
-    ),
+    assert_db_open(Nodes, ?DB, Opts),
+    assert_db_stable(Nodes, ?DB),
 
     %% Kick N3 from the replica set as the initial condition:
     ?assertMatch(
@@ -922,12 +920,47 @@ kill_restart_node(Node, Spec, DBOpts) ->
 
 %%
 
+assert_db_open(Nodes, DB, Opts) ->
+    ?assertEqual(
+        [{ok, ok} || _ <- Nodes],
+        erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
+    ).
+
+assert_db_stable(Nodes = [N1 | _], DB) ->
+    Shards = shards(N1, DB),
+    ?assertMatch(
+        _Leadership = [_ | _],
+        db_leadership(Nodes, DB, Shards)
+    ).
+
+%%
+
+db_leadership(Nodes, DB, Shards) ->
+    Leadership = [{S, shard_leadership(Nodes, DB, S)} || S <- Shards],
+    Inconsistent = [SL || SL = {_, Leaders} <- Leadership, map_size(Leaders) > 1],
+    case Inconsistent of
+        [] ->
+            Leadership;
+        [_ | _] ->
+            {error, inconsistent, Inconsistent}
+    end.
+
+shard_leadership(Nodes, DB, Shard) ->
+    lists:foldl(
+        fun(N, Acc) -> Acc#{shard_leader(N, DB, Shard) => N} end,
+        #{},
+        Nodes
+    ).
+
+shard_leader(Node, DB, Shard) ->
+    shard_server_info(Node, DB, Shard, ds_repl_meta(Node, this_site), leader).
+
 shard_server_info(Node, DB, Shard, Site, Info) ->
     ?ON(
         Node,
         begin
             Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
-            {Server, emqx_ds_replication_layer_shard:server_info(Info, Server)}
+            emqx_ds_replication_layer_shard:server_info(Info, Server)
         end
     ).
From 70a760850fe58b7b221ae97d7b756143ae189f61 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Fri, 12 Jul 2024 15:27:29 +0200
Subject: [PATCH 2/8] chore(dsraft): correct comment spelling errors

---
 .../src/emqx_ds_replication_layer_shard.erl | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index cdd62d874..e3366ca3e 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -202,10 +202,10 @@ try_servers([], _Fun, _Args) ->
 add_local_server(DB, Shard) ->
     %% NOTE
     %% Adding local server as "promotable" member to the cluster, which means
-    %% that it will affect quorum until it is promoted to a voter, which in
+    %% that it won't affect quorum until it is promoted to a voter, which in
     %% turn happens when the server has caught up sufficiently with the log.
     %% We also rely on this "membership" to understand when the server's
-    %% readiness.
+    %% ready.
     ShardServers = shard_servers(DB, Shard),
     LocalServer = local_server(DB, Shard),
     case server_info(uid, LocalServer) of

From 8e8b382ec0e11be79ebe3df8e0b55bc0f089e2f0 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Fri, 12 Jul 2024 17:05:16 +0200
Subject: [PATCH 3/8] chore(dsraft): provide more details when replica is unready

---
 .../src/emqx_ds_replication_layer_shard.erl | 23 +++++++++++--------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
index e3366ca3e..12d621102 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl
@@ -266,20 +266,20 @@ remove_server(DB, Shard, Server) ->
     end.
 
 -spec server_info
-    (readiness, server()) -> ready | {unready, _Status, _Membership} | unknown;
+    (readiness, server()) -> ready | {unready, _Details} | unknown;
     (leader, server()) -> server() | unknown;
     (uid, server()) -> _UID :: binary() | unknown.
 server_info(readiness, Server) ->
     %% NOTE
     %% Server is ready if it's either the leader or a follower with voter "membership"
     %% status (meaning it was promoted after catching up with the log).
-    case current_leader(Server) of
-        Server ->
+    case ra:members(Server) of
+        {ok, _Servers, Server} ->
             ready;
-        Leader when Leader /= unknown ->
+        {ok, _Servers, Leader} ->
             member_info(readiness, Server, Leader);
-        unknown ->
-            unknown
+        Error ->
+            {unready, {leader_unavailable, Error}}
     end;
 server_info(leader, Server) ->
     current_leader(Server);
@@ -287,8 +287,13 @@ server_info(uid, Server) ->
     maps:get(uid, ra_overview(Server), unknown).
 
 member_info(readiness, Server, Leader) ->
-    Cluster = maps:get(cluster, ra_overview(Leader), #{}),
-    member_readiness(maps:get(Server, Cluster, #{})).
+    case ra:member_overview(Leader) of
+        {ok, Overview = #{}, _Leader} ->
+            Cluster = maps:get(cluster, Overview, #{}),
+            member_readiness(maps:get(Server, Cluster, #{}));
+        Error ->
+            {unready, {leader_overview_unavailable, Error}}
+    end.
 
 current_leader(Server) ->
     %% NOTE: This call will block until the leader is known, or until the timeout.
@@ -304,7 +309,7 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership}
         normal when Membership =:= voter ->
             ready;
         _Other ->
-            {unready, Status, Membership}
+            {unready, {catching_up, Status, Membership}}
     end;
 member_readiness(#{}) ->
     unknown.

From af81800aec2b97554652eaf068bcff6c0902a73c Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Fri, 12 Jul 2024 18:24:58 +0200
Subject: [PATCH 4/8] chore(dsraft): log a bit more informative messages in shard allocator

---
 .../emqx_ds_replication_shard_allocator.erl | 32 ++++++++-----------
 1 file changed, 13 insertions(+), 19 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl
index fa6814572..699237227 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl
@@ -232,8 +232,7 @@ apply_handler(Fun, DB, Shard, Trans) ->
     erlang:apply(Fun, [DB, Shard, Trans]).
 
 trans_add_local(DB, Shard, {add, Site}) ->
-    logger:info(#{
-        msg => "Adding new local shard replica",
+    ?tp(info, "Adding new local shard replica", #{
         site => Site,
         db => DB,
         shard => Shard
@@ -246,8 +245,7 @@ do_add_local(membership = Stage, DB, Shard) ->
         ok ->
             do_add_local(readiness, DB, Shard);
         {error, recoverable, Reason} ->
-            logger:warning(#{
-                msg => "Shard membership change failed",
+            ?tp(warning, "Adding local shard replica failed", #{
                 db => DB,
                 shard => Shard,
                 reason => Reason,
@@ -260,10 +258,9 @@ do_add_local(readiness = Stage, DB, Shard) ->
     LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
     case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of
         ready ->
-            logger:info(#{msg => "Local shard replica ready", db => DB, shard => Shard});
+            ?tp(info, "Local shard replica ready", #{db => DB, shard => Shard});
         Status ->
-            logger:warning(#{
-                msg => "Still waiting for local shard replica to be ready",
+            ?tp(notice, "Still waiting for local shard replica to be ready", #{
                 db => DB,
                 shard => Shard,
                 status => Status,
@@ -274,8 +271,7 @@ do_add_local(readiness = Stage, DB, Shard) ->
     end.
 
 trans_drop_local(DB, Shard, {del, Site}) ->
-    logger:info(#{
-        msg => "Dropping local shard replica",
+    ?tp(notice, "Dropping local shard replica", #{
         site => Site,
         db => DB,
         shard => Shard
@@ -287,10 +283,11 @@ do_drop_local(DB, Shard) ->
         ok ->
             ok = emqx_ds_builtin_raft_db_sup:stop_shard({DB, Shard}),
             ok = emqx_ds_storage_layer:drop_shard({DB, Shard}),
-            logger:info(#{msg => "Local shard replica dropped"});
+            ?tp(notice, "Local shard replica dropped", #{db => DB, shard => Shard});
         {error, recoverable, Reason} ->
-            logger:warning(#{
-                msg => "Shard membership change failed",
+            ?tp(warning, "Dropping local shard replica failed", #{
+                db => DB,
+                shard => Shard,
                 reason => Reason,
                 retry_in => ?TRANS_RETRY_TIMEOUT
             }),
             {retry, ?TRANS_RETRY_TIMEOUT, {?FUNCTION_NAME, DB, Shard}}
     end.
 
 trans_rm_unresponsive(DB, Shard, {del, Site}) ->
-    logger:info(#{
-        msg => "Removing unresponsive shard replica",
+    ?tp(notice, "Removing unresponsive shard replica", #{
         site => Site,
         db => DB,
         shard => Shard
@@ -311,10 +307,9 @@ do_rm_unresponsive(DB, Shard, Site) ->
     Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
     case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of
         ok ->
-            logger:info(#{msg => "Unresponsive shard replica removed", db => DB, shard => Shard});
+            ?tp(info, "Unresponsive shard replica removed", #{db => DB, shard => Shard});
         {error, recoverable, Reason} ->
-            logger:warning(#{
-                msg => "Shard membership change failed",
+            ?tp(warning, "Removing shard replica failed", #{
                 db => DB,
                 shard => Shard,
                 reason => Reason,
@@ -376,8 +371,7 @@ handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
 handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
     State;
 handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) ->
-    logger:warning(#{
-        msg => "Shard membership transition failed",
+    ?tp(warning, "Shard membership transition failed", #{
         db => DB,
         shard => Shard,
         transition => Trans,

From 2401a2fb80bdff2d0d03db6928d5f3443fc9f01b Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Fri, 12 Jul 2024 18:28:24 +0200
Subject: [PATCH 5/8] test(dsraft): run `t_join_leave_errors` case in tracing context

---
 .../test/emqx_ds_replication_SUITE.erl | 91 ++++++++++---------
 1 file changed, 49 insertions(+), 42 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
index 146b69e2b..140b4c2d6 100644
--- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
@@ -327,57 +327,64 @@ t_join_leave_errors('end', Config) ->
 t_join_leave_errors(Config) ->
     %% This testcase verifies that logical errors arising during handling of
     %% join/leave operations are reported correctly.
+    DB = ?FUNCTION_NAME,
     [N1, N2] = ?config(nodes, Config),
     Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
-    ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
-    ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
-
-    [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]],
-
-    ?assertEqual(lists:sort([S1, S2]), lists:sort(ds_repl_meta(N1, db_sites, [?FUNCTION_NAME]))),
-
-    %% Attempts to join a nonexistent DB / site.
+    ?check_trace(
+        begin
+            ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [DB, Opts])),
+            ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [DB, Opts])),
+
+            [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]],
+
+            ?assertEqual(
+                lists:sort([S1, S2]), lists:sort(ds_repl_meta(N1, db_sites, [DB]))
+            ),
+
+            %% Attempts to join a nonexistent DB / site.
+            ?assertEqual(
+                {error, {nonexistent_db, boo}},
+                ds_repl_meta(N1, join_db_site, [_DB = boo, S1])
+            ),
+            ?assertEqual(
+                {error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}},
+                ds_repl_meta(N1, join_db_site, [DB, <<"NO-MANS-SITE">>])
+            ),
+            %% NOTE: Leaving a non-existent site is not an error.
+            ?assertEqual(
+                {ok, unchanged},
+                ds_repl_meta(N1, leave_db_site, [DB, <<"NO-MANS-SITE">>])
+            ),
 
-    %% Should be no-op.
-    ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, S1])),
-    ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)),
+            %% Should be no-op.
+            ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [DB, S1])),
+            ?assertEqual([], emqx_ds_test_helpers:transitions(N1, DB)),
 
-    %% Leave S2:
-    ?assertEqual(
-        {ok, [S1]},
-        ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, S2])
-    ),
-    %% Impossible to leave the last site:
-    ?assertEqual(
-        {error, {too_few_sites, []}},
-        ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, S1])
-    ),
+            %% Leave S2:
+            ?assertEqual(
+                {ok, [S1]},
+                ds_repl_meta(N1, leave_db_site, [DB, S2])
+            ),
+            %% Impossible to leave the last site:
+            ?assertEqual(
+                {error, {too_few_sites, []}},
+                ds_repl_meta(N1, leave_db_site, [DB, S1])
+            ),
 
-    %% "Move" the DB to the other node.
-    ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, S2])),
-    ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?FUNCTION_NAME, S1])),
-    ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)),
-    ?retry(
-        1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME))
-    ),
+            %% "Move" the DB to the other node.
+            ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [DB, S2])),
+            ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [DB, S1])),
+            ?retry(
+                1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, DB))
+            ),
 
-    %% Should be no-op.
-    ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?FUNCTION_NAME, S1])),
-    ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)).
+            %% Should be no-op.
+            ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [DB, S1])),
+            ?assertEqual([], emqx_ds_test_helpers:transitions(N1, DB))
+        end,
+        []
+    ).
 
 t_rebalance_chaotic_converges(init, Config) ->
     Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],

From 78bb102311855bffb844c7b2656d60a997fba49b Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Tue, 16 Jul 2024 13:27:36 +0200
Subject: [PATCH 6/8] test(dsraft): attempt to start select testcases from stable state

---
 .../test/emqx_ds_replication_SUITE.erl | 38 +++++++++----------
 1 file changed, 17 insertions(+), 21 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
index 140b4c2d6..fc18976e3 100644
--- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
@@ -209,10 +209,8 @@ t_rebalance(Config) ->
     Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
     %% 1. Initialize DB on the first node.
     Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
-    [
-        ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
-        || Node <- Nodes
-    ],
+    assert_db_open(Nodes, ?DB, Opts),
+    assert_db_stable(Nodes, ?DB),
 
     %% 1.1 Kick all sites except S1 from the replica set as
     %% the initial condition:
@@ -506,10 +504,9 @@ t_rebalance_offline_restarts(Config) ->
 
     %% Initialize DB on all 3 nodes.
     Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
-    ?assertEqual(
-        [{ok, ok} || _ <- Nodes],
-        erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
-    ),
+    assert_db_open(Nodes, ?DB, Opts),
+    assert_db_stable(Nodes, ?DB),
+
     ?retry(
         1000,
         5,
@@ -933,17 +930,17 @@ assert_db_open(Nodes, DB, Opts) ->
         erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
     ).
 
-assert_db_stable(Nodes = [N1 | _], DB) ->
-    Shards = shards(N1, DB),
+assert_db_stable([Node | _], DB) ->
+    Shards = ds_repl_meta(Node, shards, [DB]),
     ?assertMatch(
         _Leadership = [_ | _],
-        db_leadership(Nodes, DB, Shards)
+        db_leadership(Node, DB, Shards)
     ).
 
 %%
 
-db_leadership(Nodes, DB, Shards) ->
-    Leadership = [{S, shard_leadership(Nodes, DB, S)} || S <- Shards],
+db_leadership(Node, DB, Shards) ->
+    Leadership = [{S, shard_leadership(Node, DB, S)} || S <- Shards],
     Inconsistent = [SL || SL = {_, Leaders} <- Leadership, map_size(Leaders) > 1],
     case Inconsistent of
         [] ->
@@ -952,15 +949,17 @@ db_leadership(Nodes, DB, Shards) ->
         {error, inconsistent, Inconsistent}
     end.
 
-shard_leadership(Nodes, DB, Shard) ->
+shard_leadership(Node, DB, Shard) ->
+    ReplicaSet = ds_repl_meta(Node, replica_set, [DB, Shard]),
+    Nodes = [ds_repl_meta(Node, node, [Site]) || Site <- ReplicaSet],
     lists:foldl(
-        fun(N, Acc) -> Acc#{shard_leader(N, DB, Shard) => N} end,
+        fun({Site, SN}, Acc) -> Acc#{shard_leader(SN, DB, Shard, Site) => SN} end,
         #{},
-        Nodes
+        lists:zip(ReplicaSet, Nodes)
     ).
 
-shard_leader(Node, DB, Shard) ->
-    shard_server_info(Node, DB, Shard, ds_repl_meta(Node, this_site), leader).
+shard_leader(Node, DB, Shard, Site) ->
+    shard_server_info(Node, DB, Shard, Site, leader).
 
 shard_server_info(Node, DB, Shard, Site, Info) ->
     ?ON(
@@ -985,9 +984,6 @@ ds_repl_meta(Node, Fun, Args) ->
             error(meta_op_failed)
     end.
 
-shards(Node, DB) ->
-    erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
-
 shards_online(Node, DB) ->
     erpc:call(Node, emqx_ds_builtin_raft_db_sup, which_shards, [DB]).

From d04915d6a6f244cb0af484fd6ca0ea7422c4b9d5 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Tue, 16 Jul 2024 13:31:28 +0200
Subject: [PATCH 7/8] test(dsraft): increase `ra_server` logging level for debugging

---
 .../test/emqx_ds_replication_SUITE.erl | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
index fc18976e3..306bcdf4b 100644
--- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
+++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
@@ -51,6 +51,10 @@ appspec(emqx_durable_storage) ->
     {emqx_durable_storage, #{
         before_start => fun snabbkaffe:fix_ct_logging/0,
         override_env => [{egress_flush_interval, 1}]
+    }};
+appspec(emqx_ds_builtin_raft) ->
+    {emqx_ds_builtin_raft, #{
+        after_start => fun() -> logger:set_module_level(ra_server, info) end
     }}.
 
 t_metadata(init, Config) ->
@@ -98,7 +102,7 @@ t_metadata(_Config) ->
     end.
 
 t_replication_transfers_snapshots(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     NodeSpecs = emqx_cth_cluster:mk_nodespecs(
         [
            {t_replication_transfers_snapshots1, #{apps => Apps}},
@@ -177,7 +181,7 @@ t_replication_transfers_snapshots(Config) ->
     ).
 
 t_rebalance(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Nodes = emqx_cth_cluster:start(
         [
             {t_rebalance1, #{apps => Apps}},
@@ -310,7 +314,7 @@ t_rebalance(Config) ->
     ).
 
 t_join_leave_errors(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Nodes = emqx_cth_cluster:start(
         [
             {t_join_leave_errors1, #{apps => Apps}},
@@ -385,7 +389,7 @@ t_join_leave_errors(Config) ->
     ).
 
 t_rebalance_chaotic_converges(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
    Nodes = emqx_cth_cluster:start(
         [
             {t_rebalance_chaotic_converges1, #{apps => Apps}},
@@ -480,7 +484,7 @@ t_rebalance_chaotic_converges(Config) ->
     ).
 
 t_rebalance_offline_restarts(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Specs = emqx_cth_cluster:mk_nodespecs(
         [
             {t_rebalance_offline_restarts1, #{apps => Apps}},
@@ -800,7 +804,7 @@ t_store_batch_fail(Config) ->
     ).
 
 t_crash_restart_recover(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Specs = emqx_cth_cluster:mk_nodespecs(
         [
             {t_crash_stop_recover1, #{apps => Apps}},

From 6b130c6422b4bb554db94897954062fc9d124a34 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Tue, 16 Jul 2024 18:45:22 +0200
Subject: [PATCH 8/8] fix(dsraft): preserve pending replica set transitions

Otherwise, information about pending replica set transitions taking a long
time to complete could be lost on subsequent target set changes and node
crashes.
---
 .../src/emqx_ds_replication_layer_meta.erl  | 83 +++++++++++++++----
 .../emqx_ds_replication_shard_allocator.erl | 54 +++++++-----
 2 files changed, 101 insertions(+), 36 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl
index 2348d7c2d..7bf838e6f 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl
@@ -41,6 +41,7 @@
     leave_db_site/2,
     assign_db_sites/2,
     replica_set_transitions/2,
+    claim_transition/3,
     update_replica_set/3,
     db_sites/1,
     target_set/2
@@ -61,6 +62,7 @@
     allocate_shards_trans/1,
     assign_db_sites_trans/2,
     modify_db_sites_trans/2,
+    claim_transition_trans/3,
     update_replica_set_trans/3,
     update_db_config_trans/2,
     drop_db_trans/1,
@@ -92,6 +94,8 @@
 -define(NODE_TAB, emqx_ds_builtin_node_tab).
 %% Shard metadata:
 -define(SHARD_TAB, emqx_ds_builtin_shard_tab).
+%% Membership transitions:
+-define(TRANSITION_TAB, emqx_ds_builtin_trans_tab).
 
 -record(?META_TAB, {
     db :: emqx_ds:db(),
@@ -111,6 +115,13 @@
     %% Sites that should contain the data when the cluster is in the
     %% stable state (no nodes are being added or removed from it):
     target_set :: [site()] | undefined,
+    % target_set :: [transition() | site()] | undefined,
     misc = #{} :: map()
 }).
 
+-record(?TRANSITION_TAB, {
+    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    transition :: transition(),
+    misc = #{} :: map()
+}).
+
@@ -184,6 +195,7 @@ print_status() ->
         eval_qlc(mnesia:table(?NODE_TAB))
     ),
     Shards = eval_qlc(mnesia:table(?SHARD_TAB)),
+    Transitions = eval_qlc(mnesia:table(?TRANSITION_TAB)),
     io:format(
         "~nSHARDS:~n~s~s~n",
         [string:pad("Shard", 30), "Replicas"]
@@ -201,9 +213,10 @@ print_status() ->
     ),
     PendingTransitions = lists:filtermap(
         fun(Record = #?SHARD_TAB{shard = DBShard}) ->
-            case compute_transitions(Record) of
+            ClaimedTs = [T || T = #?TRANSITION_TAB{shard = S} <- Transitions, S == DBShard],
+            case compute_transitions(Record, ClaimedTs) of
                 [] -> false;
-                Transitions -> {true, {DBShard, Transitions}}
+                ShardTransitions -> {true, {DBShard, ShardTransitions}}
             end
         end,
         Shards
@@ -214,9 +227,9 @@ print_status() ->
         [string:pad("Shard", 30), "Transitions"]
     ),
     lists:foreach(
-        fun({DBShard, Transitions}) ->
+        fun({DBShard, ShardTransitions}) ->
             ShardStr = format_shard(DBShard),
-            TransStr = string:join(lists:map(fun format_transition/1, Transitions), " "),
+            TransStr = string:join(lists:map(fun format_transition/1, ShardTransitions), " "),
             io:format(
                 "~s~s~n",
                 [string:pad(ShardStr, 30), TransStr]
@@ -381,21 +394,25 @@ db_sites(DB) ->
 replica_set_transitions(DB, Shard) ->
     case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
         [Record] ->
-            compute_transitions(Record);
+            PendingTransitions = mnesia:dirty_read(?TRANSITION_TAB, {DB, Shard}),
+            compute_transitions(Record, PendingTransitions);
         [] ->
             undefined
     end.
 
+%% @doc Claim the intention to start the replica set transition for the given shard.
+%% To be called before acting on the transition, so that information about it
+%% is not lost. Once the transition finishes, call `update_replica_set/3`.
+-spec claim_transition(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) ->
+    ok | {error, {conflict, transition()} | {outdated, _Expected :: [transition()]}}.
+claim_transition(DB, Shard, Trans) ->
+    transaction(fun ?MODULE:claim_transition_trans/3, [DB, Shard, Trans]).
+
 %% @doc Update the set of replication sites for a shard.
 %% To be called after a `transition()` has been conducted successfully.
 -spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok.
 update_replica_set(DB, Shard, Trans) ->
-    case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of
-        {atomic, ok} ->
-            ok;
-        {aborted, Reason} ->
-            {error, Reason}
-    end.
+    transaction(fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]).
 
 %% @doc Get the current set of replication sites for a shard.
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
@@ -556,6 +573,28 @@ modify_db_sites_trans(DB, Modifications) ->
             assign_db_sites_trans(DB, Sites)
     end.
 
+claim_transition_trans(DB, Shard, Trans) ->
+    ShardRecord =
+        case mnesia:read(?SHARD_TAB, {DB, Shard}, read) of
+            [Record] ->
+                Record;
+            [] ->
+                mnesia:abort({nonexistent_shard, {DB, Shard}})
+        end,
+    case mnesia:read(?TRANSITION_TAB, {DB, Shard}, write) of
+        [#?TRANSITION_TAB{transition = Trans}] ->
+            ok;
+        [#?TRANSITION_TAB{transition = Conflict}] ->
+            mnesia:abort({conflict, Conflict});
+        [] ->
+            case compute_transitions(ShardRecord) of
+                [Trans | _] ->
+                    mnesia:write(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans});
+                Expected ->
+                    mnesia:abort({outdated, Expected})
+            end
+    end.
+
 update_replica_set_trans(DB, Shard, Trans) ->
     case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
         [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
@@ -570,6 +609,8 @@ update_replica_set_trans(DB, Shard, Trans) ->
                 TS ->
                     TargetSet = TS
             end,
+            %% NOTE: Existence is not enforced at this level, as that makes little sense.
+            mnesia:delete_object(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans}),
             mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet});
         [] ->
             mnesia:abort({nonexistent_shard, {DB, Shard}})
@@ -663,6 +704,13 @@ ensure_tables() ->
         {record_name, ?SHARD_TAB},
         {attributes, record_info(fields, ?SHARD_TAB)}
     ]),
+    ok = mria:create_table(?TRANSITION_TAB, [
+        {rlog_shard, ?SHARD},
+        {type, bag},
+        {storage, disc_copies},
+        {record_name, ?TRANSITION_TAB},
+        {attributes, record_info(fields, ?TRANSITION_TAB)}
+    ]),
     ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
 
 ensure_site() ->
@@ -733,12 +781,17 @@ compute_allocation(Shards, Sites, Opts) ->
     ),
     Allocation.
 
-compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
-    compute_transitions(TargetSet, ReplicaSet).
+compute_transitions(Shard, []) ->
+    compute_transitions(Shard);
+compute_transitions(Shard, [#?TRANSITION_TAB{transition = Trans}]) ->
+    [Trans | lists:delete(Trans, compute_transitions(Shard))].
 
-compute_transitions(undefined, _ReplicaSet) ->
+compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
+    do_compute_transitions(TargetSet, ReplicaSet).
+
+do_compute_transitions(undefined, _ReplicaSet) ->
     [];
-compute_transitions(TargetSet, ReplicaSet) ->
+do_compute_transitions(TargetSet, ReplicaSet) ->
     Additions = TargetSet -- ReplicaSet,
     Deletions = ReplicaSet -- TargetSet,
     intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]).
 
diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl
index 699237227..5984f75d9 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl
@@ -32,7 +32,6 @@
 
 -define(TRIGGER_PENDING_TIMEOUT, 60_000).
 -define(TRANS_RETRY_TIMEOUT, 5_000).
--define(CRASH_RETRY_DELAY, 20_000).
 -define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
 
 -ifdef(TEST).
@@ -176,7 +175,7 @@ handle_shard_transitions(_Shard, [], State) ->
 handle_shard_transitions(Shard, [Trans | _Rest], State) ->
     case transition_handler(Shard, Trans, State) of
         {Track, Handler} ->
-            ensure_transition_handler(Track, Shard, Trans, Handler, State);
+            ensure_transition(Track, Shard, Trans, Handler, State);
         undefined ->
             State
     end.
@@ -185,9 +184,9 @@ transition_handler(Shard, Trans, _State = #{db := DB}) ->
     ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
     case Trans of
         {add, ThisSite} ->
-            {Shard, fun trans_add_local/3};
+            {Shard, {fun trans_claim/4, [fun trans_add_local/3]}};
         {del, ThisSite} ->
-            {Shard, fun trans_drop_local/3};
+            {Shard, {fun trans_claim/4, [fun trans_drop_local/3]}};
         {del, Site} ->
             ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
             case lists:member(Site, ReplicaSet) of
@@ -196,7 +195,7 @@ transition_handler(Shard, Trans, _State = #{db := DB}) ->
                     %% unresponsive.
                     Handler = {fun trans_delay/5, [
                         ?REMOVE_REPLICA_DELAY,
-                        fun trans_rm_unresponsive/3
+                        {fun trans_claim/4, [fun trans_rm_unresponsive/3]}
                     ]},
                     %% NOTE
                     %% Putting this transition handler on separate "track" so that it
@@ -231,6 +230,20 @@ apply_handler({Fun, Args}, DB, Shard, Trans) ->
 apply_handler(Fun, DB, Shard, Trans) ->
     erlang:apply(Fun, [DB, Shard, Trans]).
 
+trans_claim(DB, Shard, Trans, TransHandler) ->
+    case claim_transition(DB, Shard, Trans) of
+        ok ->
+            apply_handler(TransHandler, DB, Shard, Trans);
+        {error, {outdated, Expected}} ->
+            ?tp(debug, "Transition became outdated", #{
+                db => DB,
+                shard => Shard,
+                trans => Trans,
+                expected => Expected
+            }),
+            exit({shutdown, skipped})
+    end.
+
 trans_add_local(DB, Shard, {add, Site}) ->
     ?tp(info, "Adding new local shard replica", #{
         site => Site,
@@ -331,7 +344,7 @@ trans_delay(DB, Shard, Trans, Delay, NextHandler) ->
 
 %%
 
-ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
+ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
     case maps:get(Track, Ts, undefined) of
         undefined ->
             Pid = start_transition_handler(Shard, Trans, Handler, State),
@@ -342,6 +355,12 @@ ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions :=
             State
     end.
 
+claim_transition(DB, Shard, Trans) ->
+    emqx_ds_replication_layer_meta:claim_transition(DB, Shard, Trans).
+
+commit_transition(Shard, Trans, #{db := DB}) ->
+    emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans).
+
 start_transition_handler(Shard, Trans, Handler, #{db := DB}) ->
     proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]).
 
@@ -364,32 +383,25 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
             State0
     end.
 
-handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
+handle_transition_exit(Shard, Trans, normal, State) ->
     %% NOTE: This will trigger the next transition if any.
-    ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans),
+    ok = commit_transition(Shard, Trans, State),
     State;
 handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
     State;
 handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) ->
+    %% NOTE
+    %% In case of `{add, Site}` transition failure, we have no choice but to retry:
+    %% no other node can perform the transition and make progress towards the desired
+    %% state. Assuming `?TRIGGER_PENDING_TIMEOUT` timer will take care of that.
     ?tp(warning, "Shard membership transition failed", #{
         db => DB,
         shard => Shard,
         transition => Trans,
         reason => Reason,
-        retry_in => ?CRASH_RETRY_DELAY
+        retry_in => ?TRIGGER_PENDING_TIMEOUT
     }),
-    %% NOTE
-    %% In case of `{add, Site}` transition failure, we have no choice but to retry:
-    %% no other node can perform the transition and make progress towards the desired
-    %% state.
-    case Trans of
-        {add, _ThisSite} ->
-            {Track, Handler} = transition_handler(Shard, Trans, State),
-            RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]},
-            ensure_transition_handler(Track, Shard, Trans, RetryHandler, State);
-        _Another ->
-            State
-    end.
+    State.
 
 %%