From 26ddc403c8dc61a96caa01b9ce2bdc09ef192ebc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 5 Aug 2024 16:32:19 +0200 Subject: [PATCH] fix(dsraft): avoid tight loop in shard bootstrap --- .../src/emqx_ds_replication_layer_shard.erl | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl index bb0a9c99d..37193eaf4 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl @@ -42,6 +42,7 @@ handle_continue/2, handle_call/3, handle_cast/2, + handle_info/2, terminate/2 ]). @@ -54,6 +55,7 @@ | {error, servers_unreachable}. -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). +-define(MAX_BOOSTRAP_RETRY_TIMEOUT, 1_000). -define(PTERM(DB, SHARD, KEY), {?MODULE, DB, SHARD, KEY}). @@ -359,12 +361,17 @@ init({DB, Shard, Opts}) -> }, {ok, St, {continue, bootstrap}}. +handle_continue(bootstrap, St = #st{bootstrapped = true}) -> + {noreply, St}; handle_continue(bootstrap, St0) -> case bootstrap(St0) of St = #st{bootstrapped = true} -> {noreply, St}; St = #st{bootstrapped = false} -> - {noreply, St, {continue, bootstrap}} + {noreply, St, {continue, bootstrap}}; + {retry, Timeout, St} -> + _TRef = erlang:start_timer(Timeout, self(), bootstrap), + {noreply, St} end. handle_call(_Call, _From, State) -> @@ -373,6 +380,9 @@ handle_call(_Call, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info({timeout, _TRef, bootstrap}, St) -> + {noreply, St, {continue, bootstrap}}. + terminate(_Reason, {DB, Shard}) -> %% NOTE: Mark as not ready right away. ok = erase_shard_info(DB, Shard), @@ -401,12 +411,18 @@ bootstrap(St = #st{stage = {wait_log, Leader}}) -> St#st{stage = wait_leader} end; bootstrap(St = #st{stage = {wait_log_index, RaftIdx}, db = DB, shard = Shard, server = Server}) -> - case ra_overview(Server) of - #{commit_index := RaftIdx} -> + Overview = ra_overview(Server), + case maps:get(last_applied, Overview, 0) of + LastApplied when LastApplied >= RaftIdx -> ok = announce_shard_ready(DB, Shard), St#st{bootstrapped = true, stage = undefined}; - #{} -> - St + LastApplied -> + %% NOTE + %% Blunt estimate of time shard needs to catch up. If this proves to be too long in + %% practice, it's could be augmented with handling `recover` -> `follower` Ra + %% member state transition. + Timeout = min(RaftIdx - LastApplied, ?MAX_BOOSTRAP_RETRY_TIMEOUT), + {retry, Timeout, St} end. %%