fix(dsrepl): retry crashed membership transitions

This commit is contained in:
Andrew Mayorov 2024-04-07 22:35:14 +02:00
parent 826ce5806d
commit 6293efb995
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 120 additions and 51 deletions

View File

@ -79,7 +79,12 @@
n_shards/1 n_shards/1
]). ]).
-export_type([site/0, update_cluster_result/0]). -export_type([
site/0,
transition/0,
subscription_event/0,
update_cluster_result/0
]).
-include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/qlc.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
@ -134,6 +139,10 @@
%% Subject of the subscription: %% Subject of the subscription:
-type subject() :: emqx_ds:db(). -type subject() :: emqx_ds:db().
%% Event for the subscription:
-type subscription_event() ::
{changed, {shard, emqx_ds:db(), emqx_ds_replication_layer:shard_id()}}.
%% Peristent term key: %% Peristent term key:
-define(emqx_ds_builtin_site, emqx_ds_builtin_site). -define(emqx_ds_builtin_site, emqx_ds_builtin_site).

View File

@ -40,6 +40,7 @@
-define(ALLOCATE_RETRY_TIMEOUT, 1_000). -define(ALLOCATE_RETRY_TIMEOUT, 1_000).
-define(TRANS_RETRY_TIMEOUT, 5_000). -define(TRANS_RETRY_TIMEOUT, 5_000).
-define(CRASH_RETRY_DELAY, 20_000).
-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}). -define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
-ifdef(TEST). -ifdef(TEST).
@ -51,9 +52,11 @@
%% %%
-spec start_link(emqx_ds:db()) -> {ok, pid()}.
start_link(DB) -> start_link(DB) ->
gen_server:start_link(?MODULE, DB, []). gen_server:start_link(?MODULE, DB, []).
-spec n_shards(emqx_ds:db()) -> non_neg_integer().
n_shards(DB) -> n_shards(DB) ->
Meta = persistent_term:get(?db_meta(DB)), Meta = persistent_term:get(?db_meta(DB)),
maps:get(n_shards, Meta). maps:get(n_shards, Meta).
@ -63,18 +66,44 @@ shard_meta(DB, Shard) ->
%% %%
-record(transhdl, {
shard :: emqx_ds_replication_layer:shard_id(),
trans :: emqx_ds_replication_layer_meta:transition(),
pid :: pid()
}).
-type state() :: #{
db := emqx_ds:db(),
shards := [emqx_ds_replication_layer:shard_id()],
status := allocating | ready,
transitions := #{_Track => #transhdl{}}
}.
-spec init(emqx_ds:db()) -> {ok, state()}.
init(DB) -> init(DB) ->
_ = erlang:process_flag(trap_exit, true), _ = erlang:process_flag(trap_exit, true),
_ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}), _ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}),
State = #{db => DB, transitions => #{}, status => allocating}, State = #{
db => DB,
shards => [],
status => allocating,
transitions => #{}
},
{ok, handle_allocate_shards(State)}. {ok, handle_allocate_shards(State)}.
-spec handle_call(_Call, _From, state()) -> {reply, ignored, state()}.
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
{reply, ignored, State}. {reply, ignored, State}.
-spec handle_cast(_Cast, state()) -> {noreply, state()}.
handle_cast(_Cast, State) -> handle_cast(_Cast, State) ->
{noreply, State}. {noreply, State}.
-spec handle_info(Info, state()) -> {noreply, state()} when
Info ::
emqx_ds_replication_layer_meta:subscription_event()
| {timeout, reference(), allocate}
| {'EXIT', pid(), _Reason}.
handle_info({timeout, _TRef, allocate}, State) -> handle_info({timeout, _TRef, allocate}, State) ->
{noreply, handle_allocate_shards(State)}; {noreply, handle_allocate_shards(State)};
handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) -> handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) ->
@ -86,6 +115,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
-spec terminate(_Reason, state()) -> _Ok.
terminate(_Reason, State = #{db := DB, shards := Shards}) -> terminate(_Reason, State = #{db := DB, shards := Shards}) ->
unsubscribe_db_changes(State), unsubscribe_db_changes(State),
erase_db_meta(DB), erase_db_meta(DB),
@ -121,38 +151,55 @@ unsubscribe_db_changes(_State) ->
handle_shard_changed(Shard, State = #{db := DB}) -> handle_shard_changed(Shard, State = #{db := DB}) ->
ok = save_shard_meta(DB, Shard), ok = save_shard_meta(DB, Shard),
Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), handle_shard_transitions(Shard, next_transitions(DB, Shard), State).
handle_shard_transitions(Shard, Transitions, State).
handle_shard_transitions(Shard, Transitions, State = #{db := DB}) -> next_transitions(DB, Shard) ->
emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard).
handle_shard_transitions(_Shard, [], State) ->
%% We reached the target allocation.
State;
handle_shard_transitions(Shard, [Trans | _Rest], State) ->
case transition_handler(Shard, Trans, State) of
{Track, Handler} ->
ensure_transition_handler(Track, Shard, Trans, Handler, State);
undefined ->
State
end.
transition_handler(Shard, Trans, _State = #{db := DB}) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(), ThisSite = emqx_ds_replication_layer_meta:this_site(),
case Transitions of case Trans of
[] -> {add, ThisSite} ->
%% We reached the target allocation. {Shard, fun trans_add_local/3};
State; {del, ThisSite} ->
[Trans = {add, ThisSite} | _Rest] -> {Shard, fun trans_drop_local/3};
ensure_transition_handler(Shard, Trans, fun trans_add_local/3, State); {del, Site} ->
[Trans = {del, ThisSite} | _Rest] ->
ensure_transition_handler(Shard, Trans, fun trans_drop_local/3, State);
[Trans = {del, Site} | _Rest] ->
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
case lists:member(Site, ReplicaSet) of case lists:member(Site, ReplicaSet) of
true -> true ->
%% NOTE
%% Let the replica handle its own removal first, but still set
%% up a removal handler after a delay, in case the replica is
%% unresponsive.
Handler = {fun trans_delay/5, [
?REMOVE_REPLICA_DELAY,
fun trans_rm_unresponsive/3
]},
%% NOTE %% NOTE
%% Putting this transition handler on separate "track" so that it %% Putting this transition handler on separate "track" so that it
%% won't block any changes with higher priority (e.g. managing %% won't block any changes with higher priority (e.g. managing
%% local replicas). %% local replicas).
Handler = fun trans_rm_unresponsive/3, {_Track = unresp, Handler};
ensure_transition_handler(unresp, Shard, Trans, Handler, State);
false -> false ->
State undefined
end; end;
[_Trans | _Rest] -> _NotOurs ->
%% This site is not involved in the next queued transition. %% This site is not involved in the next queued transition.
State undefined
end. end.
handle_transition(DB, Shard, Trans, Fun) -> handle_transition(DB, Shard, Trans, Handler) ->
logger:set_process_metadata(#{ logger:set_process_metadata(#{
db => DB, db => DB,
shard => Shard, shard => Shard,
@ -162,6 +209,11 @@ handle_transition(DB, Shard, Trans, Fun) ->
dsrepl_shard_transition_begin, dsrepl_shard_transition_begin,
#{shard => Shard, db => DB, transition => Trans, pid => self()} #{shard => Shard, db => DB, transition => Trans, pid => self()}
), ),
apply_handler(Handler, DB, Shard, Trans).
apply_handler({Fun, Args}, DB, Shard, Trans) ->
erlang:apply(Fun, [DB, Shard, Trans | Args]);
apply_handler(Fun, DB, Shard, Trans) ->
erlang:apply(Fun, [DB, Shard, Trans]). erlang:apply(Fun, [DB, Shard, Trans]).
trans_add_local(DB, Shard, {add, Site}) -> trans_add_local(DB, Shard, {add, Site}) ->
@ -217,18 +269,9 @@ do_drop_local(DB, Shard) ->
do_drop_local(DB, Shard) do_drop_local(DB, Shard)
end. end.
trans_rm_unresponsive(DB, Shard, Trans = {del, Site}) -> trans_rm_unresponsive(DB, Shard, {del, Site}) ->
%% NOTE logger:info(#{msg => "Removing unresponsive shard replica", site => Site}),
%% Let the replica handle its own removal first, thus the delay. do_rm_unresponsive(DB, Shard, Site).
ok = delay(?REMOVE_REPLICA_DELAY),
Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard),
case Transitions of
[Trans | _] ->
logger:info(#{msg => "Removing unresponsive shard replica", site => Site}),
do_rm_unresponsive(DB, Shard, Site);
_Outdated ->
exit({shutdown, skipped})
end.
do_rm_unresponsive(DB, Shard, Site) -> do_rm_unresponsive(DB, Shard, Site) ->
Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
@ -245,16 +288,23 @@ do_rm_unresponsive(DB, Shard, Site) ->
do_rm_unresponsive(DB, Shard, Site) do_rm_unresponsive(DB, Shard, Site)
end. end.
%% trans_delay(DB, Shard, Trans, Delay, NextHandler) ->
ok = delay(Delay),
case next_transitions(DB, Shard) of
[Trans | _] ->
apply_handler(NextHandler, DB, Shard, Trans);
_Outdated ->
exit({shutdown, skipped})
end.
ensure_transition_handler(Shard, Trans, Handler, State) -> %%
ensure_transition_handler(Shard, Shard, Trans, Handler, State).
ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
case maps:get(Track, Ts, undefined) of case maps:get(Track, Ts, undefined) of
undefined -> undefined ->
Pid = start_transition_handler(Shard, Trans, Handler, State), Pid = start_transition_handler(Shard, Trans, Handler, State),
State#{transitions := Ts#{Track => {Shard, Trans, Pid}}}; Record = #transhdl{shard = Shard, trans = Trans, pid = Pid},
State#{transitions := Ts#{Track => Record}};
_AlreadyRunning -> _AlreadyRunning ->
%% NOTE: Avoiding multiple transition handlers for the same shard for safety. %% NOTE: Avoiding multiple transition handlers for the same shard for safety.
State State
@ -263,34 +313,42 @@ ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions :=
start_transition_handler(Shard, Trans, Handler, #{db := DB}) -> start_transition_handler(Shard, Trans, Handler, #{db := DB}) ->
proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]). proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]).
handle_exit(Pid, Reason, State = #{db := DB, transitions := Ts}) -> handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
case maps:to_list(maps:filter(fun(_, {_S, _T, P}) -> P == Pid end, Ts)) of case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of
[{Track, {Shard, Trans, Pid}}] -> [{Track, #transhdl{shard = Shard, trans = Trans}}] ->
?tp( ?tp(
dsrepl_shard_transition_end, dsrepl_shard_transition_end,
#{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason}
), ),
ok = handle_transition_exit(Shard, Trans, Reason, State), State = State0#{transitions := maps:remove(Track, Ts)},
State#{transitions := maps:remove(Track, Ts)}; handle_transition_exit(Shard, Trans, Reason, State);
[] -> [] ->
logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}),
State State0
end. end.
handle_transition_exit(Shard, Trans, normal, _State = #{db := DB}) -> handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
%% NOTE: This will trigger the next transition if any. %% NOTE: This will trigger the next transition if any.
ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans); ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans),
handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, _State) -> State;
ok; handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
handle_transition_exit(Shard, Trans, Reason, _State) -> State;
handle_transition_exit(Shard, Trans, Reason, State) ->
%% NOTE
%% In case of `{add, Site}` transition failure, we have no choice but to retry:
%% no other node can perform the transition and make progress towards the desired
%% state. For simplicity, we retry any crashed transition handler after a fixed
%% delay.
logger:warning(#{ logger:warning(#{
msg => "Shard membership transition failed", msg => "Shard membership transition failed",
shard => Shard, shard => Shard,
transition => Trans, transition => Trans,
reason => Reason reason => Reason,
retry_in => ?CRASH_RETRY_DELAY
}), }),
%% FIXME: retry {Track, Handler} = transition_handler(Shard, Trans, State),
ok. RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]},
ensure_transition_handler(Track, Shard, Trans, RetryHandler, State).
%% %%
@ -352,4 +410,6 @@ erase_shard_meta(DB, Shard) ->
%% %%
delay({MinDelay, Variance}) -> delay({MinDelay, Variance}) ->
timer:sleep(MinDelay + rand:uniform(Variance)). timer:sleep(MinDelay + rand:uniform(Variance));
delay(Delay) ->
timer:sleep(Delay).