Merge pull request #13462 from keynslug/fix/ci/flaky-ds-raft

fix(dsraft): preserve pending replica set transitions
Andrew Mayorov 2024-07-17 16:11:59 +02:00 committed by GitHub
commit 78fe9304be
4 changed files with 235 additions and 131 deletions
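
In short: before this change, knowledge of an in-flight replica set transition lived only in the shard allocator process, so a crash or restart mid-transition could lose it, which is what made the DS Raft CI cases flaky. The fix persists each claimed transition in a new Mnesia table before acting on it. A minimal sketch of the intended lifecycle, using the API this commit introduces (`run_transition/3` and its error handling are illustrative, not part of the change):

    %% Sketch: claim first, act, then commit. Committing updates the
    %% replica set and deletes the persisted claim in one transaction.
    run_transition(DB, Shard, Trans) ->
        case emqx_ds_replication_layer_meta:claim_transition(DB, Shard, Trans) of
            ok ->
                %% ... actually add/remove the replica here ...
                emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans);
            {error, {conflict, Claimed}} ->
                %% Another transition is already claimed for this shard.
                {error, {conflict, Claimed}};
            {error, {outdated, Expected}} ->
                %% Trans is no longer the next expected transition.
                {error, {outdated, Expected}}
        end.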

--- File 1: emqx_ds_replication_layer_meta.erl ---

@@ -41,6 +41,7 @@
     leave_db_site/2,
     assign_db_sites/2,
     replica_set_transitions/2,
+    claim_transition/3,
     update_replica_set/3,
     db_sites/1,
     target_set/2
@@ -61,6 +62,7 @@
     allocate_shards_trans/1,
     assign_db_sites_trans/2,
     modify_db_sites_trans/2,
+    claim_transition_trans/3,
     update_replica_set_trans/3,
     update_db_config_trans/2,
     drop_db_trans/1,
@@ -92,6 +94,8 @@
 -define(NODE_TAB, emqx_ds_builtin_node_tab).
 %% Shard metadata:
 -define(SHARD_TAB, emqx_ds_builtin_shard_tab).
+%% Membership transitions:
+-define(TRANSITION_TAB, emqx_ds_builtin_trans_tab).
 
 -record(?META_TAB, {
     db :: emqx_ds:db(),
@@ -111,6 +115,13 @@
     %% Sites that should contain the data when the cluster is in the
     %% stable state (no nodes are being added or removed from it):
     target_set :: [site()] | undefined,
+    % target_set :: [transition() | site()] | undefined,
+    misc = #{} :: map()
+}).
+
+-record(?TRANSITION_TAB, {
+    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
+    transition :: transition(),
     misc = #{} :: map()
 }).
@@ -184,6 +195,7 @@ print_status() ->
         eval_qlc(mnesia:table(?NODE_TAB))
     ),
     Shards = eval_qlc(mnesia:table(?SHARD_TAB)),
+    Transitions = eval_qlc(mnesia:table(?TRANSITION_TAB)),
     io:format(
         "~nSHARDS:~n~s~s~n",
         [string:pad("Shard", 30), "Replicas"]
@@ -201,9 +213,10 @@ print_status() ->
     ),
     PendingTransitions = lists:filtermap(
         fun(Record = #?SHARD_TAB{shard = DBShard}) ->
-            case compute_transitions(Record) of
+            ClaimedTs = [T || T = #?TRANSITION_TAB{shard = S} <- Transitions, S == DBShard],
+            case compute_transitions(Record, ClaimedTs) of
                 [] -> false;
-                Transitions -> {true, {DBShard, Transitions}}
+                ShardTransitions -> {true, {DBShard, ShardTransitions}}
             end
         end,
         Shards
@@ -214,9 +227,9 @@ print_status() ->
         [string:pad("Shard", 30), "Transitions"]
     ),
     lists:foreach(
-        fun({DBShard, Transitions}) ->
+        fun({DBShard, ShardTransitions}) ->
             ShardStr = format_shard(DBShard),
-            TransStr = string:join(lists:map(fun format_transition/1, Transitions), " "),
+            TransStr = string:join(lists:map(fun format_transition/1, ShardTransitions), " "),
             io:format(
                 "~s~s~n",
                 [string:pad(ShardStr, 30), TransStr]
@@ -381,21 +394,25 @@ db_sites(DB) ->
 replica_set_transitions(DB, Shard) ->
     case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
         [Record] ->
-            compute_transitions(Record);
+            PendingTransitions = mnesia:dirty_read(?TRANSITION_TAB, {DB, Shard}),
+            compute_transitions(Record, PendingTransitions);
         [] ->
             undefined
     end.
 
+%% @doc Claim the intention to start the replica set transition for the given shard.
+%% To be called before starting to act on the transition, so that information about
+%% it will not get lost. Once it finishes, call `update_replica_set/3`.
+-spec claim_transition(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) ->
+    ok | {error, {conflict, transition()} | {outdated, _Expected :: [transition()]}}.
+claim_transition(DB, Shard, Trans) ->
+    transaction(fun ?MODULE:claim_transition_trans/3, [DB, Shard, Trans]).
+
 %% @doc Update the set of replication sites for a shard.
 %% To be called after a `transition()` has been conducted successfully.
 -spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok.
 update_replica_set(DB, Shard, Trans) ->
-    case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of
-        {atomic, ok} ->
-            ok;
-        {aborted, Reason} ->
-            {error, Reason}
-    end.
+    transaction(fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]).
 
 %% @doc Get the current set of replication sites for a shard.
 -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
@@ -556,6 +573,28 @@ modify_db_sites_trans(DB, Modifications) ->
             assign_db_sites_trans(DB, Sites)
     end.
 
+claim_transition_trans(DB, Shard, Trans) ->
+    ShardRecord =
+        case mnesia:read(?SHARD_TAB, {DB, Shard}, read) of
+            [Record] ->
+                Record;
+            [] ->
+                mnesia:abort({nonexistent_shard, {DB, Shard}})
+        end,
+    case mnesia:read(?TRANSITION_TAB, {DB, Shard}, write) of
+        [#?TRANSITION_TAB{transition = Trans}] ->
+            ok;
+        [#?TRANSITION_TAB{transition = Conflict}] ->
+            mnesia:abort({conflict, Conflict});
+        [] ->
+            case compute_transitions(ShardRecord) of
+                [Trans | _] ->
+                    mnesia:write(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans});
+                Expected ->
+                    mnesia:abort({outdated, Expected})
+            end
+    end.
+
 update_replica_set_trans(DB, Shard, Trans) ->
     case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
         [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
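
Note that `?TRANSITION_TAB` is a `bag` table (see `ensure_tables/0` below), so `mnesia:read/3` returns a list; the singleton matches above rely on `claim_transition_trans/3` never writing a second claim for the same shard. Assuming `transaction/2` maps aborts to `{error, Reason}`, as the `claim_transition/3` spec indicates, the three outcomes look like this (hypothetical helper; `{add, SiteA}` is assumed to be the next computed transition):

    claim_demo(DB, Shard, SiteA, SiteB) ->
        %% First claim goes through and is persisted:
        ok = emqx_ds_replication_layer_meta:claim_transition(DB, Shard, {add, SiteA}),
        %% Re-claiming the same transition is idempotent:
        ok = emqx_ds_replication_layer_meta:claim_transition(DB, Shard, {add, SiteA}),
        %% Claiming a different transition while one is pending is a conflict:
        {error, {conflict, {add, SiteA}}} =
            emqx_ds_replication_layer_meta:claim_transition(DB, Shard, {del, SiteB}).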
@@ -570,6 +609,8 @@ update_replica_set_trans(DB, Shard, Trans) ->
                 TS ->
                     TargetSet = TS
             end,
+            %% NOTE: Not enforcing existence on that level; it makes little sense.
+            mnesia:delete_object(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans}),
             mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet});
         [] ->
             mnesia:abort({nonexistent_shard, {DB, Shard}})
@@ -663,6 +704,13 @@ ensure_tables() ->
         {record_name, ?SHARD_TAB},
         {attributes, record_info(fields, ?SHARD_TAB)}
     ]),
+    ok = mria:create_table(?TRANSITION_TAB, [
+        {rlog_shard, ?SHARD},
+        {type, bag},
+        {storage, disc_copies},
+        {record_name, ?TRANSITION_TAB},
+        {attributes, record_info(fields, ?TRANSITION_TAB)}
+    ]),
     ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
 
 ensure_site() ->
@@ -733,12 +781,17 @@ compute_allocation(Shards, Sites, Opts) ->
     ),
     Allocation.
 
-compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
-    compute_transitions(TargetSet, ReplicaSet).
+compute_transitions(Shard, []) ->
+    compute_transitions(Shard);
+compute_transitions(Shard, [#?TRANSITION_TAB{transition = Trans}]) ->
+    [Trans | lists:delete(Trans, compute_transitions(Shard))].
 
-compute_transitions(undefined, _ReplicaSet) ->
+compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
+    do_compute_transitions(TargetSet, ReplicaSet).
+
+do_compute_transitions(undefined, _ReplicaSet) ->
     [];
-compute_transitions(TargetSet, ReplicaSet) ->
+do_compute_transitions(TargetSet, ReplicaSet) ->
     Additions = TargetSet -- ReplicaSet,
     Deletions = ReplicaSet -- TargetSet,
     intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]).
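
To make the new clause ordering concrete, a worked example with hypothetical sites (assuming `intersperse/2` alternates starting with additions):

    %% ReplicaSet = [s1, s2, s3], TargetSet = [s2, s3, s4]:
    %%   do_compute_transitions/2 -> [{add, s4}, {del, s1}]
    %% With a persisted claim for {del, s1}:
    %%   compute_transitions/2    -> [{del, s1}, {add, s4}]
    %% i.e. a previously claimed transition is always resumed before any
    %% new transitions are started.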

--- File 2: emqx_ds_replication_layer_shard.erl ---

@@ -202,10 +202,10 @@ try_servers([], _Fun, _Args) ->
 add_local_server(DB, Shard) ->
     %% NOTE
     %% Adding local server as "promotable" member to the cluster, which means
-    %% that it will affect quorum until it is promoted to a voter, which in
+    %% that it won't affect quorum until it is promoted to a voter, which in
     %% turn happens when the server has caught up sufficiently with the log.
     %% We also rely on this "membership" to understand when the server's
-    %% readiness.
+    %% ready.
     ShardServers = shard_servers(DB, Shard),
     LocalServer = local_server(DB, Shard),
     case server_info(uid, LocalServer) of
@@ -266,20 +266,20 @@ remove_server(DB, Shard, Server) ->
     end.
 
 -spec server_info
-    (readiness, server()) -> ready | {unready, _Status, _Membership} | unknown;
+    (readiness, server()) -> ready | {unready, _Details} | unknown;
     (leader, server()) -> server() | unknown;
     (uid, server()) -> _UID :: binary() | unknown.
 server_info(readiness, Server) ->
     %% NOTE
     %% Server is ready if it's either the leader or a follower with voter "membership"
     %% status (meaning it was promoted after catching up with the log).
-    case current_leader(Server) of
-        Server ->
+    case ra:members(Server) of
+        {ok, _Servers, Server} ->
             ready;
-        Leader when Leader /= unknown ->
+        {ok, _Servers, Leader} ->
             member_info(readiness, Server, Leader);
-        unknown ->
-            unknown
+        Error ->
+            {unready, {leader_unavailable, Error}}
     end;
 server_info(leader, Server) ->
     current_leader(Server);
@@ -287,8 +287,13 @@ server_info(uid, Server) ->
     maps:get(uid, ra_overview(Server), unknown).
 
 member_info(readiness, Server, Leader) ->
-    Cluster = maps:get(cluster, ra_overview(Leader), #{}),
-    member_readiness(maps:get(Server, Cluster, #{})).
+    case ra:member_overview(Leader) of
+        {ok, Overview = #{}, _Leader} ->
+            Cluster = maps:get(cluster, Overview, #{}),
+            member_readiness(maps:get(Server, Cluster, #{}));
+        Error ->
+            {unready, {leader_overview_unavailable, Error}}
+    end.
 
 current_leader(Server) ->
     %% NOTE: This call will block until the leader is known, or until the timeout.
@@ -304,7 +309,7 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership}
         normal when Membership =:= voter ->
             ready;
         _Other ->
-            {unready, Status, Membership}
+            {unready, {catching_up, Status, Membership}}
     end;
 member_readiness(#{}) ->
     unknown.
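
The readiness probe now degrades into descriptive `{unready, _Details}` tuples instead of a bare `unknown`, which callers such as the shard allocator log while retrying. A hypothetical polling helper built on this API (interval and attempt budget illustrative):

    wait_server_ready(_Server, 0) ->
        {error, timeout};
    wait_server_ready(Server, Retries) ->
        case emqx_ds_replication_layer_shard:server_info(readiness, Server) of
            ready ->
                ok;
            _Unready ->
                %% e.g. {unready, {leader_unavailable, _}} right after startup,
                %% or {unready, {catching_up, _Status, _Membership}} while the
                %% log is still being replicated to a new member.
                timer:sleep(1000),
                wait_server_ready(Server, Retries - 1)
        end.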

--- File 3: shard allocator module ---

@@ -32,7 +32,6 @@
 -define(TRIGGER_PENDING_TIMEOUT, 60_000).
 
 -define(TRANS_RETRY_TIMEOUT, 5_000).
--define(CRASH_RETRY_DELAY, 20_000).
 -define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
 
 -ifdef(TEST).
@@ -176,7 +175,7 @@ handle_shard_transitions(_Shard, [], State) ->
 handle_shard_transitions(Shard, [Trans | _Rest], State) ->
     case transition_handler(Shard, Trans, State) of
         {Track, Handler} ->
-            ensure_transition_handler(Track, Shard, Trans, Handler, State);
+            ensure_transition(Track, Shard, Trans, Handler, State);
         undefined ->
             State
     end.
@@ -185,9 +184,9 @@ transition_handler(Shard, Trans, _State = #{db := DB}) ->
     ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
     case Trans of
         {add, ThisSite} ->
-            {Shard, fun trans_add_local/3};
+            {Shard, {fun trans_claim/4, [fun trans_add_local/3]}};
         {del, ThisSite} ->
-            {Shard, fun trans_drop_local/3};
+            {Shard, {fun trans_claim/4, [fun trans_drop_local/3]}};
         {del, Site} ->
             ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
             case lists:member(Site, ReplicaSet) of
@@ -198,7 +197,7 @@ transition_handler(Shard, Trans, _State = #{db := DB}) ->
             %% unresponsive.
             Handler = {fun trans_delay/5, [
                 ?REMOVE_REPLICA_DELAY,
-                fun trans_rm_unresponsive/3
+                {fun trans_claim/4, [fun trans_rm_unresponsive/3]}
             ]},
             %% NOTE
             %% Putting this transition handler on separate "track" so that it
@@ -231,9 +230,22 @@ apply_handler({Fun, Args}, DB, Shard, Trans) ->
 apply_handler(Fun, DB, Shard, Trans) ->
     erlang:apply(Fun, [DB, Shard, Trans]).
 
+trans_claim(DB, Shard, Trans, TransHandler) ->
+    case claim_transition(DB, Shard, Trans) of
+        ok ->
+            apply_handler(TransHandler, DB, Shard, Trans);
+        {error, {outdated, Expected}} ->
+            ?tp(debug, "Transition became outdated", #{
+                db => DB,
+                shard => Shard,
+                trans => Trans,
+                expected => Expected
+            }),
+            exit({shutdown, skipped})
+    end.
+
 trans_add_local(DB, Shard, {add, Site}) ->
-    logger:info(#{
-        msg => "Adding new local shard replica",
+    ?tp(info, "Adding new local shard replica", #{
         site => Site,
         db => DB,
         shard => Shard
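
For context: handlers here are either a fun or a `{Fun, ExtraArgs}` pair unpacked by `apply_handler/4`, so `trans_claim/4` composes with the concrete handler. The claim is taken inside the linked handler process, and exiting with `{shutdown, skipped}` abandons the transition without committing it. The composed handler from `transition_handler/3` unfolds roughly as:

    %% {Shard, {fun trans_claim/4, [fun trans_add_local/3]}} is applied as:
    %%   trans_claim(DB, Shard, Trans, fun trans_add_local/3)
    %% which persists the claim and, on success, proceeds with:
    %%   apply_handler(fun trans_add_local/3, DB, Shard, Trans)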
@@ -246,8 +258,7 @@ do_add_local(membership = Stage, DB, Shard) ->
         ok ->
             do_add_local(readiness, DB, Shard);
         {error, recoverable, Reason} ->
-            logger:warning(#{
-                msg => "Shard membership change failed",
+            ?tp(warning, "Adding local shard replica failed", #{
                 db => DB,
                 shard => Shard,
                 reason => Reason,
@@ -260,10 +271,9 @@ do_add_local(readiness = Stage, DB, Shard) ->
     LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
     case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of
         ready ->
-            logger:info(#{msg => "Local shard replica ready", db => DB, shard => Shard});
+            ?tp(info, "Local shard replica ready", #{db => DB, shard => Shard});
         Status ->
-            logger:warning(#{
-                msg => "Still waiting for local shard replica to be ready",
+            ?tp(notice, "Still waiting for local shard replica to be ready", #{
                 db => DB,
                 shard => Shard,
                 status => Status,
@@ -274,8 +284,7 @@ do_add_local(readiness = Stage, DB, Shard) ->
     end.
 
 trans_drop_local(DB, Shard, {del, Site}) ->
-    logger:info(#{
-        msg => "Dropping local shard replica",
+    ?tp(notice, "Dropping local shard replica", #{
         site => Site,
         db => DB,
         shard => Shard
@@ -287,10 +296,11 @@ do_drop_local(DB, Shard) ->
         ok ->
             ok = emqx_ds_builtin_raft_db_sup:stop_shard({DB, Shard}),
             ok = emqx_ds_storage_layer:drop_shard({DB, Shard}),
-            logger:info(#{msg => "Local shard replica dropped"});
+            ?tp(notice, "Local shard replica dropped", #{db => DB, shard => Shard});
         {error, recoverable, Reason} ->
-            logger:warning(#{
-                msg => "Shard membership change failed",
+            ?tp(warning, "Dropping local shard replica failed", #{
+                db => DB,
+                shard => Shard,
                 reason => Reason,
                 retry_in => ?TRANS_RETRY_TIMEOUT
             }),
@@ -299,8 +309,7 @@ do_drop_local(DB, Shard) ->
     end.
 
 trans_rm_unresponsive(DB, Shard, {del, Site}) ->
-    logger:info(#{
-        msg => "Removing unresponsive shard replica",
+    ?tp(notice, "Removing unresponsive shard replica", #{
         site => Site,
         db => DB,
         shard => Shard
@@ -311,10 +320,9 @@ do_rm_unresponsive(DB, Shard, Site) ->
     Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
     case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of
         ok ->
-            logger:info(#{msg => "Unresponsive shard replica removed", db => DB, shard => Shard});
+            ?tp(info, "Unresponsive shard replica removed", #{db => DB, shard => Shard});
         {error, recoverable, Reason} ->
-            logger:warning(#{
-                msg => "Shard membership change failed",
+            ?tp(warning, "Removing shard replica failed", #{
                 db => DB,
                 shard => Shard,
                 reason => Reason,
@@ -336,7 +344,7 @@ trans_delay(DB, Shard, Trans, Delay, NextHandler) ->
 %%
 
-ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
+ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
     case maps:get(Track, Ts, undefined) of
         undefined ->
             Pid = start_transition_handler(Shard, Trans, Handler, State),
@@ -347,6 +355,12 @@ ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions :=
             State
     end.
 
+claim_transition(DB, Shard, Trans) ->
+    emqx_ds_replication_layer_meta:claim_transition(DB, Shard, Trans).
+
+commit_transition(Shard, Trans, #{db := DB}) ->
+    emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans).
+
 start_transition_handler(Shard, Trans, Handler, #{db := DB}) ->
     proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]).
@@ -369,33 +383,25 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
             State0
     end.
 
-handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
+handle_transition_exit(Shard, Trans, normal, State) ->
     %% NOTE: This will trigger the next transition if any.
-    ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans),
+    ok = commit_transition(Shard, Trans, State),
     State;
 handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
     State;
 handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) ->
-    logger:warning(#{
-        msg => "Shard membership transition failed",
+    %% NOTE
+    %% In case of `{add, Site}` transition failure, we have no choice but to retry:
+    %% no other node can perform the transition and make progress towards the desired
+    %% state. Assuming `?TRIGGER_PENDING_TIMEOUT` timer will take care of that.
+    ?tp(warning, "Shard membership transition failed", #{
         db => DB,
         shard => Shard,
         transition => Trans,
         reason => Reason,
-        retry_in => ?CRASH_RETRY_DELAY
+        retry_in => ?TRIGGER_PENDING_TIMEOUT
     }),
-    %% NOTE
-    %% In case of `{add, Site}` transition failure, we have no choice but to retry:
-    %% no other node can perform the transition and make progress towards the desired
-    %% state.
-    case Trans of
-        {add, _ThisSite} ->
-            {Track, Handler} = transition_handler(Shard, Trans, State),
-            RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]},
-            ensure_transition_handler(Track, Shard, Trans, RetryHandler, State);
-        _Another ->
-            State
-    end.
+    State.
 
 %%

--- File 4: replication test suite ---

@@ -51,6 +51,10 @@ appspec(emqx_durable_storage) ->
     {emqx_durable_storage, #{
         before_start => fun snabbkaffe:fix_ct_logging/0,
         override_env => [{egress_flush_interval, 1}]
+    }};
+appspec(emqx_ds_builtin_raft) ->
+    {emqx_ds_builtin_raft, #{
+        after_start => fun() -> logger:set_module_level(ra_server, info) end
     }}.
 
 t_metadata(init, Config) ->
@@ -98,7 +102,7 @@ t_metadata(_Config) ->
     end.
 
 t_replication_transfers_snapshots(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     NodeSpecs = emqx_cth_cluster:mk_nodespecs(
         [
             {t_replication_transfers_snapshots1, #{apps => Apps}},
@@ -177,7 +181,7 @@ t_replication_transfers_snapshots(Config) ->
     ).
 
 t_rebalance(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Nodes = emqx_cth_cluster:start(
         [
             {t_rebalance1, #{apps => Apps}},
@@ -209,10 +213,8 @@ t_rebalance(Config) ->
     Sites = [S1, S2 | _] = [ds_repl_meta(N, this_site) || N <- Nodes],
 
     %% 1. Initialize DB on the first node.
     Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
-    [
-        ?assertEqual(ok, ?ON(Node, emqx_ds:open_db(?DB, Opts)))
-     || Node <- Nodes
-    ],
+    assert_db_open(Nodes, ?DB, Opts),
+    assert_db_stable(Nodes, ?DB),
 
     %% 1.1 Kick all sites except S1 from the replica set as
     %% the initial condition:
@@ -281,7 +283,7 @@ t_rebalance(Config) ->
     %% Verify that the set of shard servers matches the target allocation.
     Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
     ShardServers = [
-        shard_server_info(N, ?DB, Shard, Site, readiness)
+        {{Shard, N}, shard_server_info(N, ?DB, Shard, Site, readiness)}
     || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
        Shard <- Shards
    ],
@@ -312,7 +314,7 @@ t_rebalance(Config) ->
     ).
 
 t_join_leave_errors(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Nodes = emqx_cth_cluster:start(
         [
             {t_join_leave_errors1, #{apps => Apps}},
@@ -327,15 +329,20 @@ t_join_leave_errors('end', Config) ->
 t_join_leave_errors(Config) ->
     %% This testcase verifies that logical errors arising during handling of
     %% join/leave operations are reported correctly.
+    DB = ?FUNCTION_NAME,
     [N1, N2] = ?config(nodes, Config),
     Opts = opts(Config, #{n_shards => 16, n_sites => 1, replication_factor => 3}),
-    ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
-    ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?FUNCTION_NAME, Opts])),
 
-    [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]],
+    ?check_trace(
+        begin
+            ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [DB, Opts])),
+            ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [DB, Opts])),
 
-    ?assertEqual(lists:sort([S1, S2]), lists:sort(ds_repl_meta(N1, db_sites, [?FUNCTION_NAME]))),
+            [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]],
+
+            ?assertEqual(
+                lists:sort([S1, S2]), lists:sort(ds_repl_meta(N1, db_sites, [DB]))
+            ),
 
             %% Attempts to join a nonexistent DB / site.
             ?assertEqual(
@@ -344,43 +351,45 @@ t_join_leave_errors(Config) ->
             ),
             ?assertEqual(
                 {error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}},
-        ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, <<"NO-MANS-SITE">>])
+                ds_repl_meta(N1, join_db_site, [DB, <<"NO-MANS-SITE">>])
             ),
 
             %% NOTE: Leaving a non-existent site is not an error.
             ?assertEqual(
                 {ok, unchanged},
-        ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, <<"NO-MANS-SITE">>])
+                ds_repl_meta(N1, leave_db_site, [DB, <<"NO-MANS-SITE">>])
             ),
 
             %% Should be no-op.
-    ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, S1])),
-    ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)),
+            ?assertEqual({ok, unchanged}, ds_repl_meta(N1, join_db_site, [DB, S1])),
+            ?assertEqual([], emqx_ds_test_helpers:transitions(N1, DB)),
 
             %% Leave S2:
             ?assertEqual(
                 {ok, [S1]},
-        ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, S2])
+                ds_repl_meta(N1, leave_db_site, [DB, S2])
             ),
 
             %% Impossible to leave the last site:
             ?assertEqual(
                 {error, {too_few_sites, []}},
-        ds_repl_meta(N1, leave_db_site, [?FUNCTION_NAME, S1])
+                ds_repl_meta(N1, leave_db_site, [DB, S1])
             ),
 
             %% "Move" the DB to the other node.
-    ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [?FUNCTION_NAME, S2])),
-    ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?FUNCTION_NAME, S1])),
-    ?assertMatch([_ | _], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)),
+            ?assertMatch({ok, _}, ds_repl_meta(N1, join_db_site, [DB, S2])),
+            ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [DB, S1])),
             ?retry(
-        1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME))
+                1000, 20, ?assertEqual([], emqx_ds_test_helpers:transitions(N1, DB))
             ),
 
             %% Should be no-op.
-    ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [?FUNCTION_NAME, S1])),
-    ?assertEqual([], emqx_ds_test_helpers:transitions(N1, ?FUNCTION_NAME)).
+            ?assertMatch({ok, _}, ds_repl_meta(N2, leave_db_site, [DB, S1])),
+            ?assertEqual([], emqx_ds_test_helpers:transitions(N1, DB))
+        end,
+        []
+    ).
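
(`?check_trace(Run, Checks)` is a snabbkaffe construct: the first block runs while the event trace is collected, and the second argument holds checks to run against the collected trace afterwards. The empty list here means the wrapper is used purely for trace collection, which pairs with the `?tp(...)` tracepoints introduced in the allocator.)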
@@ … @@
 t_rebalance_chaotic_converges(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Nodes = emqx_cth_cluster:start(
         [
             {t_rebalance_chaotic_converges1, #{apps => Apps}},
@@ -416,10 +425,8 @@ t_rebalance_chaotic_converges(Config) ->
     Opts = opts(Config, #{n_shards => 16, n_sites => 2, replication_factor => 3}),
 
     %% Open DB:
-    ?assertEqual(
-        [{ok, ok}, {ok, ok}, {ok, ok}],
-        erpc:multicall([N1, N2, N3], emqx_ds, open_db, [?DB, Opts])
-    ),
+    assert_db_open(Nodes, ?DB, Opts),
+    assert_db_stable(Nodes, ?DB),
 
     %% Kick N3 from the replica set as the initial condition:
     ?assertMatch(
@@ -477,7 +484,7 @@ t_rebalance_chaotic_converges(Config) ->
     ).
 
 t_rebalance_offline_restarts(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Specs = emqx_cth_cluster:mk_nodespecs(
         [
             {t_rebalance_offline_restarts1, #{apps => Apps}},
@@ -501,10 +508,9 @@ t_rebalance_offline_restarts(Config) ->
     %% Initialize DB on all 3 nodes.
     Opts = opts(Config, #{n_shards => 8, n_sites => 3, replication_factor => 3}),
-    ?assertEqual(
-        [{ok, ok} || _ <- Nodes],
-        erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
-    ),
+    assert_db_open(Nodes, ?DB, Opts),
+    assert_db_stable(Nodes, ?DB),
+
     ?retry(
         1000,
         5,
@@ -798,7 +804,7 @@ t_store_batch_fail(Config) ->
     ).
 
 t_crash_restart_recover(init, Config) ->
-    Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft],
+    Apps = [appspec(emqx_durable_storage), appspec(emqx_ds_builtin_raft)],
     Specs = emqx_cth_cluster:mk_nodespecs(
         [
             {t_crash_stop_recover1, #{apps => Apps}},
@@ -922,12 +928,49 @@ kill_restart_node(Node, Spec, DBOpts) ->
 %%
 
+assert_db_open(Nodes, DB, Opts) ->
+    ?assertEqual(
+        [{ok, ok} || _ <- Nodes],
+        erpc:multicall(Nodes, emqx_ds, open_db, [DB, Opts])
+    ).
+
+assert_db_stable([Node | _], DB) ->
+    Shards = ds_repl_meta(Node, shards, [DB]),
+    ?assertMatch(
+        _Leadership = [_ | _],
+        db_leadership(Node, DB, Shards)
+    ).
+
+%%
+
+db_leadership(Node, DB, Shards) ->
+    Leadership = [{S, shard_leadership(Node, DB, S)} || S <- Shards],
+    Inconsistent = [SL || SL = {_, Leaders} <- Leadership, map_size(Leaders) > 1],
+    case Inconsistent of
+        [] ->
+            Leadership;
+        [_ | _] ->
+            {error, inconsistent, Inconsistent}
+    end.
+
+shard_leadership(Node, DB, Shard) ->
+    ReplicaSet = ds_repl_meta(Node, replica_set, [DB, Shard]),
+    Nodes = [ds_repl_meta(Node, node, [Site]) || Site <- ReplicaSet],
+    lists:foldl(
+        fun({Site, SN}, Acc) -> Acc#{shard_leader(SN, DB, Shard, Site) => SN} end,
+        #{},
+        lists:zip(ReplicaSet, Nodes)
+    ).
+
+shard_leader(Node, DB, Shard, Site) ->
+    shard_server_info(Node, DB, Shard, Site, leader).
+
 shard_server_info(Node, DB, Shard, Site, Info) ->
     ?ON(
         Node,
         begin
             Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
-            {Server, emqx_ds_replication_layer_shard:server_info(Info, Server)}
+            emqx_ds_replication_layer_shard:server_info(Info, Server)
         end
     ).
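
`shard_leadership/3` folds each replica's view of the shard leader into a map keyed by the reported leader, so a stable shard yields a single-key map; `db_leadership/3` flags any shard whose map has more than one key, which makes `assert_db_stable/2` fail. Hypothetical result shapes (server IDs and node names illustrative):

    %% All replicas agree on the leader - one key, the shard is stable:
    %%   #{{ShardRaId, 'n1@127.0.0.1'} => 'n3@127.0.0.1'}
    %% Replicas disagree (e.g. mid-election) - several keys, so
    %% db_leadership/3 returns {error, inconsistent, ...}:
    %%   #{{ShardRaId, 'n1@127.0.0.1'} => 'n1@127.0.0.1',
    %%     {ShardRaId, 'n2@127.0.0.1'} => 'n2@127.0.0.1'}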
@@ -945,9 +988,6 @@ ds_repl_meta(Node, Fun, Args) ->
             error(meta_op_failed)
     end.
 
-shards(Node, DB) ->
-    erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
-
 shards_online(Node, DB) ->
     erpc:call(Node, emqx_ds_builtin_raft_db_sup, which_shards, [DB]).