fix(dsraft): preserve pending replica set transitions

Otherwise, information about pending replica set transitions taking a
long time to complete could be lost on subsequent target set changes and
node crashes.
This commit is contained in:
Andrew Mayorov 2024-07-16 18:45:22 +02:00
parent d04915d6a6
commit 6b130c6422
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 101 additions and 36 deletions

View File

@ -41,6 +41,7 @@
leave_db_site/2,
assign_db_sites/2,
replica_set_transitions/2,
claim_transition/3,
update_replica_set/3,
db_sites/1,
target_set/2
@ -61,6 +62,7 @@
allocate_shards_trans/1,
assign_db_sites_trans/2,
modify_db_sites_trans/2,
claim_transition_trans/3,
update_replica_set_trans/3,
update_db_config_trans/2,
drop_db_trans/1,
@ -92,6 +94,8 @@
-define(NODE_TAB, emqx_ds_builtin_node_tab).
%% Shard metadata:
-define(SHARD_TAB, emqx_ds_builtin_shard_tab).
%% Membership transitions:
-define(TRANSITION_TAB, emqx_ds_builtin_trans_tab).
-record(?META_TAB, {
db :: emqx_ds:db(),
@ -111,6 +115,13 @@
%% Sites that should contain the data when the cluster is in the
%% stable state (no nodes are being added or removed from it):
target_set :: [site()] | undefined,
% target_set :: [transition() | site()] | undefined,
misc = #{} :: map()
}).
-record(?TRANSITION_TAB, {
shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
transition :: transition(),
misc = #{} :: map()
}).
@ -184,6 +195,7 @@ print_status() ->
eval_qlc(mnesia:table(?NODE_TAB))
),
Shards = eval_qlc(mnesia:table(?SHARD_TAB)),
Transitions = eval_qlc(mnesia:table(?TRANSITION_TAB)),
io:format(
"~nSHARDS:~n~s~s~n",
[string:pad("Shard", 30), "Replicas"]
@ -201,9 +213,10 @@ print_status() ->
),
PendingTransitions = lists:filtermap(
fun(Record = #?SHARD_TAB{shard = DBShard}) ->
case compute_transitions(Record) of
ClaimedTs = [T || T = #?TRANSITION_TAB{shard = S} <- Transitions, S == DBShard],
case compute_transitions(Record, ClaimedTs) of
[] -> false;
Transitions -> {true, {DBShard, Transitions}}
ShardTransitions -> {true, {DBShard, ShardTransitions}}
end
end,
Shards
@ -214,9 +227,9 @@ print_status() ->
[string:pad("Shard", 30), "Transitions"]
),
lists:foreach(
fun({DBShard, Transitions}) ->
fun({DBShard, ShardTransitions}) ->
ShardStr = format_shard(DBShard),
TransStr = string:join(lists:map(fun format_transition/1, Transitions), " "),
TransStr = string:join(lists:map(fun format_transition/1, ShardTransitions), " "),
io:format(
"~s~s~n",
[string:pad(ShardStr, 30), TransStr]
@ -381,21 +394,25 @@ db_sites(DB) ->
replica_set_transitions(DB, Shard) ->
case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
[Record] ->
compute_transitions(Record);
PendingTransitions = mnesia:dirty_read(?TRANSITION_TAB, {DB, Shard}),
compute_transitions(Record, PendingTransitions);
[] ->
undefined
end.
%% @doc Claim the intention to start the replica set transition for the given shard.
%% To be called before starting acting on transition, so that information about this
%% will not get lost. Once it finishes, call `update_replica_set/3`.
-spec claim_transition(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) ->
ok | {error, {conflict, transition()} | {outdated, _Expected :: [transition()]}}.
claim_transition(DB, Shard, Trans) ->
transaction(fun ?MODULE:claim_transition_trans/3, [DB, Shard, Trans]).
%% @doc Update the set of replication sites for a shard.
%% To be called after a `transition()` has been conducted successfully.
-spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) -> ok.
update_replica_set(DB, Shard, Trans) ->
case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
{error, Reason}
end.
transaction(fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]).
%% @doc Get the current set of replication sites for a shard.
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
@ -556,6 +573,28 @@ modify_db_sites_trans(DB, Modifications) ->
assign_db_sites_trans(DB, Sites)
end.
claim_transition_trans(DB, Shard, Trans) ->
ShardRecord =
case mnesia:read(?SHARD_TAB, {DB, Shard}, read) of
[Record] ->
Record;
[] ->
mnesia:abort({nonexistent_shard, {DB, Shard}})
end,
case mnesia:read(?TRANSITION_TAB, {DB, Shard}, write) of
[#?TRANSITION_TAB{transition = Trans}] ->
ok;
[#?TRANSITION_TAB{transition = Conflict}] ->
mnesia:abort({conflict, Conflict});
[] ->
case compute_transitions(ShardRecord) of
[Trans | _] ->
mnesia:write(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans});
Expected ->
mnesia:abort({outdated, Expected})
end
end.
update_replica_set_trans(DB, Shard, Trans) ->
case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
[Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
@ -570,6 +609,8 @@ update_replica_set_trans(DB, Shard, Trans) ->
TS ->
TargetSet = TS
end,
%% NOTE: Not enforcing existence on that level, makes little sense.
mnesia:delete_object(#?TRANSITION_TAB{shard = {DB, Shard}, transition = Trans}),
mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet});
[] ->
mnesia:abort({nonexistent_shard, {DB, Shard}})
@ -663,6 +704,13 @@ ensure_tables() ->
{record_name, ?SHARD_TAB},
{attributes, record_info(fields, ?SHARD_TAB)}
]),
ok = mria:create_table(?TRANSITION_TAB, [
{rlog_shard, ?SHARD},
{type, bag},
{storage, disc_copies},
{record_name, ?TRANSITION_TAB},
{attributes, record_info(fields, ?TRANSITION_TAB)}
]),
ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
ensure_site() ->
@ -733,12 +781,17 @@ compute_allocation(Shards, Sites, Opts) ->
),
Allocation.
compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
compute_transitions(TargetSet, ReplicaSet).
compute_transitions(Shard, []) ->
compute_transitions(Shard);
compute_transitions(Shard, [#?TRANSITION_TAB{transition = Trans}]) ->
[Trans | lists:delete(Trans, compute_transitions(Shard))].
compute_transitions(undefined, _ReplicaSet) ->
compute_transitions(#?SHARD_TAB{target_set = TargetSet, replica_set = ReplicaSet}) ->
do_compute_transitions(TargetSet, ReplicaSet).
do_compute_transitions(undefined, _ReplicaSet) ->
[];
compute_transitions(TargetSet, ReplicaSet) ->
do_compute_transitions(TargetSet, ReplicaSet) ->
Additions = TargetSet -- ReplicaSet,
Deletions = ReplicaSet -- TargetSet,
intersperse([{add, S} || S <- Additions], [{del, S} || S <- Deletions]).

View File

@ -32,7 +32,6 @@
-define(TRIGGER_PENDING_TIMEOUT, 60_000).
-define(TRANS_RETRY_TIMEOUT, 5_000).
-define(CRASH_RETRY_DELAY, 20_000).
-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
-ifdef(TEST).
@ -176,7 +175,7 @@ handle_shard_transitions(_Shard, [], State) ->
handle_shard_transitions(Shard, [Trans | _Rest], State) ->
case transition_handler(Shard, Trans, State) of
{Track, Handler} ->
ensure_transition_handler(Track, Shard, Trans, Handler, State);
ensure_transition(Track, Shard, Trans, Handler, State);
undefined ->
State
end.
@ -185,9 +184,9 @@ transition_handler(Shard, Trans, _State = #{db := DB}) ->
ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
case Trans of
{add, ThisSite} ->
{Shard, fun trans_add_local/3};
{Shard, {fun trans_claim/4, [fun trans_add_local/3]}};
{del, ThisSite} ->
{Shard, fun trans_drop_local/3};
{Shard, {fun trans_claim/4, [fun trans_drop_local/3]}};
{del, Site} ->
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
case lists:member(Site, ReplicaSet) of
@ -198,7 +197,7 @@ transition_handler(Shard, Trans, _State = #{db := DB}) ->
%% unresponsive.
Handler = {fun trans_delay/5, [
?REMOVE_REPLICA_DELAY,
fun trans_rm_unresponsive/3
{fun trans_claim/4, [fun trans_rm_unresponsive/3]}
]},
%% NOTE
%% Putting this transition handler on separate "track" so that it
@ -231,6 +230,20 @@ apply_handler({Fun, Args}, DB, Shard, Trans) ->
apply_handler(Fun, DB, Shard, Trans) ->
erlang:apply(Fun, [DB, Shard, Trans]).
trans_claim(DB, Shard, Trans, TransHandler) ->
case claim_transition(DB, Shard, Trans) of
ok ->
apply_handler(TransHandler, DB, Shard, Trans);
{error, {outdated, Expected}} ->
?tp(debug, "Transition became outdated", #{
db => DB,
shard => Shard,
trans => Trans,
expected => Expected
}),
exit({shutdown, skipped})
end.
trans_add_local(DB, Shard, {add, Site}) ->
?tp(info, "Adding new local shard replica", #{
site => Site,
@ -331,7 +344,7 @@ trans_delay(DB, Shard, Trans, Delay, NextHandler) ->
%%
ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
case maps:get(Track, Ts, undefined) of
undefined ->
Pid = start_transition_handler(Shard, Trans, Handler, State),
@ -342,6 +355,12 @@ ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions :=
State
end.
claim_transition(DB, Shard, Trans) ->
emqx_ds_replication_layer_meta:claim_transition(DB, Shard, Trans).
commit_transition(Shard, Trans, #{db := DB}) ->
emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans).
start_transition_handler(Shard, Trans, Handler, #{db := DB}) ->
proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]).
@ -364,32 +383,25 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
State0
end.
handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
handle_transition_exit(Shard, Trans, normal, State) ->
%% NOTE: This will trigger the next transition if any.
ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans),
ok = commit_transition(Shard, Trans, State),
State;
handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
State;
handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) ->
%% NOTE
%% In case of `{add, Site}` transition failure, we have no choice but to retry:
%% no other node can perform the transition and make progress towards the desired
%% state. Assuming `?TRIGGER_PENDING_TIMEOUT` timer will take care of that.
?tp(warning, "Shard membership transition failed", #{
db => DB,
shard => Shard,
transition => Trans,
reason => Reason,
retry_in => ?CRASH_RETRY_DELAY
retry_in => ?TRIGGER_PENDING_TIMEOUT
}),
%% NOTE
%% In case of `{add, Site}` transition failure, we have no choice but to retry:
%% no other node can perform the transition and make progress towards the desired
%% state.
case Trans of
{add, _ThisSite} ->
{Track, Handler} = transition_handler(Shard, Trans, State),
RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]},
ensure_transition_handler(Track, Shard, Trans, RetryHandler, State);
_Another ->
State
end.
State.
%%