fix(dsrepl): trigger unfinished shard transition upon startup
Also provide a trivial API to trigger them by hand.
This commit is contained in:
parent
75bb7f5cdc
commit
7a836317ac
|
@ -23,6 +23,9 @@
|
|||
-export([n_shards/1]).
|
||||
-export([shard_meta/2]).
|
||||
|
||||
%% Maintenace purposes:
|
||||
-export([trigger_transitions/1]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-export([
|
||||
init/1,
|
||||
|
@ -52,10 +55,16 @@
|
|||
|
||||
%%
|
||||
|
||||
-record(trigger_transitions, {}).
|
||||
|
||||
-spec start_link(emqx_ds:db()) -> {ok, pid()}.
|
||||
start_link(DB) ->
|
||||
gen_server:start_link(?MODULE, DB, []).
|
||||
|
||||
-spec trigger_transitions(pid()) -> ok.
|
||||
trigger_transitions(Pid) ->
|
||||
gen_server:cast(Pid, #trigger_transitions{}).
|
||||
|
||||
-spec n_shards(emqx_ds:db()) -> non_neg_integer().
|
||||
n_shards(DB) ->
|
||||
Meta = persistent_term:get(?db_meta(DB)),
|
||||
|
@ -96,6 +105,8 @@ handle_call(_Call, _From, State) ->
|
|||
{reply, ignored, State}.
|
||||
|
||||
-spec handle_cast(_Cast, state()) -> {noreply, state()}.
|
||||
handle_cast(#trigger_transitions{}, State) ->
|
||||
{noreply, handle_pending_transitions(State)};
|
||||
handle_cast(_Cast, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
|
@ -125,11 +136,14 @@ terminate(_Reason, #{}) ->
|
|||
|
||||
%%
|
||||
|
||||
handle_allocate_shards(State) ->
|
||||
case allocate_shards(State) of
|
||||
{ok, NState} ->
|
||||
handle_allocate_shards(State0) ->
|
||||
case allocate_shards(State0) of
|
||||
{ok, State} ->
|
||||
%% NOTE
|
||||
%% Subscribe to shard changes and trigger any yet unhandled transitions.
|
||||
ok = subscribe_db_changes(State),
|
||||
NState;
|
||||
ok = trigger_transitions(self()),
|
||||
State;
|
||||
{error, Data} ->
|
||||
_ = logger:notice(
|
||||
Data#{
|
||||
|
@ -138,7 +152,7 @@ handle_allocate_shards(State) ->
|
|||
}
|
||||
),
|
||||
_TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate),
|
||||
State
|
||||
State0
|
||||
end.
|
||||
|
||||
subscribe_db_changes(#{db := DB}) ->
|
||||
|
@ -153,6 +167,15 @@ handle_shard_changed(Shard, State = #{db := DB}) ->
|
|||
ok = save_shard_meta(DB, Shard),
|
||||
handle_shard_transitions(Shard, next_transitions(DB, Shard), State).
|
||||
|
||||
handle_pending_transitions(State = #{db := DB, shards := Shards}) ->
|
||||
lists:foldl(
|
||||
fun(Shard, StateAcc) ->
|
||||
handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc)
|
||||
end,
|
||||
State,
|
||||
Shards
|
||||
).
|
||||
|
||||
next_transitions(DB, Shard) ->
|
||||
emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard).
|
||||
|
||||
|
|
Loading…
Reference in New Issue