From 466fa41ec311a47ce9a1d8784c08f177249f354c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 17 Jul 2024 19:24:38 +0200 Subject: [PATCH] fix(dsraft): rely on last resort timeout with unresponsive replicas This simplifies the shard transition scheduling logic and makes it less prone to races. --- .../emqx_ds_replication_shard_allocator.erl | 55 +++++-------------- 1 file changed, 15 insertions(+), 40 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl index 5984f75d9..6ed28a8b2 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_shard_allocator.erl @@ -32,13 +32,12 @@ -define(TRIGGER_PENDING_TIMEOUT, 60_000). -define(TRANS_RETRY_TIMEOUT, 5_000). --define(REMOVE_REPLICA_DELAY, {10_000, 5_000}). -ifdef(TEST). -undef(TRANS_RETRY_TIMEOUT). --undef(REMOVE_REPLICA_DELAY). +-undef(TRIGGER_PENDING_TIMEOUT). -define(TRANS_RETRY_TIMEOUT, 1_000). --define(REMOVE_REPLICA_DELAY, {3_000, 2_000}). +-define(TRIGGER_PENDING_TIMEOUT, 5_000). -endif. %% @@ -155,12 +154,12 @@ unsubscribe_db_changes(_State) -> handle_shard_changed(Shard, State = #{db := DB}) -> ok = save_shard_meta(DB, Shard), - handle_shard_transitions(Shard, next_transitions(DB, Shard), State). + handle_shard_transitions(Shard, local, next_transitions(DB, Shard), State). handle_pending_transitions(State = #{db := DB, shards := Shards}) -> lists:foldl( fun(Shard, StateAcc) -> - handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc) + handle_shard_transitions(Shard, any, next_transitions(DB, Shard), StateAcc) end, State, Shards @@ -169,41 +168,34 @@ handle_pending_transitions(State = #{db := DB, shards := Shards}) -> next_transitions(DB, Shard) -> emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard). -handle_shard_transitions(_Shard, [], State) -> +handle_shard_transitions(_Shard, _, [], State) -> %% We reached the target allocation. State; -handle_shard_transitions(Shard, [Trans | _Rest], State) -> - case transition_handler(Shard, Trans, State) of +handle_shard_transitions(Shard, Scope, [Trans | _Rest], State) -> + case transition_handler(Shard, Scope, Trans, State) of {Track, Handler} -> ensure_transition(Track, Shard, Trans, Handler, State); undefined -> State end. -transition_handler(Shard, Trans, _State = #{db := DB}) -> +transition_handler(Shard, Scope, Trans, _State = #{db := DB}) -> ThisSite = catch emqx_ds_replication_layer_meta:this_site(), case Trans of {add, ThisSite} -> {Shard, {fun trans_claim/4, [fun trans_add_local/3]}}; {del, ThisSite} -> {Shard, {fun trans_claim/4, [fun trans_drop_local/3]}}; - {del, Site} -> + {del, Site} when Scope =:= any -> + %% NOTE + %% Letting the replica handle its own removal first, acting on the + %% transition only when triggered explicitly or by `?TRIGGER_PENDING_TIMEOUT` + %% timer. In other cases `Scope` is `local`. ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), case lists:member(Site, ReplicaSet) of true -> - %% NOTE - %% Let the replica handle its own removal first, but still set - %% up a removal handler after a delay, in case the replica is - %% unresponsive. - Handler = {fun trans_delay/5, [ - ?REMOVE_REPLICA_DELAY, - {fun trans_claim/4, [fun trans_rm_unresponsive/3]} - ]}, - %% NOTE - %% Putting this transition handler on separate "track" so that it - %% won't block any changes with higher priority (e.g. managing - %% local replicas). - {{unresp, Shard}, Handler}; + Handler = {fun trans_claim/4, [fun trans_rm_unresponsive/3]}, + {Shard, Handler}; false -> undefined end; @@ -332,16 +324,6 @@ do_rm_unresponsive(DB, Shard, Site) -> do_rm_unresponsive(DB, Shard, Site) end. -trans_delay(DB, Shard, Trans, Delay, NextHandler) -> - ok = delay(Delay), - %% NOTE: Proceed only if the transition we are going to handle is still desired. - case next_transitions(DB, Shard) of - [Trans | _] -> - apply_handler(NextHandler, DB, Shard, Trans); - _Outdated -> - exit({shutdown, skipped}) - end. - %% ensure_transition(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> @@ -459,10 +441,3 @@ erase_shards_meta(DB, Shards) -> erase_shard_meta(DB, Shard) -> persistent_term:erase(?shard_meta(DB, Shard)). - -%% - -delay({MinDelay, Variance}) -> - timer:sleep(MinDelay + rand:uniform(Variance)); -delay(Delay) -> - timer:sleep(Delay).