diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 7afeb9d26..cfc2b7c81 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -23,6 +23,9 @@ -export([n_shards/1]). -export([shard_meta/2]). +%% Maintenace purposes: +-export([trigger_transitions/1]). + -behaviour(gen_server). -export([ init/1, @@ -52,10 +55,16 @@ %% +-record(trigger_transitions, {}). + -spec start_link(emqx_ds:db()) -> {ok, pid()}. start_link(DB) -> gen_server:start_link(?MODULE, DB, []). +-spec trigger_transitions(pid()) -> ok. +trigger_transitions(Pid) -> + gen_server:cast(Pid, #trigger_transitions{}). + -spec n_shards(emqx_ds:db()) -> non_neg_integer(). n_shards(DB) -> Meta = persistent_term:get(?db_meta(DB)), @@ -96,6 +105,8 @@ handle_call(_Call, _From, State) -> {reply, ignored, State}. -spec handle_cast(_Cast, state()) -> {noreply, state()}. +handle_cast(#trigger_transitions{}, State) -> + {noreply, handle_pending_transitions(State)}; handle_cast(_Cast, State) -> {noreply, State}. @@ -125,11 +136,14 @@ terminate(_Reason, #{}) -> %% -handle_allocate_shards(State) -> - case allocate_shards(State) of - {ok, NState} -> +handle_allocate_shards(State0) -> + case allocate_shards(State0) of + {ok, State} -> + %% NOTE + %% Subscribe to shard changes and trigger any yet unhandled transitions. ok = subscribe_db_changes(State), - NState; + ok = trigger_transitions(self()), + State; {error, Data} -> _ = logger:notice( Data#{ @@ -138,7 +152,7 @@ handle_allocate_shards(State) -> } ), _TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate), - State + State0 end. subscribe_db_changes(#{db := DB}) -> @@ -153,6 +167,15 @@ handle_shard_changed(Shard, State = #{db := DB}) -> ok = save_shard_meta(DB, Shard), handle_shard_transitions(Shard, next_transitions(DB, Shard), State). +handle_pending_transitions(State = #{db := DB, shards := Shards}) -> + lists:foldl( + fun(Shard, StateAcc) -> + handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc) + end, + State, + Shards + ). + next_transitions(DB, Shard) -> emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard).