diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index c521164f4..195db7c34 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -74,7 +74,7 @@ start_egress({DB, Shard}) -> supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)). -spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok. -stop_shard(Shard = {DB, _}) -> +stop_shard({DB, Shard}) -> Sup = ?via(#?shards_sup{db = DB}), ok = supervisor:terminate_child(Sup, Shard), ok = supervisor:delete_child(Sup, Shard). @@ -212,7 +212,7 @@ sup_spec(Id, Options) -> shard_spec(DB, Shard) -> #{ - id => {shard, Shard}, + id => Shard, start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]}, shutdown => infinity, restart => permanent, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 66029d4ca..97d4e7412 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -57,6 +57,12 @@ target_set/2 ]). +%% Subscriptions to changes: +-export([ + subscribe/2, + unsubscribe/1 +]). + %% gen_server -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -73,7 +79,12 @@ n_shards/1 ]). --export_type([site/0, update_cluster_result/0]). +-export_type([ + site/0, + transition/0, + subscription_event/0, + update_cluster_result/0 +]). -include_lib("stdlib/include/qlc.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). @@ -123,8 +134,16 @@ ok | {error, {nonexistent_db, emqx_ds:db()}} | {error, {nonexistent_sites, [site()]}} + | {error, {too_few_sites, [site()]}} | {error, _}. +%% Subject of the subscription: +-type subject() :: emqx_ds:db(). + +%% Event for the subscription: +-type subscription_event() :: + {changed, {shard, emqx_ds:db(), emqx_ds_replication_layer:shard_id()}}. + %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). @@ -336,11 +355,21 @@ target_set(DB, Shard) -> undefined end. +%%================================================================================ + +subscribe(Pid, Subject) -> + gen_server:call(?SERVER, {subscribe, Pid, Subject}, infinity). + +unsubscribe(Pid) -> + gen_server:call(?SERVER, {unsubscribe, Pid}, infinity). + %%================================================================================ %% behavior callbacks %%================================================================================ --record(s, {}). +-record(s, { + subs = #{} :: #{pid() => {subject(), _Monitor :: reference()}} +}). init([]) -> process_flag(trap_exit, true), @@ -348,14 +377,24 @@ init([]) -> ensure_tables(), ensure_site(), S = #s{}, + {ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}), {ok, S}. +handle_call({subscribe, Pid, Subject}, _From, S) -> + {reply, ok, handle_subscribe(Pid, Subject, S)}; +handle_call({unsubscribe, Pid}, _From, S) -> + {reply, ok, handle_unsubscribe(Pid, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. handle_cast(_Cast, S) -> {noreply, S}. +handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}}, S) -> + ok = notify_subscribers(DB, {shard, DB, Shard}, S), + {noreply, S}; +handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) -> + {noreply, handle_unsubscribe(Pid, S)}; handle_info(_Info, S) -> {noreply, S}. 
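The subscription API added above registers and monitors the calling process; every write to the shard table is then fanned out to subscribers as a {changed, {shard, DB, Shard}} message. A minimal consumer sketch, assuming only what this hunk exports (the module and function names below are illustrative, not part of the patch):

-module(shard_watcher_example).
-export([watch/1]).

%% Subscribe the calling process to shard metadata changes for DB and log
%% every change until a `stop` message arrives.
watch(DB) ->
    ok = emqx_ds_replication_layer_meta:subscribe(self(), DB),
    watch_loop(DB).

watch_loop(DB) ->
    receive
        {changed, {shard, DB, Shard}} ->
            io:format("shard ~p of ~p changed~n", [Shard, DB]),
            watch_loop(DB);
        stop ->
            %% Explicit cleanup; the monitor also drops the subscription if
            %% this process simply dies.
            emqx_ds_replication_layer_meta:unsubscribe(self())
    end.

The shard allocator later in this patch relies on exactly this contract in its handle_info/2 clauses.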
@@ -414,6 +453,8 @@ allocate_shards_trans(DB) -> assign_db_sites_trans(DB, Sites) -> Opts = db_config_trans(DB), case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of + [] when length(Sites) == 0 -> + mnesia:abort({too_few_sites, Sites}); [] -> ok; NonexistentSites -> @@ -440,15 +481,19 @@ modify_db_sites_trans(DB, Modifications) -> case Sites of Sites0 -> ok; - _Chagned -> + _Changed -> assign_db_sites_trans(DB, Sites) end. update_replica_set_trans(DB, Shard, Trans) -> case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] -> + %% NOTE + %% It's possible to complete a transition that's no longer planned. We + %% should anticipate that we may stray _away_ from the target set. + TargetSet1 = emqx_maybe:define(TargetSet0, ReplicaSet0), ReplicaSet = apply_transition(Trans, ReplicaSet0), - case lists:usort(TargetSet0) of + case lists:usort(TargetSet1) of ReplicaSet -> TargetSet = undefined; TS -> @@ -613,6 +658,38 @@ transaction(Fun, Args) -> {error, Reason} end. +%%==================================================================== + +handle_subscribe(Pid, Subject, S = #s{subs = Subs0}) -> + case maps:is_key(Pid, Subs0) of + false -> + MRef = erlang:monitor(process, Pid), + Subs = Subs0#{Pid => {Subject, MRef}}, + S#s{subs = Subs}; + true -> + S + end. + +handle_unsubscribe(Pid, S = #s{subs = Subs0}) -> + case maps:take(Pid, Subs0) of + {{_Subject, MRef}, Subs} -> + _ = erlang:demonitor(MRef, [flush]), + S#s{subs = Subs}; + error -> + S + end. + +notify_subscribers(EventSubject, Event, #s{subs = Subs}) -> + maps:foreach( + fun(Pid, {Subject, _MRef}) -> + Subject == EventSubject andalso + erlang:send(Pid, {changed, Event}) + end, + Subs + ). + +%%==================================================================== + %% @doc Intersperse elements of two lists. %% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5]. -spec intersperse([X], [Y]) -> [X | Y]. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 45739fbe3..f4c0d3b01 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -21,6 +21,7 @@ %% Static server configuration -export([ shard_servers/2, + shard_server/3, local_server/2 ]). @@ -30,6 +31,14 @@ server/3 ]). +%% Membership +-export([ + add_local_server/2, + drop_local_server/2, + remove_server/3, + server_info/2 +]). + -behaviour(gen_server). -export([ init/1, @@ -38,21 +47,31 @@ terminate/2 ]). +-type server() :: ra:server_id(). + +-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). + %% start_link(DB, Shard, Opts) -> gen_server:start_link(?MODULE, {DB, Shard, Opts}, []). +-spec shard_servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [server()]. shard_servers(DB, Shard) -> ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), - [ - {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)} - || Site <- ReplicaSet - ]. + [shard_server(DB, Shard, Site) || Site <- ReplicaSet]. +-spec shard_server( + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_replication_layer_meta:site() +) -> server(). +shard_server(DB, Shard, Site) -> + {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}. + +-spec local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> server(). 
 local_server(DB, Shard) ->
-    Site = emqx_ds_replication_layer_meta:this_site(),
-    {server_name(DB, Shard, Site), node()}.
+    {server_name(DB, Shard, local_site()), node()}.

 cluster_name(DB, Shard) ->
     iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).

@@ -79,13 +98,11 @@ get_servers_leader_preferred(DB, Shard) ->
             Servers = ra_leaderboard:lookup_members(ClusterName),
             [Leader | lists:delete(Leader, Servers)];
         undefined ->
-            %% TODO: Dynamic membership.
             get_shard_servers(DB, Shard)
     end.

 get_server_local_preferred(DB, Shard) ->
-    %% NOTE: Contact random replica that is not a known leader.
-    %% TODO: Replica may be down, so we may need to retry.
+    %% NOTE: Contact either the local server or a random replica.
     ClusterName = get_cluster_name(DB, Shard),
     case ra_leaderboard:lookup_members(ClusterName) of
         Servers when is_list(Servers) ->
@@ -94,15 +111,21 @@ get_server_local_preferred(DB, Shard) ->
             %% TODO
            %% Leader is unkonwn if there are no servers of this group on the
            %% local node. We want to pick a replica in that case as well.
-            %% TODO: Dynamic membership.
             pick_random(get_shard_servers(DB, Shard))
     end.

+lookup_leader(DB, Shard) ->
+    %% NOTE
+    %% Does not block, but the result may be outdated or even unknown when there's
+    %% no servers on the local node.
+    ClusterName = get_cluster_name(DB, Shard),
+    ra_leaderboard:lookup_leader(ClusterName).
+
 pick_local(Servers) ->
-    case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
-        [Local | _] ->
+    case lists:keyfind(node(), 2, Servers) of
+        Local when is_tuple(Local) ->
             Local;
-        [] ->
+        false ->
             pick_random(Servers)
     end.

@@ -118,11 +141,166 @@ get_local_server(DB, Shard) ->
 get_shard_servers(DB, Shard) ->
     maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)).

+local_site() ->
+    emqx_ds_replication_layer_meta:this_site().
+
+%%
+
+%% @doc Add a local server to the shard cluster.
+%% It's recommended to have the local server running before calling this function.
+%% This function is idempotent.
+-spec add_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    ok | emqx_ds:error(_Reason).
+add_local_server(DB, Shard) ->
+    %% NOTE
+    %% Adding local server as "promotable" member to the cluster, which means
+    %% that it will affect quorum until it is promoted to a voter, which in
+    %% turn happens when the server has caught up sufficiently with the log.
+    %% We also rely on this "membership" to understand when the server is
+    %% ready.
+    ShardServers = shard_servers(DB, Shard),
+    LocalServer = local_server(DB, Shard),
+    case server_info(uid, LocalServer) of
+        UID when is_binary(UID) ->
+            ServerRecord = #{
+                id => LocalServer,
+                membership => promotable,
+                uid => UID
+            };
+        unknown ->
+            ServerRecord = #{
+                id => LocalServer,
+                membership => voter
+            }
+    end,
+    Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT,
+    case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of
+        {ok, _, _Leader} ->
+            ok;
+        {error, already_member} ->
+            ok;
+        Error ->
+            {error, recoverable, Error}
+    end.
+
+%% @doc Remove a local server from the shard cluster and clean up on-disk data.
+%% It's required to have the local server running before calling this function.
+%% This function is idempotent.
+-spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    ok | emqx_ds:error(_Reason).
+drop_local_server(DB, Shard) -> + ShardServers = shard_servers(DB, Shard), + LocalServer = local_server(DB, Shard), + case lookup_leader(DB, Shard) of + LocalServer -> + %% NOTE + %% Trigger leadership transfer *and* force to wait until the new leader + %% is elected and updated in the leaderboard. This should help to avoid + %% edge cases where entries appended right before removal are duplicated + %% due to client retries. + %% Timeouts are ignored, it's a best effort attempt. + [Candidate | _] = lists:delete(LocalServer, ShardServers), + _ = ra:transfer_leadership(LocalServer, Candidate), + _ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end); + _Another -> + ok + end, + case remove_server(DB, Shard, LocalServer) of + ok -> + ra:force_delete_server(DB, LocalServer); + {error, _, _Reason} = Error -> + Error + end. + +%% @doc Remove a (remote) server from the shard cluster. +%% The server might not be running when calling this function, e.g. the node +%% might be offline. Because of this, on-disk data will not be cleaned up. +%% This function is idempotent. +-spec remove_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), server()) -> + ok | emqx_ds:error(_Reason). +remove_server(DB, Shard, Server) -> + ShardServers = shard_servers(DB, Shard), + Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, + case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of + {ok, _, _Leader} -> + ok; + {error, not_member} -> + ok; + Error -> + {error, recoverable, Error} + end. + +-spec server_info + (readiness, server()) -> ready | {unready, _Status, _Membership} | unknown; + (leader, server()) -> server() | unknown; + (uid, server()) -> _UID :: binary() | unknown. +server_info(readiness, Server) -> + %% NOTE + %% Server is ready if it's either the leader or a follower with voter "membership" + %% status (meaning it was promoted after catching up with the log). + case current_leader(Server) of + Server -> + ready; + Leader when Leader /= unknown -> + member_info(readiness, Server, Leader); + unknown -> + unknown + end; +server_info(leader, Server) -> + current_leader(Server); +server_info(uid, Server) -> + maps:get(uid, ra_overview(Server), unknown). + +member_info(readiness, Server, Leader) -> + Cluster = maps:get(cluster, ra_overview(Leader), #{}), + member_readiness(maps:get(Server, Cluster, #{})). + +current_leader(Server) -> + %% NOTE: This call will block until the leader is known, or until the timeout. + case ra:members(Server) of + {ok, _Servers, Leader} -> + Leader; + _Error -> + unknown + end. + +member_readiness(#{status := Status, voter_status := #{membership := Membership}}) -> + case Status of + normal when Membership =:= voter -> + ready; + _Other -> + {unready, Status, Membership} + end; +member_readiness(#{}) -> + unknown. + +%% + +ra_try_servers([Server | Rest], Fun, Args) -> + case erlang:apply(Fun, [Server | Args]) of + {ok, R, Leader} -> + {ok, R, Leader}; + {error, Reason} when Reason == noproc; Reason == nodedown -> + ra_try_servers(Rest, Fun, Args); + ErrorOrTimeout -> + ErrorOrTimeout + end; +ra_try_servers([], _Fun, _Args) -> + {error, servers_unreachable}. + +ra_overview(Server) -> + case ra:member_overview(Server) of + {ok, Overview, _Leader} -> + Overview; + _Error -> + #{} + end. + %% init({DB, Shard, Opts}) -> _ = process_flag(trap_exit, true), - _Meta = start_shard(DB, Shard, Opts), + ok = start_shard(DB, Shard, Opts), {ok, {DB, Shard}}. 
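Taken together, the membership helpers above are meant to be driven in two stages: add the local server as a promotable member, then poll server_info(readiness, ...) until it has caught up and been promoted to a voter. A condensed sketch of that flow, with the {error, recoverable, _} retry path omitted for brevity (helper names here are illustrative; the shard allocator further down in this patch implements the full version):

%% Assumes the local shard server is already running (see start_shard/3).
add_and_await_ready(DB, Shard) ->
    ok = emqx_ds_replication_layer_shard:add_local_server(DB, Shard),
    LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
    await_ready(LocalServer).

await_ready(Server) ->
    case emqx_ds_replication_layer_shard:server_info(readiness, Server) of
        ready ->
            ok;
        _NotReadyOrUnknown ->
            timer:sleep(1000),
            await_ready(Server)
    end.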
 handle_call(_Call, _From, State) ->
@@ -138,7 +316,6 @@ terminate(_Reason, {DB, Shard}) ->
 %%

 start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
-    Site = emqx_ds_replication_layer_meta:this_site(),
     ClusterName = cluster_name(DB, Shard),
     LocalServer = local_server(DB, Shard),
     Servers = shard_servers(DB, Shard),
@@ -157,7 +334,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
     ),
     ok = ra:start_server(DB, #{
         id => LocalServer,
-        uid => <<ClusterName/binary, "_", Site/binary>>,
+        uid => server_uid(DB, Shard),
         cluster_name => ClusterName,
         initial_members => Servers,
         machine => Machine,
@@ -188,12 +365,17 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
             end;
         _ ->
             ok
-    end,
-    #{
-        cluster_name => ClusterName,
-        servers => Servers,
-        local_server => LocalServer
-    }.
+    end.
+
+server_uid(_DB, Shard) ->
+    %% NOTE
+    %% Each new "instance" of a server should have a unique identifier. Otherwise,
+    %% if some server migrates to another node during rebalancing, and then comes
+    %% back, `ra` will be very confused by it having the same UID as before.
+    %% Keeping the shard ID as a prefix to make it easier to identify the server
+    %% in the filesystem / logs / etc.
+    Ts = integer_to_binary(erlang:system_time(microsecond)),
+    <<Shard/binary, "_", Ts/binary>>.

 %%

@@ -207,3 +389,24 @@ memoize(Fun, Args) ->
         Result ->
             Result
     end.
+
+wait_until(Fun) ->
+    wait_until(Fun, 5_000, 250).
+
+wait_until(Fun, Timeout, Sleep) ->
+    Deadline = erlang:monotonic_time(millisecond) + Timeout,
+    loop_until(Fun, Deadline, Sleep).
+
+loop_until(Fun, Deadline, Sleep) ->
+    case Fun() of
+        true ->
+            ok;
+        false ->
+            case erlang:monotonic_time(millisecond) of
+                Now when Now < Deadline ->
+                    timer:sleep(Sleep),
+                    loop_until(Fun, Deadline, Sleep);
+                _ ->
+                    timeout
+            end
+    end.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
index 7393da692..f02335a10 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl
@@ -16,11 +16,16 @@
 -module(emqx_ds_replication_shard_allocator).

+-include_lib("snabbkaffe/include/trace.hrl").
+
 -export([start_link/1]).
 -export([n_shards/1]).
 -export([shard_meta/2]).

+%% Maintenance purposes:
+-export([trigger_transitions/1]).
+
 -behaviour(gen_server).
 -export([
     init/1,
@@ -30,14 +35,37 @@
     terminate/2
 ]).

+-export([handle_transition/4]).
+
 -define(db_meta(DB), {?MODULE, DB}).
 -define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).

+-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
+
+-define(TRANS_RETRY_TIMEOUT, 5_000).
+-define(CRASH_RETRY_DELAY, 20_000).
+-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
+
+-ifdef(TEST).
+-undef(TRANS_RETRY_TIMEOUT).
+-undef(REMOVE_REPLICA_DELAY).
+-define(TRANS_RETRY_TIMEOUT, 1_000).
+-define(REMOVE_REPLICA_DELAY, {3_000, 2_000}).
+-endif.
+
 %%

+-record(trigger_transitions, {}).
+
+-spec start_link(emqx_ds:db()) -> {ok, pid()}.
 start_link(DB) ->
     gen_server:start_link(?MODULE, DB, []).

+-spec trigger_transitions(pid()) -> ok.
+trigger_transitions(Pid) ->
+    gen_server:cast(Pid, #trigger_transitions{}).
+
+-spec n_shards(emqx_ds:db()) -> non_neg_integer().
 n_shards(DB) ->
     Meta = persistent_term:get(?db_meta(DB)),
     maps:get(n_shards, Meta).
@@ -47,26 +75,60 @@ shard_meta(DB, Shard) ->

 %%

--define(ALLOCATE_RETRY_TIMEOUT, 1_000).
+-record(transhdl, {
+    shard :: emqx_ds_replication_layer:shard_id(),
+    trans :: emqx_ds_replication_layer_meta:transition(),
+    pid :: pid()
+}).
+-type state() :: #{ + db := emqx_ds:db(), + shards := [emqx_ds_replication_layer:shard_id()], + status := allocating | ready, + transitions := #{_Track => #transhdl{}} +}. + +-spec init(emqx_ds:db()) -> {ok, state()}. init(DB) -> _ = erlang:process_flag(trap_exit, true), - _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), - State = #{db => DB, status => allocating}, - handle_allocate_shards(State, ok). + _ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}), + State = #{ + db => DB, + shards => [], + status => allocating, + transitions => #{} + }, + {ok, handle_allocate_shards(State)}. +-spec handle_call(_Call, _From, state()) -> {reply, ignored, state()}. handle_call(_Call, _From, State) -> {reply, ignored, State}. +-spec handle_cast(_Cast, state()) -> {noreply, state()}. +handle_cast(#trigger_transitions{}, State) -> + {noreply, handle_pending_transitions(State)}; handle_cast(_Cast, State) -> {noreply, State}. -handle_info(timeout, State) -> - handle_allocate_shards(State, noreply); +-spec handle_info(Info, state()) -> {noreply, state()} when + Info :: + emqx_ds_replication_layer_meta:subscription_event() + | {timeout, reference(), allocate} + | {'EXIT', pid(), _Reason}. +handle_info({timeout, _TRef, allocate}, State) -> + {noreply, handle_allocate_shards(State)}; +handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) -> + {noreply, handle_shard_changed(Shard, State)}; +handle_info({changed, _}, State) -> + {noreply, State}; +handle_info({'EXIT', Pid, Reason}, State) -> + {noreply, handle_exit(Pid, Reason, State)}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #{db := DB, shards := Shards}) -> +-spec terminate(_Reason, state()) -> _Ok. +terminate(_Reason, State = #{db := DB, shards := Shards}) -> + unsubscribe_db_changes(State), erase_db_meta(DB), erase_shards_meta(DB, Shards); terminate(_Reason, #{}) -> @@ -74,10 +136,14 @@ terminate(_Reason, #{}) -> %% -handle_allocate_shards(State, Ret) -> - case allocate_shards(State) of - {ok, NState} -> - {Ret, NState}; +handle_allocate_shards(State0) -> + case allocate_shards(State0) of + {ok, State} -> + %% NOTE + %% Subscribe to shard changes and trigger any yet unhandled transitions. + ok = subscribe_db_changes(State), + ok = trigger_transitions(self()), + State; {error, Data} -> _ = logger:notice( Data#{ @@ -85,7 +151,235 @@ handle_allocate_shards(State, Ret) -> retry_in => ?ALLOCATE_RETRY_TIMEOUT } ), - {Ret, State, ?ALLOCATE_RETRY_TIMEOUT} + _TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate), + State0 + end. + +subscribe_db_changes(#{db := DB}) -> + emqx_ds_replication_layer_meta:subscribe(self(), DB). + +unsubscribe_db_changes(_State) -> + emqx_ds_replication_layer_meta:unsubscribe(self()). + +%% + +handle_shard_changed(Shard, State = #{db := DB}) -> + ok = save_shard_meta(DB, Shard), + handle_shard_transitions(Shard, next_transitions(DB, Shard), State). + +handle_pending_transitions(State = #{db := DB, shards := Shards}) -> + lists:foldl( + fun(Shard, StateAcc) -> + handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc) + end, + State, + Shards + ). + +next_transitions(DB, Shard) -> + emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard). + +handle_shard_transitions(_Shard, [], State) -> + %% We reached the target allocation. 
+ State; +handle_shard_transitions(Shard, [Trans | _Rest], State) -> + case transition_handler(Shard, Trans, State) of + {Track, Handler} -> + ensure_transition_handler(Track, Shard, Trans, Handler, State); + undefined -> + State + end. + +transition_handler(Shard, Trans, _State = #{db := DB}) -> + ThisSite = emqx_ds_replication_layer_meta:this_site(), + case Trans of + {add, ThisSite} -> + {Shard, fun trans_add_local/3}; + {del, ThisSite} -> + {Shard, fun trans_drop_local/3}; + {del, Site} -> + ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + case lists:member(Site, ReplicaSet) of + true -> + %% NOTE + %% Let the replica handle its own removal first, but still set + %% up a removal handler after a delay, in case the replica is + %% unresponsive. + Handler = {fun trans_delay/5, [ + ?REMOVE_REPLICA_DELAY, + fun trans_rm_unresponsive/3 + ]}, + %% NOTE + %% Putting this transition handler on separate "track" so that it + %% won't block any changes with higher priority (e.g. managing + %% local replicas). + {{unresp, Shard}, Handler}; + false -> + undefined + end; + _NotOurs -> + %% This site is not involved in the next queued transition. + undefined + end. + +handle_transition(DB, Shard, Trans, Handler) -> + logger:set_process_metadata(#{ + db => DB, + shard => Shard, + domain => [emqx, ds, DB, shard_transition] + }), + ?tp( + dsrepl_shard_transition_begin, + #{shard => Shard, db => DB, transition => Trans, pid => self()} + ), + apply_handler(Handler, DB, Shard, Trans). + +apply_handler({Fun, Args}, DB, Shard, Trans) -> + erlang:apply(Fun, [DB, Shard, Trans | Args]); +apply_handler(Fun, DB, Shard, Trans) -> + erlang:apply(Fun, [DB, Shard, Trans]). + +trans_add_local(DB, Shard, {add, Site}) -> + logger:info(#{msg => "Adding new local shard replica", site => Site}), + do_add_local(membership, DB, Shard). + +do_add_local(membership = Stage, DB, Shard) -> + ok = start_shard(DB, Shard), + case emqx_ds_replication_layer_shard:add_local_server(DB, Shard) of + ok -> + do_add_local(readiness, DB, Shard); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_add_local(Stage, DB, Shard) + end; +do_add_local(readiness = Stage, DB, Shard) -> + LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard), + case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of + ready -> + logger:info(#{msg => "Local shard replica ready"}); + Status -> + logger:warning(#{ + msg => "Still waiting for local shard replica to be ready", + status => Status, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_add_local(Stage, DB, Shard) + end. + +trans_drop_local(DB, Shard, {del, Site}) -> + logger:info(#{msg => "Dropping local shard replica", site => Site}), + do_drop_local(DB, Shard). + +do_drop_local(DB, Shard) -> + case emqx_ds_replication_layer_shard:drop_local_server(DB, Shard) of + ok -> + ok = emqx_ds_builtin_db_sup:stop_shard({DB, Shard}), + ok = emqx_ds_storage_layer:drop_shard({DB, Shard}), + logger:info(#{msg => "Local shard replica dropped"}); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_drop_local(DB, Shard) + end. 
+ +trans_rm_unresponsive(DB, Shard, {del, Site}) -> + logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), + do_rm_unresponsive(DB, Shard, Site). + +do_rm_unresponsive(DB, Shard, Site) -> + Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), + case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of + ok -> + logger:info(#{msg => "Unresponsive shard replica removed"}); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_rm_unresponsive(DB, Shard, Site) + end. + +trans_delay(DB, Shard, Trans, Delay, NextHandler) -> + ok = delay(Delay), + %% NOTE: Proceed only if the transition we are going to handle is still desired. + case next_transitions(DB, Shard) of + [Trans | _] -> + apply_handler(NextHandler, DB, Shard, Trans); + _Outdated -> + exit({shutdown, skipped}) + end. + +%% + +ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> + case maps:get(Track, Ts, undefined) of + undefined -> + Pid = start_transition_handler(Shard, Trans, Handler, State), + Record = #transhdl{shard = Shard, trans = Trans, pid = Pid}, + State#{transitions := Ts#{Track => Record}}; + _AlreadyRunning -> + %% NOTE: Avoiding multiple transition handlers for the same shard for safety. + State + end. + +start_transition_handler(Shard, Trans, Handler, #{db := DB}) -> + proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]). + +handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) -> + case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of + [{Track, #transhdl{shard = Shard, trans = Trans}}] -> + ?tp( + dsrepl_shard_transition_end, + #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} + ), + State = State0#{transitions := maps:remove(Track, Ts)}, + handle_transition_exit(Shard, Trans, Reason, State); + [] -> + %% NOTE + %% Actually, it's sort of expected to have a portion of exit signals here, + %% because of `mria:with_middleman/3`. But it's impossible to tell them apart + %% from other singals. + logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), + State0 + end. + +handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) -> + %% NOTE: This will trigger the next transition if any. + ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans), + State; +handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) -> + State; +handle_transition_exit(Shard, Trans, Reason, State) -> + logger:warning(#{ + msg => "Shard membership transition failed", + shard => Shard, + transition => Trans, + reason => Reason, + retry_in => ?CRASH_RETRY_DELAY + }), + %% NOTE + %% In case of `{add, Site}` transition failure, we have no choice but to retry: + %% no other node can perform the transition and make progress towards the desired + %% state. + case Trans of + {add, _ThisSite} -> + {Track, Handler} = transition_handler(Shard, Trans, State), + RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]}, + ensure_transition_handler(Track, Shard, Trans, RetryHandler, State); + _Another -> + State end. 
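Each transition handler runs as a linked process, at most one per track, and a normal exit feeds back through update_replica_set/3, which produces a new shard-change event and pulls the next pending transition. For maintenance, the exported trigger_transitions/1 makes the allocator re-scan pending transitions immediately instead of waiting for a metadata event; a hedged usage sketch, assuming the allocator pid has already been obtained from the DB's supervision tree (that lookup is not part of this patch):

%% `AllocatorPid` is the emqx_ds_replication_shard_allocator process of the DB.
ok = emqx_ds_replication_shard_allocator:trigger_transitions(AllocatorPid).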
%% @@ -93,7 +387,7 @@ handle_allocate_shards(State, Ret) -> allocate_shards(State = #{db := DB}) -> case emqx_ds_replication_layer_meta:allocate_shards(DB) of {ok, Shards} -> - logger:notice(#{msg => "Shards allocated", shards => Shards}), + logger:info(#{msg => "Shards allocated", shards => Shards}), ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), ok = start_egresses(DB, Shards), ok = save_db_meta(DB, Shards), @@ -104,25 +398,23 @@ allocate_shards(State = #{db := DB}) -> end. start_shards(DB, Shards) -> - ok = lists:foreach( - fun(Shard) -> - ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}) - end, - Shards - ), - ok = logger:info(#{msg => "Shards started", shards => Shards}), + lists:foreach(fun(Shard) -> start_shard(DB, Shard) end, Shards). + +start_shard(DB, Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}), + ok = logger:info(#{msg => "Shard started", shard => Shard}), ok. start_egresses(DB, Shards) -> - ok = lists:foreach( - fun(Shard) -> - ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}) - end, - Shards - ), - logger:info(#{msg => "Egresses started", shards => Shards}), + lists:foreach(fun(Shard) -> start_egress(DB, Shard) end, Shards). + +start_egress(DB, Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}), + ok = logger:info(#{msg => "Egress started", shard => Shard}), ok. +%% + save_db_meta(DB, Shards) -> persistent_term:put(?db_meta(DB), #{ shards => Shards, @@ -146,3 +438,10 @@ erase_shards_meta(DB, Shards) -> erase_shard_meta(DB, Shard) -> persistent_term:erase(?shard_meta(DB, Shard)). + +%% + +delay({MinDelay, Variance}) -> + timer:sleep(MinDelay + rand:uniform(Variance)); +delay(Delay) -> + timer:sleep(Delay). diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 24e7cdafb..9fc55d170 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -26,18 +26,45 @@ -define(DB, testdb). opts() -> - #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 1, - n_sites => 3, - replication_factor => 3, - replication_options => #{ - wal_max_size_bytes => 128 * 1024, - wal_max_batch_size => 1024, - snapshot_interval => 128 - } - }. + opts(#{}). + +opts(Overrides) -> + maps:merge( + #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 16, + n_sites => 1, + replication_factor => 3, + replication_options => #{ + wal_max_size_bytes => 64 * 1024, + wal_max_batch_size => 1024, + snapshot_interval => 128 + } + }, + Overrides + ). + +appspec(emqx_durable_storage) -> + {emqx_durable_storage, #{ + before_start => fun snabbkaffe:fix_ct_logging/0, + override_env => [{egress_flush_interval, 1}] + }}. + +t_replication_transfers_snapshots(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + NodeSpecs = emqx_cth_cluster:mk_nodespecs( + [ + {t_replication_transfers_snapshots1, #{apps => Apps}}, + {t_replication_transfers_snapshots2, #{apps => Apps}}, + {t_replication_transfers_snapshots3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(NodeSpecs), + [{nodes, Nodes}, {specs, NodeSpecs} | Config]; +t_replication_transfers_snapshots('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). 
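With opts/1 now taking overrides, each testcase can tune sharding and replication without repeating the whole option map, and each case declares its own cluster in its TCName(init, Config) / TCName('end', Config) clauses (presumably dispatched by the common helper wired in at the end of the file). The typical opening of the new cases therefore looks like this, taken verbatim from the rebalance tests below:

%% 16 shards, a single initial site, 3 replicas per shard:
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])).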
t_replication_transfers_snapshots(Config) -> NMsgs = 4000, @@ -45,9 +72,10 @@ t_replication_transfers_snapshots(Config) -> _Specs = [_, SpecOffline | _] = ?config(specs, Config), %% Initialize DB on all nodes and wait for it to be online. + Opts = opts(#{n_shards => 1, n_sites => 3}), ?assertEqual( [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()]) + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) ), ?retry( 500, @@ -88,7 +116,7 @@ t_replication_transfers_snapshots(Config) -> Shard = hd(shards(NodeOffline, ?DB)), MessagesOffline = lists:keysort( #message.timestamp, - consume(NodeOffline, ?DB, Shard, ['#'], 0) + consume_shard(NodeOffline, ?DB, Shard, ['#'], 0) ), ?assertEqual( sample(40, Messages), @@ -99,26 +127,391 @@ t_replication_transfers_snapshots(Config) -> MessagesOffline ). +t_rebalance(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_rebalance1, #{apps => Apps}}, + {t_rebalance2, #{apps => Apps}}, + {t_rebalance3, #{apps => Apps}}, + {t_rebalance4, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_rebalance('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_rebalance(Config) -> + %% This testcase verifies that the storage rebalancing works correctly: + %% 1. Join/leave operations are applied successfully. + %% 2. Message data survives the rebalancing. + %% 3. Shard cluster membership converges to the target replica allocation. + %% 4. Replication factor is respected. + + NMsgs = 800, + NClients = 5, + Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), + + %% Initialize DB on the first node. + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), + ?assertMatch( + Shards when length(Shards) == 16, + shards_online(N1, ?DB) + ), + + %% Open DB on the rest of the nodes. + ?assertEqual( + [{ok, ok} || _ <- [N2, N3, N4]], + erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts]) + ), + + Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Only N1 should be responsible for all shards initially. + ?assertEqual( + [[S1] || _ <- Nodes], + [ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes] + ), + + %% Fill the storage with messages and few additional generations. + %% This will force shards to trigger snapshot transfers during rebalance. + ClientMessages = emqx_utils:pmap( + fun(CID) -> + N = lists:nth(1 + (CID rem length(Nodes)), Nodes), + fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)}) + end, + lists:seq(1, NClients), + infinity + ), + Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)), + + %% Join the second site to the DB replication sites. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), + + %% Fill in some more messages *during* the rebalance. + MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}), + + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Now join the rest of the sites. + ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), + + %% Fill in some more messages *during* the rebalance. 
+ MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that each node is now responsible for 3/4 of the shards. + ?assertEqual( + [(16 * 3) div length(Nodes) || _ <- Nodes], + [n_shards_online(N, ?DB) || N <- Nodes] + ), + + %% Verify that the set of shard servers matches the target allocation. + Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], + ShardServers = [ + shard_server_info(N, ?DB, Shard, Site, readiness) + || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), + Shard <- Shards + ], + ?assert( + lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), + ShardServers + ), + + %% Verify that the messages are preserved after the rebalance. + Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2, + MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesN4)), + ?assertEqual(Messages, MessagesN4), + + %% Scale down the cluster by removing the first node. + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), + ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that each node is now responsible for each shard. + ?assertEqual( + [0, 16, 16, 16], + [n_shards_online(N, ?DB) || N <- Nodes] + ), + + %% Verify that the messages are once again preserved after the rebalance. + MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), + ?assertEqual(Messages, MessagesN3). + +t_join_leave_errors(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_join_leave_errors1, #{apps => Apps}}, + {t_join_leave_errors2, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_join_leave_errors('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_join_leave_errors(Config) -> + %% This testcase verifies that logical errors arising during handling of + %% join/leave operations are reported correctly. + + [N1, N2] = ?config(nodes, Config), + + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), + ?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?DB, Opts])), + + [S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]], + + ?assertEqual([S1], ds_repl_meta(N1, db_sites, [?DB])), + + %% Attempts to join a nonexistent DB / site. + ?assertEqual( + {error, {nonexistent_db, boo}}, + ds_repl_meta(N1, join_db_site, [_DB = boo, S1]) + ), + ?assertEqual( + {error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}}, + ds_repl_meta(N1, join_db_site, [?DB, <<"NO-MANS-SITE">>]) + ), + %% NOTE: Leaving a non-existent site is not an error. + ?assertEqual( + ok, + ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>]) + ), + + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])), + ?assertEqual([], transitions(N1, ?DB)), + + %% Impossible to leave the last site. + ?assertEqual( + {error, {too_few_sites, []}}, + ds_repl_meta(N1, leave_db_site, [?DB, S1]) + ), + + %% "Move" the DB to the other node. 
+ ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), + ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertMatch([_ | _], transitions(N1, ?DB)), + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])), + ?assertEqual([], transitions(N1, ?DB)). + +t_rebalance_chaotic_converges(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_rebalance_chaotic_converges1, #{apps => Apps}}, + {t_rebalance_chaotic_converges2, #{apps => Apps}}, + {t_rebalance_chaotic_converges3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_rebalance_chaotic_converges('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_rebalance_chaotic_converges(Config) -> + %% This testcase verifies that even a very chaotic sequence of join/leave + %% operations will still be handled consistently, and that the shard + %% allocation will converge to the target state. + + NMsgs = 500, + Nodes = [N1, N2, N3] = ?config(nodes, Config), + + %% Initialize DB on first two nodes. + Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), + + %% Open DB on the last node. + ?assertEqual( + ok, + erpc:call(N3, emqx_ds, open_db, [?DB, Opts]) + ), + + %% Find out which sites there are. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Initially, the DB is assigned to [S1, S2]. + ?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])), + ?assertEqual( + lists:sort([S1, S2]), + ds_repl_meta(N1, db_sites, [?DB]) + ), + + %% Fill the storage with messages and few additional generations. + Messages0 = lists:append([ + fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}), + fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}), + fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>}) + ]), + + %% Construct a chaotic transition sequence that changes assignment to [S2, S3]. + Sequence = [ + {N1, join_db_site, S3}, + {N2, leave_db_site, S2}, + {N3, leave_db_site, S1}, + {N1, join_db_site, S2}, + {N2, join_db_site, S1}, + {N3, leave_db_site, S3}, + {N1, leave_db_site, S1}, + {N2, join_db_site, S3} + ], + + %% Apply the sequence while also filling the storage with messages. + TransitionMessages = lists:map( + fun({N, Operation, Site}) -> + %% Apply the transition. + ?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])), + %% Give some time for at least one transition to complete. + Transitions = transitions(N, ?DB), + ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))), + %% Fill the storage with messages. + CID = integer_to_binary(erlang:system_time()), + fill_storage(N, ?DB, NMsgs, #{client_id => CID}) + end, + Sequence + ), + + %% Wait for the last transition to complete. + ?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))), + + ?assertEqual( + lists:sort([S2, S3]), + ds_repl_meta(N1, db_sites, [?DB]) + ), + + %% Check that all messages are still there. + Messages = lists:append(TransitionMessages) ++ Messages0, + MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesDB)), + ?assertEqual(Messages, MessagesDB). 
+ +t_rebalance_offline_restarts(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Specs = emqx_cth_cluster:mk_nodespecs( + [ + {t_rebalance_offline_restarts1, #{apps => Apps}}, + {t_rebalance_offline_restarts2, #{apps => Apps}}, + {t_rebalance_offline_restarts3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(Specs), + [{nodes, Nodes}, {nodespecs, Specs} | Config]; +t_rebalance_offline_restarts('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_rebalance_offline_restarts(Config) -> + %% This testcase verifies that rebalancing progresses if nodes restart or + %% go offline and never come back. + + Nodes = [N1, N2, N3] = ?config(nodes, Config), + _Specs = [NS1, NS2, _] = ?config(nodespecs, Config), + + %% Initialize DB on all 3 nodes. + Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}), + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) + ), + ?retry( + 500, + 10, + ?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes]) + ), + + %% Find out which sites are there. + Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Shut down N3 and then remove it from the DB. + ok = emqx_cth_cluster:stop_node(N3), + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])), + Transitions = transitions(N1, ?DB), + ct:pal("Transitions: ~p~n", [Transitions]), + + %% Wait until at least one transition completes. + ?block_until(#{?snk_kind := dsrepl_shard_transition_end}), + + %% Restart N1 and N2. + [N1] = emqx_cth_cluster:restart(NS1), + [N2] = emqx_cth_cluster:restart(NS2), + ?assertEqual( + [{ok, ok}, {ok, ok}], + erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts]) + ), + + %% Target state should still be reached eventually. + ?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))), + ?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])). + +%% + +shard_server_info(Node, DB, Shard, Site, Info) -> + Server = shard_server(Node, DB, Shard, Site), + {Server, ds_repl_shard(Node, server_info, [Info, Server])}. + +shard_server(Node, DB, Shard, Site) -> + ds_repl_shard(Node, shard_server, [DB, Shard, Site]). + +ds_repl_meta(Node, Fun) -> + ds_repl_meta(Node, Fun, []). + +ds_repl_meta(Node, Fun, Args) -> + erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args). + +ds_repl_shard(Node, Fun, Args) -> + erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). + +transitions(Node, DB) -> + Shards = shards(Node, DB), + [{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])]. + shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). shards_online(Node, DB) -> erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]). +n_shards_online(Node, DB) -> + length(shards_online(Node, DB)). + fill_storage(Node, DB, NMsgs, Opts) -> fill_storage(Node, DB, NMsgs, 0, Opts). -fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs -> - R1 = push_message(Node, DB, I), +fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> + PAddGen = maps:get(p_addgen, Opts, 0.001), + R1 = push_message(Node, DB, I, Opts), R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end), R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> []. 
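transitions/2 above flattens the per-shard pending transition lists into {Shard, Transition} pairs, so an empty result means every shard has reached its target replica set; that is what the repeated ?retry(_, _, ?assertEqual([], transitions(N, ?DB))) assertions wait for. An equivalent explicit polling helper, shown only to make the convergence condition concrete (this helper is not part of the suite):

wait_converged(Node, DB) ->
    %% Poll until no shard of DB has pending replica-set transitions.
    case transitions(Node, DB) of
        [] -> ok;
        [_ | _] -> timer:sleep(500), wait_converged(Node, DB)
    end.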
-push_message(Node, DB, I) -> +push_message(Node, DB, I, Opts) -> Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]), {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)), - Message = message(Topic, Bytes, I * 100), + ClientId = maps:get(client_id, Opts, <>), + Message = message(ClientId, Topic, Bytes, I * 100), ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]), [Message]. @@ -126,16 +519,22 @@ add_generation(Node, DB) -> ok = erpc:call(Node, emqx_ds, add_generation, [DB]), []. -message(Topic, Payload, PublishedAt) -> +message(ClientId, Topic, Payload, PublishedAt) -> #message{ - from = <>, + from = ClientId, topic = Topic, payload = Payload, timestamp = PublishedAt, id = emqx_guid:gen() }. -consume(Node, DB, Shard, TopicFilter, StartTime) -> +compare_message(M1, M2) -> + {M1#message.from, M1#message.timestamp} < {M2#message.from, M2#message.timestamp}. + +consume(Node, DB, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_test_helpers, consume, [DB, TopicFilter, StartTime]). + +consume_shard(Node, DB, Shard, TopicFilter, StartTime) -> erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]). probably(P, Fun) -> @@ -156,26 +555,11 @@ suite() -> [{timetrap, {seconds, 60}}]. all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(TCName, Config) -> - Apps = [ - {emqx_durable_storage, #{ - before_start => fun snabbkaffe:fix_ct_logging/0, - override_env => [{egress_flush_interval, 1}] - }} - ], - WorkDir = emqx_cth_suite:work_dir(TCName, Config), - NodeSpecs = emqx_cth_cluster:mk_nodespecs( - [ - {emqx_ds_replication_SUITE1, #{apps => Apps}}, - {emqx_ds_replication_SUITE2, #{apps => Apps}}, - {emqx_ds_replication_SUITE3, #{apps => Apps}} - ], - #{work_dir => WorkDir} - ), - Nodes = emqx_cth_cluster:start(NodeSpecs), +init_per_testcase(TCName, Config0) -> + Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0), ok = snabbkaffe:start_trace(), - [{nodes, Nodes}, {specs, NodeSpecs} | Config]. + Config. -end_per_testcase(_TCName, Config) -> +end_per_testcase(TCName, Config) -> ok = snabbkaffe:stop(), - ok = emqx_cth_cluster:stop(?config(nodes, Config)). + emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).