diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index f27fa414e..31ed62fcb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -79,7 +79,12 @@ n_shards/1 ]). --export_type([site/0, update_cluster_result/0]). +-export_type([ + site/0, + transition/0, + subscription_event/0, + update_cluster_result/0 +]). -include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). @@ -134,6 +139,10 @@ %% Subject of the subscription: -type subject() :: emqx_ds:db(). +%% Event for the subscription: +-type subscription_event() :: + {changed, {shard, emqx_ds:db(), emqx_ds_replication_layer:shard_id()}}. + %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 4113fcedc..363a453d6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -40,6 +40,7 @@ -define(ALLOCATE_RETRY_TIMEOUT, 1_000). -define(TRANS_RETRY_TIMEOUT, 5_000). +-define(CRASH_RETRY_DELAY, 20_000). -define(REMOVE_REPLICA_DELAY, {10_000, 5_000}). -ifdef(TEST). @@ -51,9 +52,11 @@ %% +-spec start_link(emqx_ds:db()) -> {ok, pid()}. start_link(DB) -> gen_server:start_link(?MODULE, DB, []). +-spec n_shards(emqx_ds:db()) -> non_neg_integer(). n_shards(DB) -> Meta = persistent_term:get(?db_meta(DB)), maps:get(n_shards, Meta). @@ -63,18 +66,44 @@ shard_meta(DB, Shard) -> %% +-record(transhdl, { + shard :: emqx_ds_replication_layer:shard_id(), + trans :: emqx_ds_replication_layer_meta:transition(), + pid :: pid() +}). + +-type state() :: #{ + db := emqx_ds:db(), + shards := [emqx_ds_replication_layer:shard_id()], + status := allocating | ready, + transitions := #{_Track => #transhdl{}} +}. + +-spec init(emqx_ds:db()) -> {ok, state()}. init(DB) -> _ = erlang:process_flag(trap_exit, true), _ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}), - State = #{db => DB, transitions => #{}, status => allocating}, + State = #{ + db => DB, + shards => [], + status => allocating, + transitions => #{} + }, {ok, handle_allocate_shards(State)}. +-spec handle_call(_Call, _From, state()) -> {reply, ignored, state()}. handle_call(_Call, _From, State) -> {reply, ignored, State}. +-spec handle_cast(_Cast, state()) -> {noreply, state()}. handle_cast(_Cast, State) -> {noreply, State}. +-spec handle_info(Info, state()) -> {noreply, state()} when + Info :: + emqx_ds_replication_layer_meta:subscription_event() + | {timeout, reference(), allocate} + | {'EXIT', pid(), _Reason}. handle_info({timeout, _TRef, allocate}, State) -> {noreply, handle_allocate_shards(State)}; handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) -> @@ -86,6 +115,7 @@ handle_info({'EXIT', Pid, Reason}, State) -> handle_info(_Info, State) -> {noreply, State}. +-spec terminate(_Reason, state()) -> _Ok. terminate(_Reason, State = #{db := DB, shards := Shards}) -> unsubscribe_db_changes(State), erase_db_meta(DB), @@ -121,38 +151,55 @@ unsubscribe_db_changes(_State) -> handle_shard_changed(Shard, State = #{db := DB}) -> ok = save_shard_meta(DB, Shard), - Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), - handle_shard_transitions(Shard, Transitions, State). + handle_shard_transitions(Shard, next_transitions(DB, Shard), State). -handle_shard_transitions(Shard, Transitions, State = #{db := DB}) -> +next_transitions(DB, Shard) -> + emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard). + +handle_shard_transitions(_Shard, [], State) -> + %% We reached the target allocation. + State; +handle_shard_transitions(Shard, [Trans | _Rest], State) -> + case transition_handler(Shard, Trans, State) of + {Track, Handler} -> + ensure_transition_handler(Track, Shard, Trans, Handler, State); + undefined -> + State + end. + +transition_handler(Shard, Trans, _State = #{db := DB}) -> ThisSite = emqx_ds_replication_layer_meta:this_site(), - case Transitions of - [] -> - %% We reached the target allocation. - State; - [Trans = {add, ThisSite} | _Rest] -> - ensure_transition_handler(Shard, Trans, fun trans_add_local/3, State); - [Trans = {del, ThisSite} | _Rest] -> - ensure_transition_handler(Shard, Trans, fun trans_drop_local/3, State); - [Trans = {del, Site} | _Rest] -> + case Trans of + {add, ThisSite} -> + {Shard, fun trans_add_local/3}; + {del, ThisSite} -> + {Shard, fun trans_drop_local/3}; + {del, Site} -> ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), case lists:member(Site, ReplicaSet) of true -> + %% NOTE + %% Let the replica handle its own removal first, but still set + %% up a removal handler after a delay, in case the replica is + %% unresponsive. + Handler = {fun trans_delay/5, [ + ?REMOVE_REPLICA_DELAY, + fun trans_rm_unresponsive/3 + ]}, %% NOTE %% Putting this transition handler on separate "track" so that it %% won't block any changes with higher priority (e.g. managing %% local replicas). - Handler = fun trans_rm_unresponsive/3, - ensure_transition_handler(unresp, Shard, Trans, Handler, State); + {_Track = unresp, Handler}; false -> - State + undefined end; - [_Trans | _Rest] -> + _NotOurs -> %% This site is not involved in the next queued transition. - State + undefined end. -handle_transition(DB, Shard, Trans, Fun) -> +handle_transition(DB, Shard, Trans, Handler) -> logger:set_process_metadata(#{ db => DB, shard => Shard, @@ -162,6 +209,11 @@ handle_transition(DB, Shard, Trans, Fun) -> dsrepl_shard_transition_begin, #{shard => Shard, db => DB, transition => Trans, pid => self()} ), + apply_handler(Handler, DB, Shard, Trans). + +apply_handler({Fun, Args}, DB, Shard, Trans) -> + erlang:apply(Fun, [DB, Shard, Trans | Args]); +apply_handler(Fun, DB, Shard, Trans) -> erlang:apply(Fun, [DB, Shard, Trans]). trans_add_local(DB, Shard, {add, Site}) -> @@ -217,18 +269,9 @@ do_drop_local(DB, Shard) -> do_drop_local(DB, Shard) end. -trans_rm_unresponsive(DB, Shard, Trans = {del, Site}) -> - %% NOTE - %% Let the replica handle its own removal first, thus the delay. - ok = delay(?REMOVE_REPLICA_DELAY), - Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), - case Transitions of - [Trans | _] -> - logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), - do_rm_unresponsive(DB, Shard, Site); - _Outdated -> - exit({shutdown, skipped}) - end. +trans_rm_unresponsive(DB, Shard, {del, Site}) -> + logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), + do_rm_unresponsive(DB, Shard, Site). do_rm_unresponsive(DB, Shard, Site) -> Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), @@ -245,16 +288,23 @@ do_rm_unresponsive(DB, Shard, Site) -> do_rm_unresponsive(DB, Shard, Site) end. -%% +trans_delay(DB, Shard, Trans, Delay, NextHandler) -> + ok = delay(Delay), + case next_transitions(DB, Shard) of + [Trans | _] -> + apply_handler(NextHandler, DB, Shard, Trans); + _Outdated -> + exit({shutdown, skipped}) + end. -ensure_transition_handler(Shard, Trans, Handler, State) -> - ensure_transition_handler(Shard, Shard, Trans, Handler, State). +%% ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> case maps:get(Track, Ts, undefined) of undefined -> Pid = start_transition_handler(Shard, Trans, Handler, State), - State#{transitions := Ts#{Track => {Shard, Trans, Pid}}}; + Record = #transhdl{shard = Shard, trans = Trans, pid = Pid}, + State#{transitions := Ts#{Track => Record}}; _AlreadyRunning -> %% NOTE: Avoiding multiple transition handlers for the same shard for safety. State @@ -263,34 +313,42 @@ ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := start_transition_handler(Shard, Trans, Handler, #{db := DB}) -> proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]). -handle_exit(Pid, Reason, State = #{db := DB, transitions := Ts}) -> - case maps:to_list(maps:filter(fun(_, {_S, _T, P}) -> P == Pid end, Ts)) of - [{Track, {Shard, Trans, Pid}}] -> +handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) -> + case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of + [{Track, #transhdl{shard = Shard, trans = Trans}}] -> ?tp( dsrepl_shard_transition_end, #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} ), - ok = handle_transition_exit(Shard, Trans, Reason, State), - State#{transitions := maps:remove(Track, Ts)}; + State = State0#{transitions := maps:remove(Track, Ts)}, + handle_transition_exit(Shard, Trans, Reason, State); [] -> logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), - State + State0 end. -handle_transition_exit(Shard, Trans, normal, _State = #{db := DB}) -> +handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) -> %% NOTE: This will trigger the next transition if any. - ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans); -handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, _State) -> - ok; -handle_transition_exit(Shard, Trans, Reason, _State) -> + ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans), + State; +handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) -> + State; +handle_transition_exit(Shard, Trans, Reason, State) -> + %% NOTE + %% In case of `{add, Site}` transition failure, we have no choice but to retry: + %% no other node can perform the transition and make progress towards the desired + %% state. For simplicity, we retry any crashed transition handler after a fixed + %% delay. logger:warning(#{ msg => "Shard membership transition failed", shard => Shard, transition => Trans, - reason => Reason + reason => Reason, + retry_in => ?CRASH_RETRY_DELAY }), - %% FIXME: retry - ok. + {Track, Handler} = transition_handler(Shard, Trans, State), + RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]}, + ensure_transition_handler(Track, Shard, Trans, RetryHandler, State). %% @@ -352,4 +410,6 @@ erase_shard_meta(DB, Shard) -> %% delay({MinDelay, Variance}) -> - timer:sleep(MinDelay + rand:uniform(Variance)). + timer:sleep(MinDelay + rand:uniform(Variance)); +delay(Delay) -> + timer:sleep(Delay).