From a07295d3bc3d4da81d77d5ba2f2e2c91e1cebb09 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 5 Apr 2024 17:34:30 +0200 Subject: [PATCH 01/14] fix(ds): address shards in the supervisor properly --- apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index c521164f4..195db7c34 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -74,7 +74,7 @@ start_egress({DB, Shard}) -> supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)). -spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok. -stop_shard(Shard = {DB, _}) -> +stop_shard({DB, Shard}) -> Sup = ?via(#?shards_sup{db = DB}), ok = supervisor:terminate_child(Sup, Shard), ok = supervisor:delete_child(Sup, Shard). @@ -212,7 +212,7 @@ sup_spec(Id, Options) -> shard_spec(DB, Shard) -> #{ - id => {shard, Shard}, + id => Shard, start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]}, shutdown => infinity, restart => permanent, From d6058b7f51b23a61bbb11c749898b39114f5d4e5 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 5 Apr 2024 17:36:57 +0200 Subject: [PATCH 02/14] feat(dsrepl): allow to subscribe to DB metadata changes Currently, only shard metadata changes are announced to the subscribers. --- .../src/emqx_ds_replication_layer_meta.erl | 63 ++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 66029d4ca..f27fa414e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -57,6 +57,12 @@ target_set/2 ]). +%% Subscriptions to changes: +-export([ + subscribe/2, + unsubscribe/1 +]). + %% gen_server -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -125,6 +131,9 @@ | {error, {nonexistent_sites, [site()]}} | {error, _}. +%% Subject of the subscription: +-type subject() :: emqx_ds:db(). + %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). @@ -336,11 +345,21 @@ target_set(DB, Shard) -> undefined end. +%%================================================================================ + +subscribe(Pid, Subject) -> + gen_server:call(?SERVER, {subscribe, Pid, Subject}, infinity). + +unsubscribe(Pid) -> + gen_server:call(?SERVER, {unsubscribe, Pid}, infinity). + %%================================================================================ %% behavior callbacks %%================================================================================ --record(s, {}). +-record(s, { + subs = #{} :: #{pid() => {subject(), _Monitor :: reference()}} +}). init([]) -> process_flag(trap_exit, true), @@ -348,14 +367,24 @@ init([]) -> ensure_tables(), ensure_site(), S = #s{}, + {ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}), {ok, S}. +handle_call({subscribe, Pid, Subject}, _From, S) -> + {reply, ok, handle_subscribe(Pid, Subject, S)}; +handle_call({unsubscribe, Pid}, _From, S) -> + {reply, ok, handle_unsubscribe(Pid, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. handle_cast(_Cast, S) -> {noreply, S}. +handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}}, S) -> + ok = notify_subscribers(DB, {shard, DB, Shard}, S), + {noreply, S}; +handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) -> + {noreply, handle_unsubscribe(Pid, S)}; handle_info(_Info, S) -> {noreply, S}. @@ -613,6 +642,38 @@ transaction(Fun, Args) -> {error, Reason} end. +%%==================================================================== + +handle_subscribe(Pid, Subject, S = #s{subs = Subs0}) -> + case maps:is_key(Pid, Subs0) of + false -> + MRef = erlang:monitor(process, Pid), + Subs = Subs0#{Pid => {Subject, MRef}}, + S#s{subs = Subs}; + true -> + S + end. + +handle_unsubscribe(Pid, S = #s{subs = Subs0}) -> + case maps:take(Pid, Subs0) of + {{_Subject, MRef}, Subs} -> + _ = erlang:demonitor(MRef, [flush]), + S#s{subs = Subs}; + error -> + S + end. + +notify_subscribers(EventSubject, Event, #s{subs = Subs}) -> + maps:foreach( + fun(Pid, {Subject, _MRef}) -> + Subject == EventSubject andalso + erlang:send(Pid, {changed, Event}) + end, + Subs + ). + +%%==================================================================== + %% @doc Intersperse elements of two lists. %% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5]. -spec intersperse([X], [Y]) -> [X | Y]. From 556ffc78c9eac7ed42c1400eee2d28fc0c87417f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 5 Apr 2024 17:39:17 +0200 Subject: [PATCH 03/14] feat(dsrepl): implement membership changes and rebalancing --- .../src/emqx_ds_replication_layer_shard.erl | 140 +++++++++- .../emqx_ds_replication_shard_allocator.erl | 259 ++++++++++++++++-- .../test/emqx_ds_replication_SUITE.erl | 244 ++++++++++++++--- 3 files changed, 561 insertions(+), 82 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 45739fbe3..a57e45dfd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -21,6 +21,7 @@ %% Static server configuration -export([ shard_servers/2, + shard_server/3, local_server/2 ]). @@ -30,6 +31,14 @@ server/3 ]). +%% Membership +-export([ + add_local_server/2, + drop_local_server/2, + remove_server/3, + server_info/2 +]). + -behaviour(gen_server). -export([ init/1, @@ -38,21 +47,31 @@ terminate/2 ]). +-type server() :: ra:server_id(). + +-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). + %% start_link(DB, Shard, Opts) -> gen_server:start_link(?MODULE, {DB, Shard, Opts}, []). +-spec shard_servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [server()]. shard_servers(DB, Shard) -> ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), - [ - {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)} - || Site <- ReplicaSet - ]. + [shard_server(DB, Shard, Site) || Site <- ReplicaSet]. +-spec shard_server( + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_replication_layer_meta:site() +) -> server(). +shard_server(DB, Shard, Site) -> + {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}. + +-spec local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> server(). local_server(DB, Shard) -> - Site = emqx_ds_replication_layer_meta:this_site(), - {server_name(DB, Shard, Site), node()}. + {server_name(DB, Shard, local_site()), node()}. cluster_name(DB, Shard) -> iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])). @@ -61,6 +80,14 @@ server_name(DB, Shard, Site) -> DBBin = atom_to_binary(DB), binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>). +server_uid(_DB, Shard) -> + %% NOTE + %% Each new "instance" of a server should have a unique identifier. Otherwise, + %% if some server migrates to another node during rebalancing, and then comes + %% back, `ra` will be very confused by it having the same UID as before. + Ts = integer_to_binary(erlang:system_time(microsecond)), + <>. + %% servers(DB, Shard, _Order = leader_preferred) -> @@ -118,11 +145,100 @@ get_local_server(DB, Shard) -> get_shard_servers(DB, Shard) -> maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)). +local_site() -> + emqx_ds_replication_layer_meta:this_site(). + +%% + +add_local_server(DB, Shard) -> + %% NOTE + %% Adding local server as "promotable" member to the cluster, which means + %% that it will affect quorum until it is promoted to a voter, which in + %% turn happens when the server has caught up sufficiently with the log. + %% We also rely on this "membership" to understand when the server's + %% readiness. + ShardServers = shard_servers(DB, Shard), + LocalServer = local_server(DB, Shard), + ServerRecord = #{ + id => LocalServer, + membership => promotable, + uid => server_uid(DB, Shard) + }, + case ra:add_member(ShardServers, ServerRecord, ?MEMBERSHIP_CHANGE_TIMEOUT) of + {ok, _, _Leader} -> + ok; + {error, already_member} -> + ok; + {error, Reason} -> + {error, recoverable, Reason} + end. + +drop_local_server(DB, Shard) -> + LocalServer = local_server(DB, Shard), + case remove_server(DB, Shard, LocalServer) of + ok -> + ra:force_delete_server(DB, LocalServer); + {error, _, _Reason} = Error -> + Error + end. + +remove_server(DB, Shard, Server) -> + ShardServers = shard_servers(DB, Shard), + case ra:remove_member(ShardServers, Server, ?MEMBERSHIP_CHANGE_TIMEOUT) of + {ok, _, _Leader} -> + ok; + {error, not_member} -> + ok; + {error, Reason} -> + {error, recoverable, Reason} + end. + +server_info(readiness, Server) -> + %% NOTE + %% Server is ready if it's either the leader or a follower with voter "membership" + %% status (meaning it was promoted after catching up with the log). + case current_leader(Server) of + Server -> + ready; + Leader when Leader /= unknown -> + member_info(readiness, Server, Leader); + unknown -> + unknown + end; +server_info(leader, Server) -> + current_leader(Server). + +member_info(readiness, Server, Leader) -> + case ra:member_overview(Leader) of + {ok, #{cluster := Cluster}, _} -> + member_readiness(maps:get(Server, Cluster)); + _Error -> + unknown + end. + +current_leader(Server) -> + case ra:members(Server) of + {ok, _Servers, Leader} -> + Leader; + _Error -> + unknown + end. + +member_readiness(#{status := Status, voter_status := #{membership := Membership}}) -> + case Status of + normal when Membership =:= voter -> + ready; + _Other -> + {unready, Status, Membership} + end; +member_readiness(#{}) -> + unknown. + %% init({DB, Shard, Opts}) -> _ = process_flag(trap_exit, true), - _Meta = start_shard(DB, Shard, Opts), + ok = start_shard(DB, Shard, Opts), {ok, {DB, Shard}}. handle_call(_Call, _From, State) -> @@ -138,7 +254,6 @@ terminate(_Reason, {DB, Shard}) -> %% start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> - Site = emqx_ds_replication_layer_meta:this_site(), ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), @@ -157,7 +272,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> ), ok = ra:start_server(DB, #{ id => LocalServer, - uid => <>, + uid => server_uid(DB, Shard), cluster_name => ClusterName, initial_members => Servers, machine => Machine, @@ -188,12 +303,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> end; _ -> ok - end, - #{ - cluster_name => ClusterName, - servers => Servers, - local_server => LocalServer - }. + end. %% diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 7393da692..4113fcedc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -16,6 +16,8 @@ -module(emqx_ds_replication_shard_allocator). +-include_lib("snabbkaffe/include/trace.hrl"). + -export([start_link/1]). -export([n_shards/1]). @@ -30,9 +32,23 @@ terminate/2 ]). +-export([handle_transition/4]). + -define(db_meta(DB), {?MODULE, DB}). -define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}). +-define(ALLOCATE_RETRY_TIMEOUT, 1_000). + +-define(TRANS_RETRY_TIMEOUT, 5_000). +-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}). + +-ifdef(TEST). +-undef(TRANS_RETRY_TIMEOUT). +-undef(REMOVE_REPLICA_DELAY). +-define(TRANS_RETRY_TIMEOUT, 1_000). +-define(REMOVE_REPLICA_DELAY, {4_000, 2_000}). +-endif. + %% start_link(DB) -> @@ -47,13 +63,11 @@ shard_meta(DB, Shard) -> %% --define(ALLOCATE_RETRY_TIMEOUT, 1_000). - init(DB) -> _ = erlang:process_flag(trap_exit, true), - _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), - State = #{db => DB, status => allocating}, - handle_allocate_shards(State, ok). + _ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}), + State = #{db => DB, transitions => #{}, status => allocating}, + {ok, handle_allocate_shards(State)}. handle_call(_Call, _From, State) -> {reply, ignored, State}. @@ -61,12 +75,19 @@ handle_call(_Call, _From, State) -> handle_cast(_Cast, State) -> {noreply, State}. -handle_info(timeout, State) -> - handle_allocate_shards(State, noreply); +handle_info({timeout, _TRef, allocate}, State) -> + {noreply, handle_allocate_shards(State)}; +handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) -> + {noreply, handle_shard_changed(Shard, State)}; +handle_info({changed, _}, State) -> + {noreply, State}; +handle_info({'EXIT', Pid, Reason}, State) -> + {noreply, handle_exit(Pid, Reason, State)}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #{db := DB, shards := Shards}) -> +terminate(_Reason, State = #{db := DB, shards := Shards}) -> + unsubscribe_db_changes(State), erase_db_meta(DB), erase_shards_meta(DB, Shards); terminate(_Reason, #{}) -> @@ -74,10 +95,11 @@ terminate(_Reason, #{}) -> %% -handle_allocate_shards(State, Ret) -> +handle_allocate_shards(State) -> case allocate_shards(State) of {ok, NState} -> - {Ret, NState}; + ok = subscribe_db_changes(State), + NState; {error, Data} -> _ = logger:notice( Data#{ @@ -85,15 +107,197 @@ handle_allocate_shards(State, Ret) -> retry_in => ?ALLOCATE_RETRY_TIMEOUT } ), - {Ret, State, ?ALLOCATE_RETRY_TIMEOUT} + _TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate), + State end. +subscribe_db_changes(#{db := DB}) -> + emqx_ds_replication_layer_meta:subscribe(self(), DB). + +unsubscribe_db_changes(_State) -> + emqx_ds_replication_layer_meta:unsubscribe(self()). + +%% + +handle_shard_changed(Shard, State = #{db := DB}) -> + ok = save_shard_meta(DB, Shard), + Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), + handle_shard_transitions(Shard, Transitions, State). + +handle_shard_transitions(Shard, Transitions, State = #{db := DB}) -> + ThisSite = emqx_ds_replication_layer_meta:this_site(), + case Transitions of + [] -> + %% We reached the target allocation. + State; + [Trans = {add, ThisSite} | _Rest] -> + ensure_transition_handler(Shard, Trans, fun trans_add_local/3, State); + [Trans = {del, ThisSite} | _Rest] -> + ensure_transition_handler(Shard, Trans, fun trans_drop_local/3, State); + [Trans = {del, Site} | _Rest] -> + ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + case lists:member(Site, ReplicaSet) of + true -> + %% NOTE + %% Putting this transition handler on separate "track" so that it + %% won't block any changes with higher priority (e.g. managing + %% local replicas). + Handler = fun trans_rm_unresponsive/3, + ensure_transition_handler(unresp, Shard, Trans, Handler, State); + false -> + State + end; + [_Trans | _Rest] -> + %% This site is not involved in the next queued transition. + State + end. + +handle_transition(DB, Shard, Trans, Fun) -> + logger:set_process_metadata(#{ + db => DB, + shard => Shard, + domain => [emqx, ds, DB, shard_transition] + }), + ?tp( + dsrepl_shard_transition_begin, + #{shard => Shard, db => DB, transition => Trans, pid => self()} + ), + erlang:apply(Fun, [DB, Shard, Trans]). + +trans_add_local(DB, Shard, {add, Site}) -> + logger:info(#{msg => "Adding new local shard replica", site => Site}), + do_add_local(membership, DB, Shard). + +do_add_local(membership = Stage, DB, Shard) -> + ok = start_shard(DB, Shard), + case emqx_ds_replication_layer_shard:add_local_server(DB, Shard) of + ok -> + do_add_local(readiness, DB, Shard); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_add_local(Stage, DB, Shard) + end; +do_add_local(readiness = Stage, DB, Shard) -> + LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard), + case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of + ready -> + logger:info(#{msg => "Local shard replica ready"}); + Status -> + logger:warning(#{ + msg => "Still waiting for local shard replica to be ready", + status => Status, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_add_local(Stage, DB, Shard) + end. + +trans_drop_local(DB, Shard, {del, Site}) -> + logger:info(#{msg => "Dropping local shard replica", site => Site}), + do_drop_local(DB, Shard). + +do_drop_local(DB, Shard) -> + case emqx_ds_replication_layer_shard:drop_local_server(DB, Shard) of + ok -> + ok = emqx_ds_builtin_db_sup:stop_shard({DB, Shard}), + ok = emqx_ds_storage_layer:drop_shard({DB, Shard}), + logger:info(#{msg => "Local shard replica dropped"}); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_drop_local(DB, Shard) + end. + +trans_rm_unresponsive(DB, Shard, Trans = {del, Site}) -> + %% NOTE + %% Let the replica handle its own removal first, thus the delay. + ok = delay(?REMOVE_REPLICA_DELAY), + Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), + case Transitions of + [Trans | _] -> + logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), + do_rm_unresponsive(DB, Shard, Site); + _Outdated -> + exit({shutdown, skipped}) + end. + +do_rm_unresponsive(DB, Shard, Site) -> + Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), + case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of + ok -> + logger:info(#{msg => "Unresponsive shard replica removed"}); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_rm_unresponsive(DB, Shard, Site) + end. + +%% + +ensure_transition_handler(Shard, Trans, Handler, State) -> + ensure_transition_handler(Shard, Shard, Trans, Handler, State). + +ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> + case maps:get(Track, Ts, undefined) of + undefined -> + Pid = start_transition_handler(Shard, Trans, Handler, State), + State#{transitions := Ts#{Track => {Shard, Trans, Pid}}}; + _AlreadyRunning -> + %% NOTE: Avoiding multiple transition handlers for the same shard for safety. + State + end. + +start_transition_handler(Shard, Trans, Handler, #{db := DB}) -> + proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]). + +handle_exit(Pid, Reason, State = #{db := DB, transitions := Ts}) -> + case maps:to_list(maps:filter(fun(_, {_S, _T, P}) -> P == Pid end, Ts)) of + [{Track, {Shard, Trans, Pid}}] -> + ?tp( + dsrepl_shard_transition_end, + #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} + ), + ok = handle_transition_exit(Shard, Trans, Reason, State), + State#{transitions := maps:remove(Track, Ts)}; + [] -> + logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), + State + end. + +handle_transition_exit(Shard, Trans, normal, _State = #{db := DB}) -> + %% NOTE: This will trigger the next transition if any. + ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans); +handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, _State) -> + ok; +handle_transition_exit(Shard, Trans, Reason, _State) -> + logger:warning(#{ + msg => "Shard membership transition failed", + shard => Shard, + transition => Trans, + reason => Reason + }), + %% FIXME: retry + ok. + %% allocate_shards(State = #{db := DB}) -> case emqx_ds_replication_layer_meta:allocate_shards(DB) of {ok, Shards} -> - logger:notice(#{msg => "Shards allocated", shards => Shards}), + logger:info(#{msg => "Shards allocated", shards => Shards}), ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), ok = start_egresses(DB, Shards), ok = save_db_meta(DB, Shards), @@ -104,25 +308,23 @@ allocate_shards(State = #{db := DB}) -> end. start_shards(DB, Shards) -> - ok = lists:foreach( - fun(Shard) -> - ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}) - end, - Shards - ), - ok = logger:info(#{msg => "Shards started", shards => Shards}), + lists:foreach(fun(Shard) -> start_shard(DB, Shard) end, Shards). + +start_shard(DB, Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}), + ok = logger:info(#{msg => "Shard started", shard => Shard}), ok. start_egresses(DB, Shards) -> - ok = lists:foreach( - fun(Shard) -> - ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}) - end, - Shards - ), - logger:info(#{msg => "Egresses started", shards => Shards}), + lists:foreach(fun(Shard) -> start_egress(DB, Shard) end, Shards). + +start_egress(DB, Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}), + ok = logger:info(#{msg => "Egress started", shard => Shard}), ok. +%% + save_db_meta(DB, Shards) -> persistent_term:put(?db_meta(DB), #{ shards => Shards, @@ -146,3 +348,8 @@ erase_shards_meta(DB, Shards) -> erase_shard_meta(DB, Shard) -> persistent_term:erase(?shard_meta(DB, Shard)). + +%% + +delay({MinDelay, Variance}) -> + timer:sleep(MinDelay + rand:uniform(Variance)). diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 24e7cdafb..872169765 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -26,18 +26,45 @@ -define(DB, testdb). opts() -> - #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 1, - n_sites => 3, - replication_factor => 3, - replication_options => #{ - wal_max_size_bytes => 128 * 1024, - wal_max_batch_size => 1024, - snapshot_interval => 128 - } - }. + opts(#{}). + +opts(Overrides) -> + maps:merge( + #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 16, + n_sites => 1, + replication_factor => 3, + replication_options => #{ + wal_max_size_bytes => 64 * 1024, + wal_max_batch_size => 1024, + snapshot_interval => 128 + } + }, + Overrides + ). + +appspec(emqx_durable_storage) -> + {emqx_durable_storage, #{ + before_start => fun snabbkaffe:fix_ct_logging/0, + override_env => [{egress_flush_interval, 1}] + }}. + +t_replication_transfers_snapshots(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + NodeSpecs = emqx_cth_cluster:mk_nodespecs( + [ + {t_replication_transfers_snapshots1, #{apps => Apps}}, + {t_replication_transfers_snapshots2, #{apps => Apps}}, + {t_replication_transfers_snapshots3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(NodeSpecs), + [{nodes, Nodes}, {specs, NodeSpecs} | Config]; +t_replication_transfers_snapshots('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_replication_transfers_snapshots(Config) -> NMsgs = 4000, @@ -45,9 +72,10 @@ t_replication_transfers_snapshots(Config) -> _Specs = [_, SpecOffline | _] = ?config(specs, Config), %% Initialize DB on all nodes and wait for it to be online. + Opts = opts(#{n_shards => 1, n_sites => 3}), ?assertEqual( [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()]) + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) ), ?retry( 500, @@ -88,7 +116,7 @@ t_replication_transfers_snapshots(Config) -> Shard = hd(shards(NodeOffline, ?DB)), MessagesOffline = lists:keysort( #message.timestamp, - consume(NodeOffline, ?DB, Shard, ['#'], 0) + consume_shard(NodeOffline, ?DB, Shard, ['#'], 0) ), ?assertEqual( sample(40, Messages), @@ -99,26 +127,169 @@ t_replication_transfers_snapshots(Config) -> MessagesOffline ). +t_replication_rebalance(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_replication_rebalance1, #{apps => Apps}}, + {t_replication_rebalance2, #{apps => Apps}}, + {t_replication_rebalance3, #{apps => Apps}}, + {t_replication_rebalance4, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_replication_rebalance('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_replication_rebalance(Config) -> + NMsgs = 800, + NClients = 5, + Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), + + %% Initialize DB on the first node. + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), + ?assertMatch( + Shards when length(Shards) == 16, + shards_online(N1, ?DB) + ), + + %% Open DB on the rest of the nodes. + ?assertEqual( + [{ok, ok} || _ <- [N2, N3, N4]], + erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts]) + ), + + Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Only N1 should be responsible for all shards initially. + ?assertEqual( + [[S1] || _ <- Nodes], + [ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes] + ), + + %% Fill the storage with messages and few additional generations. + %% This will force shards to trigger snapshot transfers during rebalance. + ClientMessages = emqx_utils:pmap( + fun(CID) -> + N = lists:nth(1 + (CID rem length(Nodes)), Nodes), + fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)}) + end, + lists:seq(1, NClients), + infinity + ), + Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)), + + %% Join the second site to the DB replication sites. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), + + %% Fill in some more messages *during* the rebalance. + MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}), + + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Now join the rest of the sites. + ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), + + %% Fill in some more messages *during* the rebalance. + MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that each node is now responsible for 3/4 of the shards. + ?assertEqual( + [(16 * 3) div length(Nodes) || _ <- Nodes], + [n_shards_online(N, ?DB) || N <- Nodes] + ), + + %% Verify that the set of shard servers matches the target allocation. + Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], + ShardServers = [ + shard_server_info(N, ?DB, Shard, Site, readiness) + || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), + Shard <- Shards + ], + ?assert( + lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), + ShardServers + ), + + %% Verify that the messages are preserved after the rebalance. + Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2, + MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesN4)), + ?assertEqual(Messages, MessagesN4), + + %% Scale down the cluster by removing the first node. + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), + ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that each node is now responsible for each shard. + ?assertEqual( + [0, 16, 16, 16], + [n_shards_online(N, ?DB) || N <- Nodes] + ), + + %% Verify that the messages are once again preserved after the rebalance. + MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), + ?assertEqual(Messages, MessagesN3). + +%% + +shard_server_info(Node, DB, Shard, Site, Info) -> + Server = shard_server(Node, DB, Shard, Site), + {Server, ds_repl_shard(Node, server_info, [Info, Server])}. + +shard_server(Node, DB, Shard, Site) -> + ds_repl_shard(Node, shard_server, [DB, Shard, Site]). + +ds_repl_meta(Node, Fun) -> + ds_repl_meta(Node, Fun, []). + +ds_repl_meta(Node, Fun, Args) -> + erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args). + +ds_repl_shard(Node, Fun, Args) -> + erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). + +transitions(Node, DB) -> + Shards = shards(Node, DB), + [{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])]. + shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). shards_online(Node, DB) -> erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]). +n_shards_online(Node, DB) -> + length(shards_online(Node, DB)). + fill_storage(Node, DB, NMsgs, Opts) -> fill_storage(Node, DB, NMsgs, 0, Opts). -fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs -> - R1 = push_message(Node, DB, I), +fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> + PAddGen = maps:get(p_addgen, Opts, 0.001), + R1 = push_message(Node, DB, I, Opts), R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end), R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> []. -push_message(Node, DB, I) -> +push_message(Node, DB, I, Opts) -> Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]), {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)), - Message = message(Topic, Bytes, I * 100), + ClientId = maps:get(client_id, Opts, <>), + Message = message(ClientId, Topic, Bytes, I * 100), ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]), [Message]. @@ -126,16 +297,22 @@ add_generation(Node, DB) -> ok = erpc:call(Node, emqx_ds, add_generation, [DB]), []. -message(Topic, Payload, PublishedAt) -> +message(ClientId, Topic, Payload, PublishedAt) -> #message{ - from = <>, + from = ClientId, topic = Topic, payload = Payload, timestamp = PublishedAt, id = emqx_guid:gen() }. -consume(Node, DB, Shard, TopicFilter, StartTime) -> +compare_message(M1, M2) -> + {M1#message.from, M1#message.timestamp} < {M2#message.from, M2#message.timestamp}. + +consume(Node, DB, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_test_helpers, consume, [DB, TopicFilter, StartTime]). + +consume_shard(Node, DB, Shard, TopicFilter, StartTime) -> erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]). probably(P, Fun) -> @@ -156,26 +333,11 @@ suite() -> [{timetrap, {seconds, 60}}]. all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(TCName, Config) -> - Apps = [ - {emqx_durable_storage, #{ - before_start => fun snabbkaffe:fix_ct_logging/0, - override_env => [{egress_flush_interval, 1}] - }} - ], - WorkDir = emqx_cth_suite:work_dir(TCName, Config), - NodeSpecs = emqx_cth_cluster:mk_nodespecs( - [ - {emqx_ds_replication_SUITE1, #{apps => Apps}}, - {emqx_ds_replication_SUITE2, #{apps => Apps}}, - {emqx_ds_replication_SUITE3, #{apps => Apps}} - ], - #{work_dir => WorkDir} - ), - Nodes = emqx_cth_cluster:start(NodeSpecs), +init_per_testcase(TCName, Config0) -> + Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0), ok = snabbkaffe:start_trace(), - [{nodes, Nodes}, {specs, NodeSpecs} | Config]. + Config. -end_per_testcase(_TCName, Config) -> +end_per_testcase(TCName, Config) -> ok = snabbkaffe:stop(), - ok = emqx_cth_cluster:stop(?config(nodes, Config)). + emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config). From 826ce5806ddfdd2cd27740477370e1bf7cb75092 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Sun, 7 Apr 2024 22:31:24 +0200 Subject: [PATCH 04/14] fix(dsrepl): ensure that new member UID matches server's UID Before that change, UIDs supplied in the `ra:add_member/3` were not the same as those servers were using. This haven't caused any issues for some reason, but it's better to ensure that UIDs are the same. --- .../src/emqx_ds_replication_layer_shard.erl | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index a57e45dfd..5dbeafdb2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -80,14 +80,6 @@ server_name(DB, Shard, Site) -> DBBin = atom_to_binary(DB), binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>). -server_uid(_DB, Shard) -> - %% NOTE - %% Each new "instance" of a server should have a unique identifier. Otherwise, - %% if some server migrates to another node during rebalancing, and then comes - %% back, `ra` will be very confused by it having the same UID as before. - Ts = integer_to_binary(erlang:system_time(microsecond)), - <>. - %% servers(DB, Shard, _Order = leader_preferred) -> @@ -159,11 +151,19 @@ add_local_server(DB, Shard) -> %% readiness. ShardServers = shard_servers(DB, Shard), LocalServer = local_server(DB, Shard), - ServerRecord = #{ - id => LocalServer, - membership => promotable, - uid => server_uid(DB, Shard) - }, + case server_info(uid, LocalServer) of + UID when is_binary(UID) -> + ServerRecord = #{ + id => LocalServer, + membership => promotable, + uid => UID + }; + unknown -> + ServerRecord = #{ + id => LocalServer, + membership => voter + } + end, case ra:add_member(ShardServers, ServerRecord, ?MEMBERSHIP_CHANGE_TIMEOUT) of {ok, _, _Leader} -> ok; @@ -206,15 +206,13 @@ server_info(readiness, Server) -> unknown end; server_info(leader, Server) -> - current_leader(Server). + current_leader(Server); +server_info(uid, Server) -> + maps:get(uid, ra_overview(Server), unknown). member_info(readiness, Server, Leader) -> - case ra:member_overview(Leader) of - {ok, #{cluster := Cluster}, _} -> - member_readiness(maps:get(Server, Cluster)); - _Error -> - unknown - end. + Cluster = maps:get(cluster, ra_overview(Leader), #{}), + member_readiness(maps:get(Server, Cluster, #{})). current_leader(Server) -> case ra:members(Server) of @@ -234,6 +232,14 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership} member_readiness(#{}) -> unknown. +ra_overview(Server) -> + case ra:member_overview(Server) of + {ok, Overview, _Leader} -> + Overview; + _Error -> + #{} + end. + %% init({DB, Shard, Opts}) -> @@ -305,6 +311,16 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> ok end. +server_uid(_DB, Shard) -> + %% NOTE + %% Each new "instance" of a server should have a unique identifier. Otherwise, + %% if some server migrates to another node during rebalancing, and then comes + %% back, `ra` will be very confused by it having the same UID as before. + %% Keeping the shard ID as a prefix to make it easier to identify the server + %% in the filesystem / logs / etc. + Ts = integer_to_binary(erlang:system_time(microsecond)), + <>. + %% memoize(Fun, Args) -> From 6293efb995675d0470d4178b9916e26149d2ad70 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Sun, 7 Apr 2024 22:35:14 +0200 Subject: [PATCH 05/14] fix(dsrepl): retry crashed membership transitions --- .../src/emqx_ds_replication_layer_meta.erl | 11 +- .../emqx_ds_replication_shard_allocator.erl | 160 ++++++++++++------ 2 files changed, 120 insertions(+), 51 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index f27fa414e..31ed62fcb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -79,7 +79,12 @@ n_shards/1 ]). --export_type([site/0, update_cluster_result/0]). +-export_type([ + site/0, + transition/0, + subscription_event/0, + update_cluster_result/0 +]). -include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). @@ -134,6 +139,10 @@ %% Subject of the subscription: -type subject() :: emqx_ds:db(). +%% Event for the subscription: +-type subscription_event() :: + {changed, {shard, emqx_ds:db(), emqx_ds_replication_layer:shard_id()}}. + %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 4113fcedc..363a453d6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -40,6 +40,7 @@ -define(ALLOCATE_RETRY_TIMEOUT, 1_000). -define(TRANS_RETRY_TIMEOUT, 5_000). +-define(CRASH_RETRY_DELAY, 20_000). -define(REMOVE_REPLICA_DELAY, {10_000, 5_000}). -ifdef(TEST). @@ -51,9 +52,11 @@ %% +-spec start_link(emqx_ds:db()) -> {ok, pid()}. start_link(DB) -> gen_server:start_link(?MODULE, DB, []). +-spec n_shards(emqx_ds:db()) -> non_neg_integer(). n_shards(DB) -> Meta = persistent_term:get(?db_meta(DB)), maps:get(n_shards, Meta). @@ -63,18 +66,44 @@ shard_meta(DB, Shard) -> %% +-record(transhdl, { + shard :: emqx_ds_replication_layer:shard_id(), + trans :: emqx_ds_replication_layer_meta:transition(), + pid :: pid() +}). + +-type state() :: #{ + db := emqx_ds:db(), + shards := [emqx_ds_replication_layer:shard_id()], + status := allocating | ready, + transitions := #{_Track => #transhdl{}} +}. + +-spec init(emqx_ds:db()) -> {ok, state()}. init(DB) -> _ = erlang:process_flag(trap_exit, true), _ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}), - State = #{db => DB, transitions => #{}, status => allocating}, + State = #{ + db => DB, + shards => [], + status => allocating, + transitions => #{} + }, {ok, handle_allocate_shards(State)}. +-spec handle_call(_Call, _From, state()) -> {reply, ignored, state()}. handle_call(_Call, _From, State) -> {reply, ignored, State}. +-spec handle_cast(_Cast, state()) -> {noreply, state()}. handle_cast(_Cast, State) -> {noreply, State}. +-spec handle_info(Info, state()) -> {noreply, state()} when + Info :: + emqx_ds_replication_layer_meta:subscription_event() + | {timeout, reference(), allocate} + | {'EXIT', pid(), _Reason}. handle_info({timeout, _TRef, allocate}, State) -> {noreply, handle_allocate_shards(State)}; handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) -> @@ -86,6 +115,7 @@ handle_info({'EXIT', Pid, Reason}, State) -> handle_info(_Info, State) -> {noreply, State}. +-spec terminate(_Reason, state()) -> _Ok. terminate(_Reason, State = #{db := DB, shards := Shards}) -> unsubscribe_db_changes(State), erase_db_meta(DB), @@ -121,38 +151,55 @@ unsubscribe_db_changes(_State) -> handle_shard_changed(Shard, State = #{db := DB}) -> ok = save_shard_meta(DB, Shard), - Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), - handle_shard_transitions(Shard, Transitions, State). + handle_shard_transitions(Shard, next_transitions(DB, Shard), State). -handle_shard_transitions(Shard, Transitions, State = #{db := DB}) -> +next_transitions(DB, Shard) -> + emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard). + +handle_shard_transitions(_Shard, [], State) -> + %% We reached the target allocation. + State; +handle_shard_transitions(Shard, [Trans | _Rest], State) -> + case transition_handler(Shard, Trans, State) of + {Track, Handler} -> + ensure_transition_handler(Track, Shard, Trans, Handler, State); + undefined -> + State + end. + +transition_handler(Shard, Trans, _State = #{db := DB}) -> ThisSite = emqx_ds_replication_layer_meta:this_site(), - case Transitions of - [] -> - %% We reached the target allocation. - State; - [Trans = {add, ThisSite} | _Rest] -> - ensure_transition_handler(Shard, Trans, fun trans_add_local/3, State); - [Trans = {del, ThisSite} | _Rest] -> - ensure_transition_handler(Shard, Trans, fun trans_drop_local/3, State); - [Trans = {del, Site} | _Rest] -> + case Trans of + {add, ThisSite} -> + {Shard, fun trans_add_local/3}; + {del, ThisSite} -> + {Shard, fun trans_drop_local/3}; + {del, Site} -> ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), case lists:member(Site, ReplicaSet) of true -> + %% NOTE + %% Let the replica handle its own removal first, but still set + %% up a removal handler after a delay, in case the replica is + %% unresponsive. + Handler = {fun trans_delay/5, [ + ?REMOVE_REPLICA_DELAY, + fun trans_rm_unresponsive/3 + ]}, %% NOTE %% Putting this transition handler on separate "track" so that it %% won't block any changes with higher priority (e.g. managing %% local replicas). - Handler = fun trans_rm_unresponsive/3, - ensure_transition_handler(unresp, Shard, Trans, Handler, State); + {_Track = unresp, Handler}; false -> - State + undefined end; - [_Trans | _Rest] -> + _NotOurs -> %% This site is not involved in the next queued transition. - State + undefined end. -handle_transition(DB, Shard, Trans, Fun) -> +handle_transition(DB, Shard, Trans, Handler) -> logger:set_process_metadata(#{ db => DB, shard => Shard, @@ -162,6 +209,11 @@ handle_transition(DB, Shard, Trans, Fun) -> dsrepl_shard_transition_begin, #{shard => Shard, db => DB, transition => Trans, pid => self()} ), + apply_handler(Handler, DB, Shard, Trans). + +apply_handler({Fun, Args}, DB, Shard, Trans) -> + erlang:apply(Fun, [DB, Shard, Trans | Args]); +apply_handler(Fun, DB, Shard, Trans) -> erlang:apply(Fun, [DB, Shard, Trans]). trans_add_local(DB, Shard, {add, Site}) -> @@ -217,18 +269,9 @@ do_drop_local(DB, Shard) -> do_drop_local(DB, Shard) end. -trans_rm_unresponsive(DB, Shard, Trans = {del, Site}) -> - %% NOTE - %% Let the replica handle its own removal first, thus the delay. - ok = delay(?REMOVE_REPLICA_DELAY), - Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), - case Transitions of - [Trans | _] -> - logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), - do_rm_unresponsive(DB, Shard, Site); - _Outdated -> - exit({shutdown, skipped}) - end. +trans_rm_unresponsive(DB, Shard, {del, Site}) -> + logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), + do_rm_unresponsive(DB, Shard, Site). do_rm_unresponsive(DB, Shard, Site) -> Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), @@ -245,16 +288,23 @@ do_rm_unresponsive(DB, Shard, Site) -> do_rm_unresponsive(DB, Shard, Site) end. -%% +trans_delay(DB, Shard, Trans, Delay, NextHandler) -> + ok = delay(Delay), + case next_transitions(DB, Shard) of + [Trans | _] -> + apply_handler(NextHandler, DB, Shard, Trans); + _Outdated -> + exit({shutdown, skipped}) + end. -ensure_transition_handler(Shard, Trans, Handler, State) -> - ensure_transition_handler(Shard, Shard, Trans, Handler, State). +%% ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> case maps:get(Track, Ts, undefined) of undefined -> Pid = start_transition_handler(Shard, Trans, Handler, State), - State#{transitions := Ts#{Track => {Shard, Trans, Pid}}}; + Record = #transhdl{shard = Shard, trans = Trans, pid = Pid}, + State#{transitions := Ts#{Track => Record}}; _AlreadyRunning -> %% NOTE: Avoiding multiple transition handlers for the same shard for safety. State @@ -263,34 +313,42 @@ ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := start_transition_handler(Shard, Trans, Handler, #{db := DB}) -> proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]). -handle_exit(Pid, Reason, State = #{db := DB, transitions := Ts}) -> - case maps:to_list(maps:filter(fun(_, {_S, _T, P}) -> P == Pid end, Ts)) of - [{Track, {Shard, Trans, Pid}}] -> +handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) -> + case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of + [{Track, #transhdl{shard = Shard, trans = Trans}}] -> ?tp( dsrepl_shard_transition_end, #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} ), - ok = handle_transition_exit(Shard, Trans, Reason, State), - State#{transitions := maps:remove(Track, Ts)}; + State = State0#{transitions := maps:remove(Track, Ts)}, + handle_transition_exit(Shard, Trans, Reason, State); [] -> logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), - State + State0 end. -handle_transition_exit(Shard, Trans, normal, _State = #{db := DB}) -> +handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) -> %% NOTE: This will trigger the next transition if any. - ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans); -handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, _State) -> - ok; -handle_transition_exit(Shard, Trans, Reason, _State) -> + ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans), + State; +handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) -> + State; +handle_transition_exit(Shard, Trans, Reason, State) -> + %% NOTE + %% In case of `{add, Site}` transition failure, we have no choice but to retry: + %% no other node can perform the transition and make progress towards the desired + %% state. For simplicity, we retry any crashed transition handler after a fixed + %% delay. logger:warning(#{ msg => "Shard membership transition failed", shard => Shard, transition => Trans, - reason => Reason + reason => Reason, + retry_in => ?CRASH_RETRY_DELAY }), - %% FIXME: retry - ok. + {Track, Handler} = transition_handler(Shard, Trans, State), + RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]}, + ensure_transition_handler(Track, Shard, Trans, RetryHandler, State). %% @@ -352,4 +410,6 @@ erase_shard_meta(DB, Shard) -> %% delay({MinDelay, Variance}) -> - timer:sleep(MinDelay + rand:uniform(Variance)). + timer:sleep(MinDelay + rand:uniform(Variance)); +delay(Delay) -> + timer:sleep(Delay). From ecaad348a754f37c25b92580488448d15fd999aa Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Sun, 7 Apr 2024 22:41:44 +0200 Subject: [PATCH 06/14] chore(dsrepl): update few outdated comments / TODOs --- .../src/emqx_ds_replication_layer_shard.erl | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 5dbeafdb2..c2828f31f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -98,13 +98,11 @@ get_servers_leader_preferred(DB, Shard) -> Servers = ra_leaderboard:lookup_members(ClusterName), [Leader | lists:delete(Leader, Servers)]; undefined -> - %% TODO: Dynamic membership. get_shard_servers(DB, Shard) end. get_server_local_preferred(DB, Shard) -> - %% NOTE: Contact random replica that is not a known leader. - %% TODO: Replica may be down, so we may need to retry. + %% NOTE: Contact either local server or a random replica. ClusterName = get_cluster_name(DB, Shard), case ra_leaderboard:lookup_members(ClusterName) of Servers when is_list(Servers) -> @@ -113,15 +111,14 @@ get_server_local_preferred(DB, Shard) -> %% TODO %% Leader is unkonwn if there are no servers of this group on the %% local node. We want to pick a replica in that case as well. - %% TODO: Dynamic membership. pick_random(get_shard_servers(DB, Shard)) end. pick_local(Servers) -> - case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of - [Local | _] -> + case lists:keyfind(node(), 2, Servers) of + Local when is_tuple(Local) -> Local; - [] -> + false -> pick_random(Servers) end. @@ -215,6 +212,7 @@ member_info(readiness, Server, Leader) -> member_readiness(maps:get(Server, Cluster, #{})). current_leader(Server) -> + %% NOTE: This call will block until the leader is known, or until the timeout. case ra:members(Server) of {ok, _Servers, Leader} -> Leader; From 2ace9bb893d75325cbf970ebb31b1d4dc878712d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Sun, 7 Apr 2024 22:50:36 +0200 Subject: [PATCH 07/14] chore(dsrepl): sprinkle few comments and typespecs for exports --- .../src/emqx_ds_replication_layer_shard.erl | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index c2828f31f..8f87b69b4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -139,6 +139,11 @@ local_site() -> %% +%% @doc Add a local server to the shard cluster. +%% It's recommended to have the local server running before calling this function. +%% This function is idempotent. +-spec add_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + ok | emqx_ds:error(_Reason). add_local_server(DB, Shard) -> %% NOTE %% Adding local server as "promotable" member to the cluster, which means @@ -170,6 +175,11 @@ add_local_server(DB, Shard) -> {error, recoverable, Reason} end. +%% @doc Remove a local server from the shard cluster and clean up on-disk data. +%% It's required to have the local server running before calling this function. +%% This function is idempotent. +-spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + ok | emqx_ds:error(_Reason). drop_local_server(DB, Shard) -> LocalServer = local_server(DB, Shard), case remove_server(DB, Shard, LocalServer) of @@ -179,6 +189,12 @@ drop_local_server(DB, Shard) -> Error end. +%% @doc Remove a (remote) server from the shard cluster. +%% The server might not be running when calling this function, e.g. the node +%% might be offline. Because of this, on-disk data will not be cleaned up. +%% This function is idempotent. +-spec remove_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), server()) -> + ok | emqx_ds:error(_Reason). remove_server(DB, Shard, Server) -> ShardServers = shard_servers(DB, Shard), case ra:remove_member(ShardServers, Server, ?MEMBERSHIP_CHANGE_TIMEOUT) of @@ -190,6 +206,10 @@ remove_server(DB, Shard, Server) -> {error, recoverable, Reason} end. +-spec server_info + (readiness, server()) -> ready | {unready, _Status, _Membership} | unknown; + (leader, server()) -> server() | unknown; + (uid, server()) -> _UID :: binary() | unknown. server_info(readiness, Server) -> %% NOTE %% Server is ready if it's either the leader or a follower with voter "membership" From dcde30c38a98ae8112b4e8f71b41e0d9a94508f4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 13:22:31 +0200 Subject: [PATCH 08/14] test(dsrepl): add two more testcases for rebalancing --- .../src/emqx_ds_replication_layer_meta.erl | 3 + .../emqx_ds_replication_shard_allocator.erl | 4 + .../test/emqx_ds_replication_SUITE.erl | 164 +++++++++++++++++- 3 files changed, 164 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 31ed62fcb..dca2442b8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -134,6 +134,7 @@ ok | {error, {nonexistent_db, emqx_ds:db()}} | {error, {nonexistent_sites, [site()]}} + | {error, {too_few_sites, [site()]}} | {error, _}. %% Subject of the subscription: @@ -452,6 +453,8 @@ allocate_shards_trans(DB) -> assign_db_sites_trans(DB, Sites) -> Opts = db_config_trans(DB), case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of + [] when length(Sites) == 0 -> + mnesia:abort({too_few_sites, Sites}); [] -> ok; NonexistentSites -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 363a453d6..2c9cc44fa 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -323,6 +323,10 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) -> State = State0#{transitions := maps:remove(Track, Ts)}, handle_transition_exit(Shard, Trans, Reason, State); [] -> + %% NOTE + %% Actually, it's sort of expected to have a portion of exit signals here, + %% because of `mria:with_middleman/3`. But it's impossible to tell them apart + %% from other singals. logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), State0 end. diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 872169765..6a2c36b30 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -127,22 +127,22 @@ t_replication_transfers_snapshots(Config) -> MessagesOffline ). -t_replication_rebalance(init, Config) -> +t_rebalance(init, Config) -> Apps = [appspec(emqx_durable_storage)], Nodes = emqx_cth_cluster:start( [ - {t_replication_rebalance1, #{apps => Apps}}, - {t_replication_rebalance2, #{apps => Apps}}, - {t_replication_rebalance3, #{apps => Apps}}, - {t_replication_rebalance4, #{apps => Apps}} + {t_rebalance1, #{apps => Apps}}, + {t_rebalance2, #{apps => Apps}}, + {t_rebalance3, #{apps => Apps}}, + {t_rebalance4, #{apps => Apps}} ], #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} ), [{nodes, Nodes} | Config]; -t_replication_rebalance('end', Config) -> +t_rebalance('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). -t_replication_rebalance(Config) -> +t_rebalance(Config) -> NMsgs = 800, NClients = 5, Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), @@ -243,6 +243,156 @@ t_replication_rebalance(Config) -> ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), ?assertEqual(Messages, MessagesN3). +t_join_leave_errors(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_join_leave_errors1, #{apps => Apps}}, + {t_join_leave_errors2, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_join_leave_errors('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_join_leave_errors(Config) -> + [N1, N2] = ?config(nodes, Config), + + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), + ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?DB, Opts])), + + [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]], + + ?assertEqual([S1], ds_repl_meta(N1, db_sites, [?DB])), + + %% Attempts to join a nonexistent DB / site. + ?assertEqual( + {error, {nonexistent_db, boo}}, + ds_repl_meta(N1, join_db_site, [_DB = boo, S1]) + ), + ?assertEqual( + {error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}}, + ds_repl_meta(N1, join_db_site, [?DB, <<"NO-MANS-SITE">>]) + ), + %% NOTE: Leaving a non-existent site is not an error. + ?assertEqual( + ok, + ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>]) + ), + + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), + ?assertEqual([], transitions(N1, ?DB)), + + %% Impossible to leave the last site. + ?assertEqual( + {error, {too_few_sites, []}}, + ds_repl_meta(N1, leave_db_site, [?DB, S1]) + ), + + %% "Move" the DB to the other node. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), + ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertMatch([_ | _], transitions(N1, ?DB)), + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertEqual([], transitions(N1, ?DB)). + +t_rebalance_chaotic_converges(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_rebalance_chaotic_converges1, #{apps => Apps}}, + {t_rebalance_chaotic_converges2, #{apps => Apps}}, + {t_rebalance_chaotic_converges3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_rebalance_chaotic_converges('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_rebalance_chaotic_converges(Config) -> + NMsgs = 500, + Nodes = [N1, N2, N3] = ?config(nodes, Config), + + %% Initialize DB on first two nodes. + Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), + + %% Open DB on the last node. + ?assertEqual( + ok, + erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) + ), + + %% Find out which sites there are. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Initially, the DB is assigned to [S1, S2]. + ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), + ?assertEqual( + lists:sort([S1, S2]), + ds_repl_meta(N1, db_sites, [?DB]) + ), + + %% Fill the storage with messages and few additional generations. + Messages0 = lists:append([ + fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}), + fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}), + fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>}) + ]), + + %% Construct a chaotic transition sequence that changes assignment to [S2, S3]. + Sequence = [ + {N1, join_db_site, S3}, + {N2, leave_db_site, S2}, + {N3, leave_db_site, S1}, + {N1, join_db_site, S2}, + {N2, join_db_site, S1}, + {N3, leave_db_site, S3}, + {N1, leave_db_site, S1}, + {N2, join_db_site, S3} + ], + + %% Apply the sequence while also filling the storage with messages. + TransitionMessages = lists:map( + fun({N, Transition, Site}) -> + %% Apply the transition. + ?assertEqual(ok, ds_repl_meta(N, Transition, [?DB, Site])), + %% Give some time for at least one transition to complete. + Transitions = transitions(N, ?DB), + ct:pal("Transitions after ~p: ~p", [N, Transitions]), + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))), + %% Fill the storage with messages. + CID = integer_to_binary(erlang:system_time()), + fill_storage(N, ?DB, NMsgs, #{client_id => CID}) + end, + Sequence + ), + + %% Wait for the last transition to complete. + ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + + ?assertEqual( + lists:sort([S2, S3]), + ds_repl_meta(N1, db_sites, [?DB]) + ), + + %% Check that all messages are still there. + Messages = lists:append(TransitionMessages) ++ Messages0, + MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesDB)), + ?assertEqual(Messages, MessagesDB). + %% shard_server_info(Node, DB, Shard, Site, Info) -> From 4c0cc079c24d2ea7e47a18ccf746c8d4b63a424e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 13:25:45 +0200 Subject: [PATCH 09/14] fix(dsrepl): apply unnecessary rebalancing transitions cleanly --- .../src/emqx_ds_replication_layer_meta.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index dca2442b8..97d4e7412 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -481,15 +481,19 @@ modify_db_sites_trans(DB, Modifications) -> case Sites of Sites0 -> ok; - _Chagned -> + _Changed -> assign_db_sites_trans(DB, Sites) end. update_replica_set_trans(DB, Shard, Trans) -> case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] -> + %% NOTE + %% It's possible to complete a transition that's no longer planned. We + %% should anticipate that we may stray _away_ from the target set. + TargetSet1 = emqx_maybe:define(TargetSet0, ReplicaSet0), ReplicaSet = apply_transition(Trans, ReplicaSet0), - case lists:usort(TargetSet0) of + case lists:usort(TargetSet1) of ReplicaSet -> TargetSet = undefined; TS -> From 75bb7f5cdc26c319729aad445195c39b6cd5e411 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 16:04:33 +0200 Subject: [PATCH 10/14] fix(dsrepl): retry only `{add, Site}` crashed membership transitions To minimize the potential negative impact of removal transitions that crash for some unknown and unusual reasons. --- .../emqx_ds_replication_shard_allocator.erl | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 2c9cc44fa..7afeb9d26 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -290,6 +290,7 @@ do_rm_unresponsive(DB, Shard, Site) -> trans_delay(DB, Shard, Trans, Delay, NextHandler) -> ok = delay(Delay), + %% NOTE: Proceed only if the transition we are going to handle is still desired. case next_transitions(DB, Shard) of [Trans | _] -> apply_handler(NextHandler, DB, Shard, Trans); @@ -338,11 +339,6 @@ handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) -> handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) -> State; handle_transition_exit(Shard, Trans, Reason, State) -> - %% NOTE - %% In case of `{add, Site}` transition failure, we have no choice but to retry: - %% no other node can perform the transition and make progress towards the desired - %% state. For simplicity, we retry any crashed transition handler after a fixed - %% delay. logger:warning(#{ msg => "Shard membership transition failed", shard => Shard, @@ -350,9 +346,18 @@ handle_transition_exit(Shard, Trans, Reason, State) -> reason => Reason, retry_in => ?CRASH_RETRY_DELAY }), - {Track, Handler} = transition_handler(Shard, Trans, State), - RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]}, - ensure_transition_handler(Track, Shard, Trans, RetryHandler, State). + %% NOTE + %% In case of `{add, Site}` transition failure, we have no choice but to retry: + %% no other node can perform the transition and make progress towards the desired + %% state. + case Trans of + {add, _ThisSite} -> + {Track, Handler} = transition_handler(Shard, Trans, State), + RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]}, + ensure_transition_handler(Track, Shard, Trans, RetryHandler, State); + _Another -> + State + end. %% From 7a836317acffb72224b617a9d7154735e1ae3139 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 16:12:42 +0200 Subject: [PATCH 11/14] fix(dsrepl): trigger unfinished shard transition upon startup Also provide a trivial API to trigger them by hand. --- .../emqx_ds_replication_shard_allocator.erl | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 7afeb9d26..cfc2b7c81 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -23,6 +23,9 @@ -export([n_shards/1]). -export([shard_meta/2]). +%% Maintenace purposes: +-export([trigger_transitions/1]). + -behaviour(gen_server). -export([ init/1, @@ -52,10 +55,16 @@ %% +-record(trigger_transitions, {}). + -spec start_link(emqx_ds:db()) -> {ok, pid()}. start_link(DB) -> gen_server:start_link(?MODULE, DB, []). +-spec trigger_transitions(pid()) -> ok. +trigger_transitions(Pid) -> + gen_server:cast(Pid, #trigger_transitions{}). + -spec n_shards(emqx_ds:db()) -> non_neg_integer(). n_shards(DB) -> Meta = persistent_term:get(?db_meta(DB)), @@ -96,6 +105,8 @@ handle_call(_Call, _From, State) -> {reply, ignored, State}. -spec handle_cast(_Cast, state()) -> {noreply, state()}. +handle_cast(#trigger_transitions{}, State) -> + {noreply, handle_pending_transitions(State)}; handle_cast(_Cast, State) -> {noreply, State}. @@ -125,11 +136,14 @@ terminate(_Reason, #{}) -> %% -handle_allocate_shards(State) -> - case allocate_shards(State) of - {ok, NState} -> +handle_allocate_shards(State0) -> + case allocate_shards(State0) of + {ok, State} -> + %% NOTE + %% Subscribe to shard changes and trigger any yet unhandled transitions. ok = subscribe_db_changes(State), - NState; + ok = trigger_transitions(self()), + State; {error, Data} -> _ = logger:notice( Data#{ @@ -138,7 +152,7 @@ handle_allocate_shards(State) -> } ), _TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate), - State + State0 end. subscribe_db_changes(#{db := DB}) -> @@ -153,6 +167,15 @@ handle_shard_changed(Shard, State = #{db := DB}) -> ok = save_shard_meta(DB, Shard), handle_shard_transitions(Shard, next_transitions(DB, Shard), State). +handle_pending_transitions(State = #{db := DB, shards := Shards}) -> + lists:foldl( + fun(Shard, StateAcc) -> + handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc) + end, + State, + Shards + ). + next_transitions(DB, Shard) -> emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard). From 1e95bd4da6d04d7426930b6225c0a6c569c67c54 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 19:58:59 +0200 Subject: [PATCH 12/14] test(dsrepl): test unresponsive nodes removal / node restarts --- .../emqx_ds_replication_shard_allocator.erl | 4 +- .../test/emqx_ds_replication_SUITE.erl | 78 ++++++++++++++++++- 2 files changed, 77 insertions(+), 5 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index cfc2b7c81..f02335a10 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -50,7 +50,7 @@ -undef(TRANS_RETRY_TIMEOUT). -undef(REMOVE_REPLICA_DELAY). -define(TRANS_RETRY_TIMEOUT, 1_000). --define(REMOVE_REPLICA_DELAY, {4_000, 2_000}). +-define(REMOVE_REPLICA_DELAY, {3_000, 2_000}). -endif. %% @@ -213,7 +213,7 @@ transition_handler(Shard, Trans, _State = #{db := DB}) -> %% Putting this transition handler on separate "track" so that it %% won't block any changes with higher priority (e.g. managing %% local replicas). - {_Track = unresp, Handler}; + {{unresp, Shard}, Handler}; false -> undefined end; diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 6a2c36b30..9fc55d170 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -143,6 +143,12 @@ t_rebalance('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_rebalance(Config) -> + %% This testcase verifies that the storage rebalancing works correctly: + %% 1. Join/leave operations are applied successfully. + %% 2. Message data survives the rebalancing. + %% 3. Shard cluster membership converges to the target replica allocation. + %% 4. Replication factor is respected. + NMsgs = 800, NClients = 5, Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), @@ -257,6 +263,9 @@ t_join_leave_errors('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_join_leave_errors(Config) -> + %% This testcase verifies that logical errors arising during handling of + %% join/leave operations are reported correctly. + [N1, N2] = ?config(nodes, Config), Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), @@ -317,6 +326,10 @@ t_rebalance_chaotic_converges('end', Config) -> ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_rebalance_chaotic_converges(Config) -> + %% This testcase verifies that even a very chaotic sequence of join/leave + %% operations will still be handled consistently, and that the shard + %% allocation will converge to the target state. + NMsgs = 500, Nodes = [N1, N2, N3] = ?config(nodes, Config), @@ -365,12 +378,12 @@ t_rebalance_chaotic_converges(Config) -> %% Apply the sequence while also filling the storage with messages. TransitionMessages = lists:map( - fun({N, Transition, Site}) -> + fun({N, Operation, Site}) -> %% Apply the transition. - ?assertEqual(ok, ds_repl_meta(N, Transition, [?DB, Site])), + ?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])), %% Give some time for at least one transition to complete. Transitions = transitions(N, ?DB), - ct:pal("Transitions after ~p: ~p", [N, Transitions]), + ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))), %% Fill the storage with messages. CID = integer_to_binary(erlang:system_time()), @@ -393,6 +406,65 @@ t_rebalance_chaotic_converges(Config) -> ?assertEqual(sample(20, Messages), sample(20, MessagesDB)), ?assertEqual(Messages, MessagesDB). +t_rebalance_offline_restarts(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Specs = emqx_cth_cluster:mk_nodespecs( + [ + {t_rebalance_offline_restarts1, #{apps => Apps}}, + {t_rebalance_offline_restarts2, #{apps => Apps}}, + {t_rebalance_offline_restarts3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(Specs), + [{nodes, Nodes}, {nodespecs, Specs} | Config]; +t_rebalance_offline_restarts('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_rebalance_offline_restarts(Config) -> + %% This testcase verifies that rebalancing progresses if nodes restart or + %% go offline and never come back. + + Nodes = [N1, N2, N3] = ?config(nodes, Config), + _Specs = [NS1, NS2, _] = ?config(nodespecs, Config), + + %% Initialize DB on all 3 nodes. + Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}), + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) + ), + ?retry( + 500, + 10, + ?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes]) + ), + + %% Find out which sites are there. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Shut down N3 and then remove it from the DB. + ok = emqx_cth_cluster:stop_node(N3), + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])), + Transitions = transitions(N1, ?DB), + ct:pal("Transitions: ~p~n", [Transitions]), + + %% Wait until at least one transition completes. + ?block_until(#{?snk_kind := dsrepl_shard_transition_end}), + + %% Restart N1 and N2. + [N1] = emqx_cth_cluster:restart(NS1), + [N2] = emqx_cth_cluster:restart(NS2), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), + + %% Target state should still be reached eventually. + ?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))), + ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). + %% shard_server_info(Node, DB, Shard, Site, Info) -> From 3223797ae5930587523a15be1ce00e62012dbdab Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 21:28:20 +0200 Subject: [PATCH 13/14] fix(dsrepl): attempt leadership transfer before server removal This should make it much less likely to hit weird edge cases that lead to duplicate Raft log entries because of client retries upon receiving `shutdown` from the leader being removed. --- .../src/emqx_ds_replication_layer_shard.erl | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 8f87b69b4..2d19ec7ef 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -114,6 +114,13 @@ get_server_local_preferred(DB, Shard) -> pick_random(get_shard_servers(DB, Shard)) end. +lookup_leader(DB, Shard) -> + %% NOTE + %% Does not block, but the result may be outdated or even unknown when there's + %% no servers on the local node. + ClusterName = get_cluster_name(DB, Shard), + ra_leaderboard:lookup_leader(ClusterName). + pick_local(Servers) -> case lists:keyfind(node(), 2, Servers) of Local when is_tuple(Local) -> @@ -181,7 +188,22 @@ add_local_server(DB, Shard) -> -spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | emqx_ds:error(_Reason). drop_local_server(DB, Shard) -> + ShardServers = shard_servers(DB, Shard), LocalServer = local_server(DB, Shard), + case lookup_leader(DB, Shard) of + LocalServer -> + %% NOTE + %% Trigger leadership transfer *and* force to wait until the new leader + %% is elected and updated in the leaderboard. This should help to avoid + %% edge cases where entries appended right before removal are duplicated + %% due to client retries. + %% Timeouts are ignored, it's a best effort attempt. + [Candidate | _] = lists:delete(LocalServer, ShardServers), + _ = ra:transfer_leadership(LocalServer, Candidate), + _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end); + _Another -> + ok + end, case remove_server(DB, Shard, LocalServer) of ok -> ra:force_delete_server(DB, LocalServer); @@ -351,3 +373,24 @@ memoize(Fun, Args) -> Result -> Result end. + +wait_until(Fun) -> + wait_until(Fun, 5_000, 250). + +wait_until(Fun, Timeout, Sleep) -> + Deadline = erlang:monotonic_time(millisecond) + Timeout, + loop_until(Fun, Deadline, Sleep). + +loop_until(Fun, Deadline, Sleep) -> + case Fun() of + true -> + ok; + false -> + case erlang:monotonic_time(millisecond) of + Now when Now < Deadline -> + timer:sleep(Sleep), + loop_until(Fun, Deadline, Sleep); + _ -> + timeout + end + end. From d12e907209786a0170d5026f04de01ae949f92f7 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 8 Apr 2024 22:44:34 +0200 Subject: [PATCH 14/14] fix(dsrepl): correctly handle ra membership change command results Before this change, results similar to `{error, {no_more_servers_to_try, [{error, nodedown}, {error, not_member}]}}` were considered retryable failures, which is incorrect. --- .../src/emqx_ds_replication_layer_shard.erl | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 2d19ec7ef..f4c0d3b01 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -173,13 +173,14 @@ add_local_server(DB, Shard) -> membership => voter } end, - case ra:add_member(ShardServers, ServerRecord, ?MEMBERSHIP_CHANGE_TIMEOUT) of + Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, + case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of {ok, _, _Leader} -> ok; {error, already_member} -> ok; - {error, Reason} -> - {error, recoverable, Reason} + Error -> + {error, recoverable, Error} end. %% @doc Remove a local server from the shard cluster and clean up on-disk data. @@ -219,13 +220,14 @@ drop_local_server(DB, Shard) -> ok | emqx_ds:error(_Reason). remove_server(DB, Shard, Server) -> ShardServers = shard_servers(DB, Shard), - case ra:remove_member(ShardServers, Server, ?MEMBERSHIP_CHANGE_TIMEOUT) of + Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, + case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of {ok, _, _Leader} -> ok; {error, not_member} -> ok; - {error, Reason} -> - {error, recoverable, Reason} + Error -> + {error, recoverable, Error} end. -spec server_info @@ -272,6 +274,20 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership} member_readiness(#{}) -> unknown. +%% + +ra_try_servers([Server | Rest], Fun, Args) -> + case erlang:apply(Fun, [Server | Args]) of + {ok, R, Leader} -> + {ok, R, Leader}; + {error, Reason} when Reason == noproc; Reason == nodedown -> + ra_try_servers(Rest, Fun, Args); + ErrorOrTimeout -> + ErrorOrTimeout + end; +ra_try_servers([], _Fun, _Args) -> + {error, servers_unreachable}. + ra_overview(Server) -> case ra:member_overview(Server) of {ok, Overview, _Leader} ->