diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 45739fbe3..a57e45dfd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -21,6 +21,7 @@ %% Static server configuration -export([ shard_servers/2, + shard_server/3, local_server/2 ]). @@ -30,6 +31,14 @@ server/3 ]). +%% Membership +-export([ + add_local_server/2, + drop_local_server/2, + remove_server/3, + server_info/2 +]). + -behaviour(gen_server). -export([ init/1, @@ -38,21 +47,31 @@ terminate/2 ]). +-type server() :: ra:server_id(). + +-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). + %% start_link(DB, Shard, Opts) -> gen_server:start_link(?MODULE, {DB, Shard, Opts}, []). +-spec shard_servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [server()]. shard_servers(DB, Shard) -> ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), - [ - {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)} - || Site <- ReplicaSet - ]. + [shard_server(DB, Shard, Site) || Site <- ReplicaSet]. +-spec shard_server( + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_replication_layer_meta:site() +) -> server(). +shard_server(DB, Shard, Site) -> + {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}. + +-spec local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> server(). local_server(DB, Shard) -> - Site = emqx_ds_replication_layer_meta:this_site(), - {server_name(DB, Shard, Site), node()}. + {server_name(DB, Shard, local_site()), node()}. cluster_name(DB, Shard) -> iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])). @@ -61,6 +80,14 @@ server_name(DB, Shard, Site) -> DBBin = atom_to_binary(DB), binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>). 
+server_uid(_DB, Shard) -> + %% NOTE + %% Each new "instance" of a server should have a unique identifier. Otherwise, + %% if some server migrates to another node during rebalancing, and then comes + %% back, `ra` will be very confused by it having the same UID as before. + Ts = integer_to_binary(erlang:system_time(microsecond)), + <<Shard/binary, "_", Ts/binary>>. + %% servers(DB, Shard, _Order = leader_preferred) -> @@ -118,11 +145,100 @@ get_local_server(DB, Shard) -> get_shard_servers(DB, Shard) -> maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)). +local_site() -> + emqx_ds_replication_layer_meta:this_site(). + +%% + +add_local_server(DB, Shard) -> + %% NOTE + %% Adding local server as "promotable" member to the cluster, which means + %% that it will not affect quorum until it is promoted to a voter, which in + %% turn happens when the server has caught up sufficiently with the log. + %% We also rely on this "membership" to determine the server's + %% readiness. + ShardServers = shard_servers(DB, Shard), + LocalServer = local_server(DB, Shard), + ServerRecord = #{ + id => LocalServer, + membership => promotable, + uid => server_uid(DB, Shard) + }, + case ra:add_member(ShardServers, ServerRecord, ?MEMBERSHIP_CHANGE_TIMEOUT) of + {ok, _, _Leader} -> + ok; + {error, already_member} -> + ok; + {error, Reason} -> + {error, recoverable, Reason} + end. + +drop_local_server(DB, Shard) -> + LocalServer = local_server(DB, Shard), + case remove_server(DB, Shard, LocalServer) of + ok -> + ra:force_delete_server(DB, LocalServer); + {error, _, _Reason} = Error -> + Error + end. + +remove_server(DB, Shard, Server) -> + ShardServers = shard_servers(DB, Shard), + case ra:remove_member(ShardServers, Server, ?MEMBERSHIP_CHANGE_TIMEOUT) of + {ok, _, _Leader} -> + ok; + {error, not_member} -> + ok; + {error, Reason} -> + {error, recoverable, Reason} + end. 
+ +server_info(readiness, Server) -> + %% NOTE + %% Server is ready if it's either the leader or a follower with voter "membership" + %% status (meaning it was promoted after catching up with the log). + case current_leader(Server) of + Server -> + ready; + Leader when Leader /= unknown -> + member_info(readiness, Server, Leader); + unknown -> + unknown + end; +server_info(leader, Server) -> + current_leader(Server). + +member_info(readiness, Server, Leader) -> + case ra:member_overview(Leader) of + {ok, #{cluster := Cluster}, _} -> + member_readiness(maps:get(Server, Cluster)); + _Error -> + unknown + end. + +current_leader(Server) -> + case ra:members(Server) of + {ok, _Servers, Leader} -> + Leader; + _Error -> + unknown + end. + +member_readiness(#{status := Status, voter_status := #{membership := Membership}}) -> + case Status of + normal when Membership =:= voter -> + ready; + _Other -> + {unready, Status, Membership} + end; +member_readiness(#{}) -> + unknown. + %% init({DB, Shard, Opts}) -> _ = process_flag(trap_exit, true), - _Meta = start_shard(DB, Shard, Opts), + ok = start_shard(DB, Shard, Opts), {ok, {DB, Shard}}. handle_call(_Call, _From, State) -> @@ -138,7 +254,6 @@ terminate(_Reason, {DB, Shard}) -> %% start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> - Site = emqx_ds_replication_layer_meta:this_site(), ClusterName = cluster_name(DB, Shard), LocalServer = local_server(DB, Shard), Servers = shard_servers(DB, Shard), @@ -157,7 +272,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> ), ok = ra:start_server(DB, #{ id => LocalServer, - uid => <>, + uid => server_uid(DB, Shard), cluster_name => ClusterName, initial_members => Servers, machine => Machine, @@ -188,12 +303,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> end; _ -> ok - end, - #{ - cluster_name => ClusterName, - servers => Servers, - local_server => LocalServer - }. + end. 
%% diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 7393da692..4113fcedc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -16,6 +16,8 @@ -module(emqx_ds_replication_shard_allocator). +-include_lib("snabbkaffe/include/trace.hrl"). + -export([start_link/1]). -export([n_shards/1]). @@ -30,9 +32,23 @@ terminate/2 ]). +-export([handle_transition/4]). + -define(db_meta(DB), {?MODULE, DB}). -define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}). +-define(ALLOCATE_RETRY_TIMEOUT, 1_000). + +-define(TRANS_RETRY_TIMEOUT, 5_000). +-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}). + +-ifdef(TEST). +-undef(TRANS_RETRY_TIMEOUT). +-undef(REMOVE_REPLICA_DELAY). +-define(TRANS_RETRY_TIMEOUT, 1_000). +-define(REMOVE_REPLICA_DELAY, {4_000, 2_000}). +-endif. + %% start_link(DB) -> @@ -47,13 +63,11 @@ shard_meta(DB, Shard) -> %% --define(ALLOCATE_RETRY_TIMEOUT, 1_000). - init(DB) -> _ = erlang:process_flag(trap_exit, true), - _ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}), - State = #{db => DB, status => allocating}, - handle_allocate_shards(State, ok). + _ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}), + State = #{db => DB, transitions => #{}, status => allocating}, + {ok, handle_allocate_shards(State)}. handle_call(_Call, _From, State) -> {reply, ignored, State}. @@ -61,12 +75,19 @@ handle_call(_Call, _From, State) -> handle_cast(_Cast, State) -> {noreply, State}. 
-handle_info(timeout, State) -> - handle_allocate_shards(State, noreply); +handle_info({timeout, _TRef, allocate}, State) -> + {noreply, handle_allocate_shards(State)}; +handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) -> + {noreply, handle_shard_changed(Shard, State)}; +handle_info({changed, _}, State) -> + {noreply, State}; +handle_info({'EXIT', Pid, Reason}, State) -> + {noreply, handle_exit(Pid, Reason, State)}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #{db := DB, shards := Shards}) -> +terminate(_Reason, State = #{db := DB, shards := Shards}) -> + unsubscribe_db_changes(State), erase_db_meta(DB), erase_shards_meta(DB, Shards); terminate(_Reason, #{}) -> @@ -74,10 +95,11 @@ terminate(_Reason, #{}) -> %% -handle_allocate_shards(State, Ret) -> +handle_allocate_shards(State) -> case allocate_shards(State) of {ok, NState} -> - {Ret, NState}; + ok = subscribe_db_changes(State), + NState; {error, Data} -> _ = logger:notice( Data#{ @@ -85,15 +107,197 @@ handle_allocate_shards(State, Ret) -> retry_in => ?ALLOCATE_RETRY_TIMEOUT } ), - {Ret, State, ?ALLOCATE_RETRY_TIMEOUT} + _TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate), + State end. +subscribe_db_changes(#{db := DB}) -> + emqx_ds_replication_layer_meta:subscribe(self(), DB). + +unsubscribe_db_changes(_State) -> + emqx_ds_replication_layer_meta:unsubscribe(self()). + +%% + +handle_shard_changed(Shard, State = #{db := DB}) -> + ok = save_shard_meta(DB, Shard), + Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), + handle_shard_transitions(Shard, Transitions, State). + +handle_shard_transitions(Shard, Transitions, State = #{db := DB}) -> + ThisSite = emqx_ds_replication_layer_meta:this_site(), + case Transitions of + [] -> + %% We reached the target allocation. 
+ State; + [Trans = {add, ThisSite} | _Rest] -> + ensure_transition_handler(Shard, Trans, fun trans_add_local/3, State); + [Trans = {del, ThisSite} | _Rest] -> + ensure_transition_handler(Shard, Trans, fun trans_drop_local/3, State); + [Trans = {del, Site} | _Rest] -> + ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + case lists:member(Site, ReplicaSet) of + true -> + %% NOTE + %% Putting this transition handler on separate "track" so that it + %% won't block any changes with higher priority (e.g. managing + %% local replicas). + Handler = fun trans_rm_unresponsive/3, + ensure_transition_handler(unresp, Shard, Trans, Handler, State); + false -> + State + end; + [_Trans | _Rest] -> + %% This site is not involved in the next queued transition. + State + end. + +handle_transition(DB, Shard, Trans, Fun) -> + logger:set_process_metadata(#{ + db => DB, + shard => Shard, + domain => [emqx, ds, DB, shard_transition] + }), + ?tp( + dsrepl_shard_transition_begin, + #{shard => Shard, db => DB, transition => Trans, pid => self()} + ), + erlang:apply(Fun, [DB, Shard, Trans]). + +trans_add_local(DB, Shard, {add, Site}) -> + logger:info(#{msg => "Adding new local shard replica", site => Site}), + do_add_local(membership, DB, Shard). 
+ +do_add_local(membership = Stage, DB, Shard) -> + ok = start_shard(DB, Shard), + case emqx_ds_replication_layer_shard:add_local_server(DB, Shard) of + ok -> + do_add_local(readiness, DB, Shard); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_add_local(Stage, DB, Shard) + end; +do_add_local(readiness = Stage, DB, Shard) -> + LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard), + case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of + ready -> + logger:info(#{msg => "Local shard replica ready"}); + Status -> + logger:warning(#{ + msg => "Still waiting for local shard replica to be ready", + status => Status, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_add_local(Stage, DB, Shard) + end. + +trans_drop_local(DB, Shard, {del, Site}) -> + logger:info(#{msg => "Dropping local shard replica", site => Site}), + do_drop_local(DB, Shard). + +do_drop_local(DB, Shard) -> + case emqx_ds_replication_layer_shard:drop_local_server(DB, Shard) of + ok -> + ok = emqx_ds_builtin_db_sup:stop_shard({DB, Shard}), + ok = emqx_ds_storage_layer:drop_shard({DB, Shard}), + logger:info(#{msg => "Local shard replica dropped"}); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_drop_local(DB, Shard) + end. + +trans_rm_unresponsive(DB, Shard, Trans = {del, Site}) -> + %% NOTE + %% Let the replica handle its own removal first, thus the delay. 
+ ok = delay(?REMOVE_REPLICA_DELAY), + Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard), + case Transitions of + [Trans | _] -> + logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), + do_rm_unresponsive(DB, Shard, Site); + _Outdated -> + exit({shutdown, skipped}) + end. + +do_rm_unresponsive(DB, Shard, Site) -> + Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), + case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of + ok -> + logger:info(#{msg => "Unresponsive shard replica removed"}); + {error, recoverable, Reason} -> + logger:warning(#{ + msg => "Shard membership change failed", + reason => Reason, + retry_in => ?TRANS_RETRY_TIMEOUT + }), + ok = timer:sleep(?TRANS_RETRY_TIMEOUT), + do_rm_unresponsive(DB, Shard, Site) + end. + +%% + +ensure_transition_handler(Shard, Trans, Handler, State) -> + ensure_transition_handler(Shard, Shard, Trans, Handler, State). + +ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) -> + case maps:get(Track, Ts, undefined) of + undefined -> + Pid = start_transition_handler(Shard, Trans, Handler, State), + State#{transitions := Ts#{Track => {Shard, Trans, Pid}}}; + _AlreadyRunning -> + %% NOTE: Avoiding multiple transition handlers for the same shard for safety. + State + end. + +start_transition_handler(Shard, Trans, Handler, #{db := DB}) -> + proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]). 
+ +handle_exit(Pid, Reason, State = #{db := DB, transitions := Ts}) -> + case maps:to_list(maps:filter(fun(_, {_S, _T, P}) -> P == Pid end, Ts)) of + [{Track, {Shard, Trans, Pid}}] -> + ?tp( + dsrepl_shard_transition_end, + #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} + ), + ok = handle_transition_exit(Shard, Trans, Reason, State), + State#{transitions := maps:remove(Track, Ts)}; + [] -> + logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}), + State + end. + +handle_transition_exit(Shard, Trans, normal, _State = #{db := DB}) -> + %% NOTE: This will trigger the next transition if any. + ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans); +handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, _State) -> + ok; +handle_transition_exit(Shard, Trans, Reason, _State) -> + logger:warning(#{ + msg => "Shard membership transition failed", + shard => Shard, + transition => Trans, + reason => Reason + }), + %% FIXME: retry + ok. + %% allocate_shards(State = #{db := DB}) -> case emqx_ds_replication_layer_meta:allocate_shards(DB) of {ok, Shards} -> - logger:notice(#{msg => "Shards allocated", shards => Shards}), + logger:info(#{msg => "Shards allocated", shards => Shards}), ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)), ok = start_egresses(DB, Shards), ok = save_db_meta(DB, Shards), @@ -104,25 +308,23 @@ allocate_shards(State = #{db := DB}) -> end. start_shards(DB, Shards) -> - ok = lists:foreach( - fun(Shard) -> - ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}) - end, - Shards - ), - ok = logger:info(#{msg => "Shards started", shards => Shards}), + lists:foreach(fun(Shard) -> start_shard(DB, Shard) end, Shards). + +start_shard(DB, Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}), + ok = logger:info(#{msg => "Shard started", shard => Shard}), ok. 
start_egresses(DB, Shards) -> - ok = lists:foreach( - fun(Shard) -> - ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}) - end, - Shards - ), - logger:info(#{msg => "Egresses started", shards => Shards}), + lists:foreach(fun(Shard) -> start_egress(DB, Shard) end, Shards). + +start_egress(DB, Shard) -> + ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}), + ok = logger:info(#{msg => "Egress started", shard => Shard}), ok. +%% + save_db_meta(DB, Shards) -> persistent_term:put(?db_meta(DB), #{ shards => Shards, @@ -146,3 +348,8 @@ erase_shards_meta(DB, Shards) -> erase_shard_meta(DB, Shard) -> persistent_term:erase(?shard_meta(DB, Shard)). + +%% + +delay({MinDelay, Variance}) -> + timer:sleep(MinDelay + rand:uniform(Variance)). diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 24e7cdafb..872169765 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -26,18 +26,45 @@ -define(DB, testdb). opts() -> - #{ - backend => builtin, - storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 1, - n_sites => 3, - replication_factor => 3, - replication_options => #{ - wal_max_size_bytes => 128 * 1024, - wal_max_batch_size => 1024, - snapshot_interval => 128 - } - }. + opts(#{}). + +opts(Overrides) -> + maps:merge( + #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 16, + n_sites => 1, + replication_factor => 3, + replication_options => #{ + wal_max_size_bytes => 64 * 1024, + wal_max_batch_size => 1024, + snapshot_interval => 128 + } + }, + Overrides + ). + +appspec(emqx_durable_storage) -> + {emqx_durable_storage, #{ + before_start => fun snabbkaffe:fix_ct_logging/0, + override_env => [{egress_flush_interval, 1}] + }}. 
+ +t_replication_transfers_snapshots(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + NodeSpecs = emqx_cth_cluster:mk_nodespecs( + [ + {t_replication_transfers_snapshots1, #{apps => Apps}}, + {t_replication_transfers_snapshots2, #{apps => Apps}}, + {t_replication_transfers_snapshots3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(NodeSpecs), + [{nodes, Nodes}, {specs, NodeSpecs} | Config]; +t_replication_transfers_snapshots('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). t_replication_transfers_snapshots(Config) -> NMsgs = 4000, @@ -45,9 +72,10 @@ t_replication_transfers_snapshots(Config) -> _Specs = [_, SpecOffline | _] = ?config(specs, Config), %% Initialize DB on all nodes and wait for it to be online. + Opts = opts(#{n_shards => 1, n_sites => 3}), ?assertEqual( [{ok, ok} || _ <- Nodes], - erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()]) + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts]) ), ?retry( 500, @@ -88,7 +116,7 @@ t_replication_transfers_snapshots(Config) -> Shard = hd(shards(NodeOffline, ?DB)), MessagesOffline = lists:keysort( #message.timestamp, - consume(NodeOffline, ?DB, Shard, ['#'], 0) + consume_shard(NodeOffline, ?DB, Shard, ['#'], 0) ), ?assertEqual( sample(40, Messages), @@ -99,26 +127,169 @@ t_replication_transfers_snapshots(Config) -> MessagesOffline ). +t_replication_rebalance(init, Config) -> + Apps = [appspec(emqx_durable_storage)], + Nodes = emqx_cth_cluster:start( + [ + {t_replication_rebalance1, #{apps => Apps}}, + {t_replication_rebalance2, #{apps => Apps}}, + {t_replication_rebalance3, #{apps => Apps}}, + {t_replication_rebalance4, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + [{nodes, Nodes} | Config]; +t_replication_rebalance('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). 
+ +t_replication_rebalance(Config) -> + NMsgs = 800, + NClients = 5, + Nodes = [N1, N2, N3, N4] = ?config(nodes, Config), + + %% Initialize DB on the first node. + Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}), + ?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])), + ?assertMatch( + Shards when length(Shards) == 16, + shards_online(N1, ?DB) + ), + + %% Open DB on the rest of the nodes. + ?assertEqual( + [{ok, ok} || _ <- [N2, N3, N4]], + erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts]) + ), + + Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes], + ct:pal("Sites: ~p~n", [Sites]), + + %% Only N1 should be responsible for all shards initially. + ?assertEqual( + [[S1] || _ <- Nodes], + [ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes] + ), + + %% Fill the storage with messages and a few additional generations. + %% This will force shards to trigger snapshot transfers during rebalance. + ClientMessages = emqx_utils:pmap( + fun(CID) -> + N = lists:nth(1 + (CID rem length(Nodes)), Nodes), + fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)}) + end, + lists:seq(1, NClients), + infinity + ), + Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)), + + %% Join the second site to the DB replication sites. + ?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])), + %% Should be no-op. + ?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]), + + %% Fill in some more messages *during* the rebalance. + MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}), + + ?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))), + + %% Now join the rest of the sites. + ?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])), + ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]), + + %% Fill in some more messages *during* the rebalance. 
+ MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that each node is now responsible for 3/4 of the shards. + ?assertEqual( + [(16 * 3) div length(Nodes) || _ <- Nodes], + [n_shards_online(N, ?DB) || N <- Nodes] + ), + + %% Verify that the set of shard servers matches the target allocation. + Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes], + ShardServers = [ + shard_server_info(N, ?DB, Shard, Site, readiness) + || {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation), + Shard <- Shards + ], + ?assert( + lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers), + ShardServers + ), + + %% Verify that the messages are preserved after the rebalance. + Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2, + MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesN4)), + ?assertEqual(Messages, MessagesN4), + + %% Scale down the cluster by removing the first node. + ?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])), + ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]), + + ?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))), + + %% Verify that the first node no longer hosts any shards, and that each remaining node is now responsible for every shard. + ?assertEqual( + [0, 16, 16, 16], + [n_shards_online(N, ?DB) || N <- Nodes] + ), + + %% Verify that the messages are once again preserved after the rebalance. + MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)), + ?assertEqual(sample(20, Messages), sample(20, MessagesN3)), + ?assertEqual(Messages, MessagesN3). + +%% + +shard_server_info(Node, DB, Shard, Site, Info) -> + Server = shard_server(Node, DB, Shard, Site), + {Server, ds_repl_shard(Node, server_info, [Info, Server])}. + +shard_server(Node, DB, Shard, Site) -> + ds_repl_shard(Node, shard_server, [DB, Shard, Site]). 
+ +ds_repl_meta(Node, Fun) -> + ds_repl_meta(Node, Fun, []). + +ds_repl_meta(Node, Fun, Args) -> + erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args). + +ds_repl_shard(Node, Fun, Args) -> + erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). + +transitions(Node, DB) -> + Shards = shards(Node, DB), + [{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])]. + shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). shards_online(Node, DB) -> erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]). +n_shards_online(Node, DB) -> + length(shards_online(Node, DB)). + fill_storage(Node, DB, NMsgs, Opts) -> fill_storage(Node, DB, NMsgs, 0, Opts). -fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs -> - R1 = push_message(Node, DB, I), +fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs -> + PAddGen = maps:get(p_addgen, Opts, 0.001), + R1 = push_message(Node, DB, I, Opts), R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end), R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts); fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) -> []. -push_message(Node, DB, I) -> +push_message(Node, DB, I, Opts) -> Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]), {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)), - Message = message(Topic, Bytes, I * 100), + ClientId = maps:get(client_id, Opts, <>), + Message = message(ClientId, Topic, Bytes, I * 100), ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]), [Message]. @@ -126,16 +297,22 @@ add_generation(Node, DB) -> ok = erpc:call(Node, emqx_ds, add_generation, [DB]), []. -message(Topic, Payload, PublishedAt) -> +message(ClientId, Topic, Payload, PublishedAt) -> #message{ - from = <>, + from = ClientId, topic = Topic, payload = Payload, timestamp = PublishedAt, id = emqx_guid:gen() }. 
-consume(Node, DB, Shard, TopicFilter, StartTime) -> +compare_message(M1, M2) -> + {M1#message.from, M1#message.timestamp} < {M2#message.from, M2#message.timestamp}. + +consume(Node, DB, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_test_helpers, consume, [DB, TopicFilter, StartTime]). + +consume_shard(Node, DB, Shard, TopicFilter, StartTime) -> erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]). probably(P, Fun) -> @@ -156,26 +333,11 @@ suite() -> [{timetrap, {seconds, 60}}]. all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(TCName, Config) -> - Apps = [ - {emqx_durable_storage, #{ - before_start => fun snabbkaffe:fix_ct_logging/0, - override_env => [{egress_flush_interval, 1}] - }} - ], - WorkDir = emqx_cth_suite:work_dir(TCName, Config), - NodeSpecs = emqx_cth_cluster:mk_nodespecs( - [ - {emqx_ds_replication_SUITE1, #{apps => Apps}}, - {emqx_ds_replication_SUITE2, #{apps => Apps}}, - {emqx_ds_replication_SUITE3, #{apps => Apps}} - ], - #{work_dir => WorkDir} - ), - Nodes = emqx_cth_cluster:start(NodeSpecs), +init_per_testcase(TCName, Config0) -> + Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0), ok = snabbkaffe:start_trace(), - [{nodes, Nodes}, {specs, NodeSpecs} | Config]. + Config. -end_per_testcase(_TCName, Config) -> +end_per_testcase(TCName, Config) -> ok = snabbkaffe:stop(), - ok = emqx_cth_cluster:stop(?config(nodes, Config)). + emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).