feat(dsrepl): implement membership changes and rebalancing

Andrew Mayorov 2024-04-05 17:39:17 +02:00
parent d6058b7f51
commit 556ffc78c9
3 changed files with 561 additions and 82 deletions

emqx_ds_replication_layer_shard.erl

@@ -21,6 +21,7 @@
%% Static server configuration
-export([
shard_servers/2,
shard_server/3,
local_server/2
]).
@@ -30,6 +31,14 @@
server/3
]).
%% Membership
-export([
add_local_server/2,
drop_local_server/2,
remove_server/3,
server_info/2
]).
-behaviour(gen_server).
-export([
init/1,
@@ -38,21 +47,31 @@
terminate/2
]).
-type server() :: ra:server_id().
-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
%%
start_link(DB, Shard, Opts) ->
gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
-spec shard_servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [server()].
shard_servers(DB, Shard) ->
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
[
{server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
|| Site <- ReplicaSet
].
[shard_server(DB, Shard, Site) || Site <- ReplicaSet].
-spec shard_server(
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds_replication_layer_meta:site()
) -> server().
shard_server(DB, Shard, Site) ->
{server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}.
-spec local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> server().
local_server(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
{server_name(DB, Shard, Site), node()}.
{server_name(DB, Shard, local_site()), node()}.
cluster_name(DB, Shard) ->
iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
@@ -61,6 +80,14 @@ server_name(DB, Shard, Site) ->
DBBin = atom_to_binary(DB),
binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
server_uid(_DB, Shard) ->
%% NOTE
%% Each new "instance" of a server should have a unique identifier. Otherwise,
%% if some server migrates to another node during rebalancing, and then comes
%% back, `ra` will be very confused by it having the same UID as before.
Ts = integer_to_binary(erlang:system_time(microsecond)),
<<Shard/binary, "_", Ts/binary>>.
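%% For example, a server for shard <<"5">> created at microsecond timestamp
%% 1712331557000000 gets the UID <<"5_1712331557000000">>.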
%%
servers(DB, Shard, _Order = leader_preferred) ->
@@ -118,11 +145,100 @@ get_local_server(DB, Shard) ->
get_shard_servers(DB, Shard) ->
maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)).
local_site() ->
emqx_ds_replication_layer_meta:this_site().
%%
add_local_server(DB, Shard) ->
%% NOTE
%% Adding the local server as a "promotable" member to the cluster, which
%% means that it won't affect quorum until it is promoted to a voter, which
%% in turn happens when the server has caught up sufficiently with the log.
%% We also rely on this "membership" status to determine the server's
%% readiness.
ShardServers = shard_servers(DB, Shard),
LocalServer = local_server(DB, Shard),
ServerRecord = #{
id => LocalServer,
membership => promotable,
uid => server_uid(DB, Shard)
},
case ra:add_member(ShardServers, ServerRecord, ?MEMBERSHIP_CHANGE_TIMEOUT) of
{ok, _, _Leader} ->
ok;
{error, already_member} ->
ok;
{error, Reason} ->
{error, recoverable, Reason}
end.
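%% NOTE
%% A minimal sketch of how a caller might drive this API with a simple poll
%% loop. `wait_local_server_ready/2` and `wait_promoted/1` are hypothetical
%% helpers, not part of this module; the shard allocator implements the real
%% retry logic in `do_add_local/3`.
wait_local_server_ready(DB, Shard) ->
case add_local_server(DB, Shard) of
ok ->
wait_promoted(local_server(DB, Shard));
{error, recoverable, _Reason} ->
ok = timer:sleep(1000),
wait_local_server_ready(DB, Shard)
end.
wait_promoted(Server) ->
case server_info(readiness, Server) of
ready ->
%% Promoted to voter (or already the leader).
ok;
_NotReady ->
ok = timer:sleep(1000),
wait_promoted(Server)
end.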
drop_local_server(DB, Shard) ->
LocalServer = local_server(DB, Shard),
case remove_server(DB, Shard, LocalServer) of
ok ->
ra:force_delete_server(DB, LocalServer);
{error, _, _Reason} = Error ->
Error
end.
remove_server(DB, Shard, Server) ->
ShardServers = shard_servers(DB, Shard),
case ra:remove_member(ShardServers, Server, ?MEMBERSHIP_CHANGE_TIMEOUT) of
{ok, _, _Leader} ->
ok;
{error, not_member} ->
ok;
{error, Reason} ->
{error, recoverable, Reason}
end.
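%% NOTE
%% Treating `already_member` / `not_member` as success makes these membership
%% changes idempotent, so callers (e.g. the shard allocator) can safely retry
%% them after a timeout or a crash.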
server_info(readiness, Server) ->
%% NOTE
%% Server is ready if it's either the leader or a follower with voter "membership"
%% status (meaning it was promoted after catching up with the log).
case current_leader(Server) of
Server ->
ready;
Leader when Leader /= unknown ->
member_info(readiness, Server, Leader);
unknown ->
unknown
end;
server_info(leader, Server) ->
current_leader(Server).
member_info(readiness, Server, Leader) ->
case ra:member_overview(Leader) of
{ok, #{cluster := Cluster}, _} ->
member_readiness(maps:get(Server, Cluster));
_Error ->
unknown
end.
current_leader(Server) ->
case ra:members(Server) of
{ok, _Servers, Leader} ->
Leader;
_Error ->
unknown
end.
member_readiness(#{status := Status, voter_status := #{membership := Membership}}) ->
case Status of
normal when Membership =:= voter ->
ready;
_Other ->
{unready, Status, Membership}
end;
member_readiness(#{}) ->
unknown.
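%% For reference, the member entry shape these clauses consume from
%% ra:member_overview/1 (the exact field set is up to `ra`; only the keys
%% matched above are assumed):
%% member_readiness(#{status => normal, voter_status => #{membership => voter}})
%% -> ready
%% member_readiness(#{status => normal, voter_status => #{membership => promotable}})
%% -> {unready, normal, promotable}
%% member_readiness(#{status => suspended})
%% -> unknown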
%%
init({DB, Shard, Opts}) ->
_ = process_flag(trap_exit, true),
_Meta = start_shard(DB, Shard, Opts),
ok = start_shard(DB, Shard, Opts),
{ok, {DB, Shard}}.
handle_call(_Call, _From, State) ->
@@ -138,7 +254,6 @@ terminate(_Reason, {DB, Shard}) ->
%%
start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
Site = emqx_ds_replication_layer_meta:this_site(),
ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard),
Servers = shard_servers(DB, Shard),
@@ -157,7 +272,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
),
ok = ra:start_server(DB, #{
id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>,
uid => server_uid(DB, Shard),
cluster_name => ClusterName,
initial_members => Servers,
machine => Machine,
@@ -188,12 +303,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
end;
_ ->
ok
end,
#{
cluster_name => ClusterName,
servers => Servers,
local_server => LocalServer
}.
end.
%%

emqx_ds_replication_shard_allocator.erl

@@ -16,6 +16,8 @@
-module(emqx_ds_replication_shard_allocator).
-include_lib("snabbkaffe/include/trace.hrl").
-export([start_link/1]).
-export([n_shards/1]).
@@ -30,9 +32,23 @@
terminate/2
]).
-export([handle_transition/4]).
-define(db_meta(DB), {?MODULE, DB}).
-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
-define(TRANS_RETRY_TIMEOUT, 5_000).
-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
-ifdef(TEST).
-undef(TRANS_RETRY_TIMEOUT).
-undef(REMOVE_REPLICA_DELAY).
-define(TRANS_RETRY_TIMEOUT, 1_000).
-define(REMOVE_REPLICA_DELAY, {4_000, 2_000}).
-endif.
%%
start_link(DB) ->
@@ -47,13 +63,11 @@ shard_meta(DB, Shard) ->
%%
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
init(DB) ->
_ = erlang:process_flag(trap_exit, true),
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
State = #{db => DB, status => allocating},
handle_allocate_shards(State, ok).
_ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}),
State = #{db => DB, transitions => #{}, status => allocating},
{ok, handle_allocate_shards(State)}.
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
@@ -61,12 +75,19 @@ handle_call(_Call, _From, State) ->
handle_cast(_Cast, State) ->
{noreply, State}.
handle_info(timeout, State) ->
handle_allocate_shards(State, noreply);
handle_info({timeout, _TRef, allocate}, State) ->
{noreply, handle_allocate_shards(State)};
handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) ->
{noreply, handle_shard_changed(Shard, State)};
handle_info({changed, _}, State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, State) ->
{noreply, handle_exit(Pid, Reason, State)};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #{db := DB, shards := Shards}) ->
terminate(_Reason, State = #{db := DB, shards := Shards}) ->
unsubscribe_db_changes(State),
erase_db_meta(DB),
erase_shards_meta(DB, Shards);
terminate(_Reason, #{}) ->
@@ -74,10 +95,11 @@ terminate(_Reason, #{}) ->
%%
handle_allocate_shards(State, Ret) ->
handle_allocate_shards(State) ->
case allocate_shards(State) of
{ok, NState} ->
{Ret, NState};
ok = subscribe_db_changes(State),
NState;
{error, Data} ->
_ = logger:notice(
Data#{
@@ -85,15 +107,197 @@ handle_allocate_shards(State, Ret) ->
retry_in => ?ALLOCATE_RETRY_TIMEOUT
}
),
{Ret, State, ?ALLOCATE_RETRY_TIMEOUT}
_TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate),
State
end.
subscribe_db_changes(#{db := DB}) ->
emqx_ds_replication_layer_meta:subscribe(self(), DB).
unsubscribe_db_changes(_State) ->
emqx_ds_replication_layer_meta:unsubscribe(self()).
%%
handle_shard_changed(Shard, State = #{db := DB}) ->
ok = save_shard_meta(DB, Shard),
Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard),
handle_shard_transitions(Shard, Transitions, State).
handle_shard_transitions(Shard, Transitions, State = #{db := DB}) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(),
case Transitions of
[] ->
%% We reached the target allocation.
State;
[Trans = {add, ThisSite} | _Rest] ->
ensure_transition_handler(Shard, Trans, fun trans_add_local/3, State);
[Trans = {del, ThisSite} | _Rest] ->
ensure_transition_handler(Shard, Trans, fun trans_drop_local/3, State);
[Trans = {del, Site} | _Rest] ->
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
case lists:member(Site, ReplicaSet) of
true ->
%% NOTE
%% Putting this transition handler on a separate "track" so that it
%% won't block any changes with higher priority (e.g. managing
%% local replicas).
Handler = fun trans_rm_unresponsive/3,
ensure_transition_handler(unresp, Shard, Trans, Handler, State);
false ->
State
end;
[_Trans | _Rest] ->
%% This site is not involved in the next queued transition.
State
end.
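%% NOTE
%% Illustrative walkthrough (site names invented): on a node whose site is
%% <<"A">>, with Transitions = [{del, <<"A">>}, {add, <<"B">>}], the head
%% {del, <<"A">>} matches the ThisSite clause and a trans_drop_local/3 handler
%% is spawned here, while {add, <<"B">>} is left for site B to pick up once it
%% becomes the head of the queue. A head {del, Site} for another site that is
%% still in the replica set is also handled here, on the separate `unresp`
%% track, covering replicas that are down and cannot remove themselves.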
handle_transition(DB, Shard, Trans, Fun) ->
logger:set_process_metadata(#{
db => DB,
shard => Shard,
domain => [emqx, ds, DB, shard_transition]
}),
?tp(
dsrepl_shard_transition_begin,
#{shard => Shard, db => DB, transition => Trans, pid => self()}
),
erlang:apply(Fun, [DB, Shard, Trans]).
trans_add_local(DB, Shard, {add, Site}) ->
logger:info(#{msg => "Adding new local shard replica", site => Site}),
do_add_local(membership, DB, Shard).
do_add_local(membership = Stage, DB, Shard) ->
ok = start_shard(DB, Shard),
case emqx_ds_replication_layer_shard:add_local_server(DB, Shard) of
ok ->
do_add_local(readiness, DB, Shard);
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_add_local(Stage, DB, Shard)
end;
do_add_local(readiness = Stage, DB, Shard) ->
LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of
ready ->
logger:info(#{msg => "Local shard replica ready"});
Status ->
logger:warning(#{
msg => "Still waiting for local shard replica to be ready",
status => Status,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_add_local(Stage, DB, Shard)
end.
trans_drop_local(DB, Shard, {del, Site}) ->
logger:info(#{msg => "Dropping local shard replica", site => Site}),
do_drop_local(DB, Shard).
do_drop_local(DB, Shard) ->
case emqx_ds_replication_layer_shard:drop_local_server(DB, Shard) of
ok ->
ok = emqx_ds_builtin_db_sup:stop_shard({DB, Shard}),
ok = emqx_ds_storage_layer:drop_shard({DB, Shard}),
logger:info(#{msg => "Local shard replica dropped"});
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_drop_local(DB, Shard)
end.
trans_rm_unresponsive(DB, Shard, Trans = {del, Site}) ->
%% NOTE
%% Let the replica handle its own removal first; hence the delay.
ok = delay(?REMOVE_REPLICA_DELAY),
Transitions = emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard),
case Transitions of
[Trans | _] ->
logger:info(#{msg => "Removing unresponsive shard replica", site => Site}),
do_rm_unresponsive(DB, Shard, Site);
_Outdated ->
exit({shutdown, skipped})
end.
do_rm_unresponsive(DB, Shard, Site) ->
Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of
ok ->
logger:info(#{msg => "Unresponsive shard replica removed"});
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_rm_unresponsive(DB, Shard, Site)
end.
%%
ensure_transition_handler(Shard, Trans, Handler, State) ->
ensure_transition_handler(Shard, Shard, Trans, Handler, State).
ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
case maps:get(Track, Ts, undefined) of
undefined ->
Pid = start_transition_handler(Shard, Trans, Handler, State),
State#{transitions := Ts#{Track => {Shard, Trans, Pid}}};
_AlreadyRunning ->
%% NOTE: Avoiding multiple transition handlers for the same shard for safety.
State
end.
start_transition_handler(Shard, Trans, Handler, #{db := DB}) ->
proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]).
handle_exit(Pid, Reason, State = #{db := DB, transitions := Ts}) ->
case maps:to_list(maps:filter(fun(_, {_S, _T, P}) -> P == Pid end, Ts)) of
[{Track, {Shard, Trans, Pid}}] ->
?tp(
dsrepl_shard_transition_end,
#{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason}
),
ok = handle_transition_exit(Shard, Trans, Reason, State),
State#{transitions := maps:remove(Track, Ts)};
[] ->
logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}),
State
end.
handle_transition_exit(Shard, Trans, normal, _State = #{db := DB}) ->
%% NOTE: This will trigger the next transition if any.
ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans);
handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, _State) ->
ok;
handle_transition_exit(Shard, Trans, Reason, _State) ->
logger:warning(#{
msg => "Shard membership transition failed",
shard => Shard,
transition => Trans,
reason => Reason
}),
%% FIXME: retry
ok.
%%
allocate_shards(State = #{db := DB}) ->
case emqx_ds_replication_layer_meta:allocate_shards(DB) of
{ok, Shards} ->
logger:notice(#{msg => "Shards allocated", shards => Shards}),
logger:info(#{msg => "Shards allocated", shards => Shards}),
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
ok = start_egresses(DB, Shards),
ok = save_db_meta(DB, Shards),
@@ -104,25 +308,23 @@ allocate_shards(State = #{db := DB}) ->
end.
start_shards(DB, Shards) ->
ok = lists:foreach(
fun(Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard})
end,
Shards
),
ok = logger:info(#{msg => "Shards started", shards => Shards}),
lists:foreach(fun(Shard) -> start_shard(DB, Shard) end, Shards).
start_shard(DB, Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}),
ok = logger:info(#{msg => "Shard started", shard => Shard}),
ok.
start_egresses(DB, Shards) ->
ok = lists:foreach(
fun(Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard})
end,
Shards
),
logger:info(#{msg => "Egresses started", shards => Shards}),
lists:foreach(fun(Shard) -> start_egress(DB, Shard) end, Shards).
start_egress(DB, Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}),
ok = logger:info(#{msg => "Egress started", shard => Shard}),
ok.
%%
save_db_meta(DB, Shards) ->
persistent_term:put(?db_meta(DB), #{
shards => Shards,
@@ -146,3 +348,8 @@ erase_shards_meta(DB, Shards) ->
erase_shard_meta(DB, Shard) ->
persistent_term:erase(?shard_meta(DB, Shard)).
%%
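%% E.g. delay({10_000, 5_000}) sleeps for 10 to 15 seconds.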
delay({MinDelay, Variance}) ->
timer:sleep(MinDelay + rand:uniform(Variance)).

emqx_ds_replication_SUITE.erl

@@ -26,18 +26,45 @@
-define(DB, testdb).
opts() ->
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 1,
n_sites => 3,
replication_factor => 3,
replication_options => #{
wal_max_size_bytes => 128 * 1024,
wal_max_batch_size => 1024,
snapshot_interval => 128
}
}.
opts(#{}).
opts(Overrides) ->
maps:merge(
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 16,
n_sites => 1,
replication_factor => 3,
replication_options => #{
wal_max_size_bytes => 64 * 1024,
wal_max_batch_size => 1024,
snapshot_interval => 128
}
},
Overrides
).
appspec(emqx_durable_storage) ->
{emqx_durable_storage, #{
before_start => fun snabbkaffe:fix_ct_logging/0,
override_env => [{egress_flush_interval, 1}]
}}.
t_replication_transfers_snapshots(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
[
{t_replication_transfers_snapshots1, #{apps => Apps}},
{t_replication_transfers_snapshots2, #{apps => Apps}},
{t_replication_transfers_snapshots3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
Nodes = emqx_cth_cluster:start(NodeSpecs),
[{nodes, Nodes}, {specs, NodeSpecs} | Config];
t_replication_transfers_snapshots('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_replication_transfers_snapshots(Config) ->
NMsgs = 4000,
@@ -45,9 +72,10 @@ t_replication_transfers_snapshots(Config) ->
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
%% Initialize DB on all nodes and wait for it to be online.
Opts = opts(#{n_shards => 1, n_sites => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()])
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
),
?retry(
500,
@@ -88,7 +116,7 @@ t_replication_transfers_snapshots(Config) ->
Shard = hd(shards(NodeOffline, ?DB)),
MessagesOffline = lists:keysort(
#message.timestamp,
consume(NodeOffline, ?DB, Shard, ['#'], 0)
consume_shard(NodeOffline, ?DB, Shard, ['#'], 0)
),
?assertEqual(
sample(40, Messages),
@@ -99,26 +127,169 @@ t_replication_transfers_snapshots(Config) ->
MessagesOffline
).
t_replication_rebalance(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Nodes = emqx_cth_cluster:start(
[
{t_replication_rebalance1, #{apps => Apps}},
{t_replication_rebalance2, #{apps => Apps}},
{t_replication_rebalance3, #{apps => Apps}},
{t_replication_rebalance4, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
[{nodes, Nodes} | Config];
t_replication_rebalance('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_replication_rebalance(Config) ->
NMsgs = 800,
NClients = 5,
Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
%% Initialize DB on the first node.
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])),
?assertMatch(
Shards when length(Shards) == 16,
shards_online(N1, ?DB)
),
%% Open DB on the rest of the nodes.
?assertEqual(
[{ok, ok} || _ <- [N2, N3, N4]],
erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts])
),
Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes],
ct:pal("Sites: ~p~n", [Sites]),
%% Initially, only N1's site (S1) should be responsible for all shards.
?assertEqual(
[[S1] || _ <- Nodes],
[ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes]
),
%% Fill the storage with messages and a few additional generations.
%% This will force shards to trigger snapshot transfers during rebalance.
ClientMessages = emqx_utils:pmap(
fun(CID) ->
N = lists:nth(1 + (CID rem length(Nodes)), Nodes),
fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)})
end,
lists:seq(1, NClients),
infinity
),
Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)),
%% Join the second site to the DB replication sites.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
%% Should be a no-op.
?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])),
ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]),
%% Fill in some more messages *during* the rebalance.
MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}),
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
%% Now join the rest of the sites.
?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])),
ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]),
%% Fill in some more messages *during* the rebalance.
MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}),
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
%% Verify that each node is now responsible for 3/4 of the shards.
?assertEqual(
[(16 * 3) div length(Nodes) || _ <- Nodes],
[n_shards_online(N, ?DB) || N <- Nodes]
),
%% Verify that the set of shard servers matches the target allocation.
Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
ShardServers = [
shard_server_info(N, ?DB, Shard, Site, readiness)
|| {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
Shard <- Shards
],
?assert(
lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers),
ShardServers
),
%% Verify that the messages are preserved after the rebalance.
Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2,
MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesN4)),
?assertEqual(Messages, MessagesN4),
%% Scale down the cluster by removing the first node.
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
%% Verify that the first node now hosts no shards, while each remaining
%% node hosts all 16 shards.
?assertEqual(
[0, 16, 16, 16],
[n_shards_online(N, ?DB) || N <- Nodes]
),
%% Verify that the messages are once again preserved after the rebalance.
MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesN3)),
?assertEqual(Messages, MessagesN3).
%%
shard_server_info(Node, DB, Shard, Site, Info) ->
Server = shard_server(Node, DB, Shard, Site),
{Server, ds_repl_shard(Node, server_info, [Info, Server])}.
shard_server(Node, DB, Shard, Site) ->
ds_repl_shard(Node, shard_server, [DB, Shard, Site]).
ds_repl_meta(Node, Fun) ->
ds_repl_meta(Node, Fun, []).
ds_repl_meta(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args).
ds_repl_shard(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
transitions(Node, DB) ->
Shards = shards(Node, DB),
[{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])].
shards(Node, DB) ->
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
shards_online(Node, DB) ->
erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
n_shards_online(Node, DB) ->
length(shards_online(Node, DB)).
fill_storage(Node, DB, NMsgs, Opts) ->
fill_storage(Node, DB, NMsgs, 0, Opts).
fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs ->
R1 = push_message(Node, DB, I),
fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs ->
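%% Probability of appending a new generation after each message
%% (default: roughly one generation per 1000 messages).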
PAddGen = maps:get(p_addgen, Opts, 0.001),
R1 = push_message(Node, DB, I, Opts),
R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
[].
push_message(Node, DB, I) ->
push_message(Node, DB, I, Opts) ->
Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
{Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
Message = message(Topic, Bytes, I * 100),
ClientId = maps:get(client_id, Opts, <<?MODULE_STRING>>),
Message = message(ClientId, Topic, Bytes, I * 100),
ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
[Message].
@@ -126,16 +297,22 @@ add_generation(Node, DB) ->
ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
[].
message(Topic, Payload, PublishedAt) ->
message(ClientId, Topic, Payload, PublishedAt) ->
#message{
from = <<?MODULE_STRING>>,
from = ClientId,
topic = Topic,
payload = Payload,
timestamp = PublishedAt,
id = emqx_guid:gen()
}.
consume(Node, DB, Shard, TopicFilter, StartTime) ->
compare_message(M1, M2) ->
{M1#message.from, M1#message.timestamp} < {M2#message.from, M2#message.timestamp}.
consume(Node, DB, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_test_helpers, consume, [DB, TopicFilter, StartTime]).
consume_shard(Node, DB, Shard, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]).
probably(P, Fun) ->
@@ -156,26 +333,11 @@ suite() -> [{timetrap, {seconds, 60}}].
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(TCName, Config) ->
Apps = [
{emqx_durable_storage, #{
before_start => fun snabbkaffe:fix_ct_logging/0,
override_env => [{egress_flush_interval, 1}]
}}
],
WorkDir = emqx_cth_suite:work_dir(TCName, Config),
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
[
{emqx_ds_replication_SUITE1, #{apps => Apps}},
{emqx_ds_replication_SUITE2, #{apps => Apps}},
{emqx_ds_replication_SUITE3, #{apps => Apps}}
],
#{work_dir => WorkDir}
),
Nodes = emqx_cth_cluster:start(NodeSpecs),
init_per_testcase(TCName, Config0) ->
Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0),
ok = snabbkaffe:start_trace(),
[{nodes, Nodes}, {specs, NodeSpecs} | Config].
Config.
end_per_testcase(_TCName, Config) ->
end_per_testcase(TCName, Config) ->
ok = snabbkaffe:stop(),
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).