fix(dsraft): rely on last resort timeout with unresponsive replicas

This simplifies the shard transition scheduling logic and makes it less
prone to races.
This commit is contained in:
Andrew Mayorov 2024-07-17 19:24:38 +02:00
parent 4edbcc55e7
commit 466fa41ec3
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 15 additions and 40 deletions

View File

@ -32,13 +32,12 @@
-define(TRIGGER_PENDING_TIMEOUT, 60_000). -define(TRIGGER_PENDING_TIMEOUT, 60_000).
-define(TRANS_RETRY_TIMEOUT, 5_000). -define(TRANS_RETRY_TIMEOUT, 5_000).
-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
-ifdef(TEST). -ifdef(TEST).
-undef(TRANS_RETRY_TIMEOUT). -undef(TRANS_RETRY_TIMEOUT).
-undef(REMOVE_REPLICA_DELAY). -undef(TRIGGER_PENDING_TIMEOUT).
-define(TRANS_RETRY_TIMEOUT, 1_000). -define(TRANS_RETRY_TIMEOUT, 1_000).
-define(REMOVE_REPLICA_DELAY, {3_000, 2_000}). -define(TRIGGER_PENDING_TIMEOUT, 5_000).
-endif. -endif.
%% %%
@ -155,12 +154,12 @@ unsubscribe_db_changes(_State) ->
handle_shard_changed(Shard, State = #{db := DB}) -> handle_shard_changed(Shard, State = #{db := DB}) ->
ok = save_shard_meta(DB, Shard), ok = save_shard_meta(DB, Shard),
handle_shard_transitions(Shard, next_transitions(DB, Shard), State). handle_shard_transitions(Shard, local, next_transitions(DB, Shard), State).
handle_pending_transitions(State = #{db := DB, shards := Shards}) -> handle_pending_transitions(State = #{db := DB, shards := Shards}) ->
lists:foldl( lists:foldl(
fun(Shard, StateAcc) -> fun(Shard, StateAcc) ->
handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc) handle_shard_transitions(Shard, any, next_transitions(DB, Shard), StateAcc)
end, end,
State, State,
Shards Shards
@ -169,41 +168,34 @@ handle_pending_transitions(State = #{db := DB, shards := Shards}) ->
next_transitions(DB, Shard) -> next_transitions(DB, Shard) ->
emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard). emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard).
handle_shard_transitions(_Shard, [], State) -> handle_shard_transitions(_Shard, _, [], State) ->
%% We reached the target allocation. %% We reached the target allocation.
State; State;
handle_shard_transitions(Shard, [Trans | _Rest], State) -> handle_shard_transitions(Shard, Scope, [Trans | _Rest], State) ->
case transition_handler(Shard, Trans, State) of case transition_handler(Shard, Scope, Trans, State) of
{Track, Handler} -> {Track, Handler} ->
ensure_transition(Track, Shard, Trans, Handler, State); ensure_transition(Track, Shard, Trans, Handler, State);
undefined -> undefined ->
State State
end. end.
transition_handler(Shard, Trans, _State = #{db := DB}) -> transition_handler(Shard, Scope, Trans, _State = #{db := DB}) ->
ThisSite = catch emqx_ds_replication_layer_meta:this_site(), ThisSite = catch emqx_ds_replication_layer_meta:this_site(),
case Trans of case Trans of
{add, ThisSite} -> {add, ThisSite} ->
{Shard, {fun trans_claim/4, [fun trans_add_local/3]}}; {Shard, {fun trans_claim/4, [fun trans_add_local/3]}};
{del, ThisSite} -> {del, ThisSite} ->
{Shard, {fun trans_claim/4, [fun trans_drop_local/3]}}; {Shard, {fun trans_claim/4, [fun trans_drop_local/3]}};
{del, Site} -> {del, Site} when Scope =:= any ->
%% NOTE
%% Letting the replica handle its own removal first, acting on the
%% transition only when triggered explicitly or by `?TRIGGER_PENDING_TIMEOUT`
%% timer. In other cases `Scope` is `local`.
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
case lists:member(Site, ReplicaSet) of case lists:member(Site, ReplicaSet) of
true -> true ->
%% NOTE Handler = {fun trans_claim/4, [fun trans_rm_unresponsive/3]},
%% Let the replica handle its own removal first, but still set {Shard, Handler};
%% up a removal handler after a delay, in case the replica is
%% unresponsive.
Handler = {fun trans_delay/5, [
?REMOVE_REPLICA_DELAY,
{fun trans_claim/4, [fun trans_rm_unresponsive/3]}
]},
%% NOTE
%% Putting this transition handler on separate "track" so that it
%% won't block any changes with higher priority (e.g. managing
%% local replicas).
{{unresp, Shard}, Handler};
false -> false ->
undefined undefined
end; end;
@ -332,16 +324,6 @@ do_rm_unresponsive(DB, Shard, Site) ->
do_rm_unresponsive(DB, Shard, Site) do_rm_unresponsive(DB, Shard, Site)
end. end.
trans_delay(DB, Shard, Trans, Delay, NextHandler) ->
ok = delay(Delay),
%% NOTE: Proceed only if the transition we are going to handle is still desired.
case next_transitions(DB, Shard) of
[Trans | _] ->
apply_handler(NextHandler, DB, Shard, Trans);
_Outdated ->
exit({shutdown, skipped})
end.
%% %%
ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
@ -459,10 +441,3 @@ erase_shards_meta(DB, Shards) ->
erase_shard_meta(DB, Shard) -> erase_shard_meta(DB, Shard) ->
persistent_term:erase(?shard_meta(DB, Shard)). persistent_term:erase(?shard_meta(DB, Shard)).
%%
delay({MinDelay, Variance}) ->
timer:sleep(MinDelay + rand:uniform(Variance));
delay(Delay) ->
timer:sleep(Delay).