Merge pull request #12839 from keynslug/feat/EMQX-12110/rebalancing-impl

feat(dsrepl): implement membership changes and rebalancing
ieQu1 2024-04-09 10:46:31 +02:00 committed by GitHub
commit 8bbfa9ca8b
5 changed files with 1059 additions and 96 deletions

View File

@ -74,7 +74,7 @@ start_egress({DB, Shard}) ->
supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok.
stop_shard(Shard = {DB, _}) ->
stop_shard({DB, Shard}) ->
Sup = ?via(#?shards_sup{db = DB}),
ok = supervisor:terminate_child(Sup, Shard),
ok = supervisor:delete_child(Sup, Shard).
@ -212,7 +212,7 @@ sup_spec(Id, Options) ->
shard_spec(DB, Shard) ->
#{
id => {shard, Shard},
id => Shard,
start => {?MODULE, start_link_sup, [#?shard_sup{db = DB, shard = Shard}, []]},
shutdown => infinity,
restart => permanent,

View File

@ -57,6 +57,12 @@
target_set/2
]).
%% Subscriptions to changes:
-export([
subscribe/2,
unsubscribe/1
]).
%% gen_server
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -73,7 +79,12 @@
n_shards/1
]).
-export_type([site/0, update_cluster_result/0]).
-export_type([
site/0,
transition/0,
subscription_event/0,
update_cluster_result/0
]).
-include_lib("stdlib/include/qlc.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
@ -123,8 +134,16 @@
ok
| {error, {nonexistent_db, emqx_ds:db()}}
| {error, {nonexistent_sites, [site()]}}
| {error, {too_few_sites, [site()]}}
| {error, _}.
%% Subject of the subscription:
-type subject() :: emqx_ds:db().
%% Event for the subscription:
-type subscription_event() ::
{changed, {shard, emqx_ds:db(), emqx_ds_replication_layer:shard_id()}}.
%% Persistent term key:
-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
@ -336,11 +355,21 @@ target_set(DB, Shard) ->
undefined
end.
%%================================================================================
subscribe(Pid, Subject) ->
gen_server:call(?SERVER, {subscribe, Pid, Subject}, infinity).
unsubscribe(Pid) ->
gen_server:call(?SERVER, {unsubscribe, Pid}, infinity).
%%================================================================================
%% behavior callbacks
%%================================================================================
-record(s, {}).
-record(s, {
subs = #{} :: #{pid() => {subject(), _Monitor :: reference()}}
}).
init([]) ->
process_flag(trap_exit, true),
@ -348,14 +377,24 @@ init([]) ->
ensure_tables(),
ensure_site(),
S = #s{},
{ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}),
{ok, S}.
handle_call({subscribe, Pid, Subject}, _From, S) ->
{reply, ok, handle_subscribe(Pid, Subject, S)};
handle_call({unsubscribe, Pid}, _From, S) ->
{reply, ok, handle_unsubscribe(Pid, S)};
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
handle_cast(_Cast, S) ->
{noreply, S}.
handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}}, S) ->
ok = notify_subscribers(DB, {shard, DB, Shard}, S),
{noreply, S};
handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) ->
{noreply, handle_unsubscribe(Pid, S)};
handle_info(_Info, S) ->
{noreply, S}.
@ -414,6 +453,8 @@ allocate_shards_trans(DB) ->
assign_db_sites_trans(DB, Sites) ->
Opts = db_config_trans(DB),
case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of
[] when length(Sites) == 0 ->
mnesia:abort({too_few_sites, Sites});
[] ->
ok;
NonexistentSites ->
@ -440,15 +481,19 @@ modify_db_sites_trans(DB, Modifications) ->
case Sites of
Sites0 ->
ok;
_Chagned ->
_Changed ->
assign_db_sites_trans(DB, Sites)
end.
update_replica_set_trans(DB, Shard, Trans) ->
case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
[Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
%% NOTE
%% It's possible to complete a transition that's no longer planned. We
%% should anticipate that we may stray _away_ from the target set.
TargetSet1 = emqx_maybe:define(TargetSet0, ReplicaSet0),
ReplicaSet = apply_transition(Trans, ReplicaSet0),
case lists:usort(TargetSet0) of
case lists:usort(TargetSet1) of
ReplicaSet ->
TargetSet = undefined;
TS ->
@ -613,6 +658,38 @@ transaction(Fun, Args) ->
{error, Reason}
end.
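For reference, update_replica_set_trans/3 above applies a single pending transition to the current replica set via apply_transition/2, which is not shown in this hunk. A minimal sketch of how such a function could behave, assuming the {add, Site} | {del, Site} transition tuples used elsewhere in this changeset (the name and body below are illustrative, not the actual implementation):

-spec apply_transition_sketch({add, site()} | {del, site()}, [site()]) -> [site()].
apply_transition_sketch({add, Site}, Sites) ->
    %% Adding a site is idempotent: usort keeps the replica set free of duplicates.
    lists:usort([Site | Sites]);
apply_transition_sketch({del, Site}, Sites) ->
    %% Removing a site that is not present is a no-op.
    Sites -- [Site].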
%%====================================================================
handle_subscribe(Pid, Subject, S = #s{subs = Subs0}) ->
case maps:is_key(Pid, Subs0) of
false ->
MRef = erlang:monitor(process, Pid),
Subs = Subs0#{Pid => {Subject, MRef}},
S#s{subs = Subs};
true ->
S
end.
handle_unsubscribe(Pid, S = #s{subs = Subs0}) ->
case maps:take(Pid, Subs0) of
{{_Subject, MRef}, Subs} ->
_ = erlang:demonitor(MRef, [flush]),
S#s{subs = Subs};
error ->
S
end.
notify_subscribers(EventSubject, Event, #s{subs = Subs}) ->
maps:foreach(
fun(Pid, {Subject, _MRef}) ->
Subject == EventSubject andalso
erlang:send(Pid, {changed, Event})
end,
Subs
).
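To illustrate the new subscription API: a process registered with subscribe(Pid, DB) receives a {changed, {shard, DB, Shard}} message whenever a shard record of that DB is written, until it calls unsubscribe/1 or terminates (the monitor cleans up its entry). A minimal consumer sketch, not part of this changeset, with an illustrative print in place of real handling:

watch_db_shards(DB) ->
    ok = emqx_ds_replication_layer_meta:subscribe(self(), DB),
    watch_loop(DB).

watch_loop(DB) ->
    receive
        {changed, {shard, DB, Shard}} ->
            %% React to the shard metadata change, e.g. re-read the replica set.
            io:format("shard ~p of ~p changed~n", [Shard, DB]),
            watch_loop(DB);
        stop ->
            emqx_ds_replication_layer_meta:unsubscribe(self())
    end.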
%%====================================================================
%% @doc Intersperse elements of two lists.
%% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5].
-spec intersperse([X], [Y]) -> [X | Y].

View File

@ -21,6 +21,7 @@
%% Static server configuration
-export([
shard_servers/2,
shard_server/3,
local_server/2
]).
@ -30,6 +31,14 @@
server/3
]).
%% Membership
-export([
add_local_server/2,
drop_local_server/2,
remove_server/3,
server_info/2
]).
-behaviour(gen_server).
-export([
init/1,
@ -38,21 +47,31 @@
terminate/2
]).
-type server() :: ra:server_id().
-define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000).
%%
start_link(DB, Shard, Opts) ->
gen_server:start_link(?MODULE, {DB, Shard, Opts}, []).
-spec shard_servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> [server()].
shard_servers(DB, Shard) ->
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
[
{server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
|| Site <- ReplicaSet
].
[shard_server(DB, Shard, Site) || Site <- ReplicaSet].
-spec shard_server(
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds_replication_layer_meta:site()
) -> server().
shard_server(DB, Shard, Site) ->
{server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}.
-spec local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> server().
local_server(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
{server_name(DB, Shard, Site), node()}.
{server_name(DB, Shard, local_site()), node()}.
cluster_name(DB, Shard) ->
iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
@ -79,13 +98,11 @@ get_servers_leader_preferred(DB, Shard) ->
Servers = ra_leaderboard:lookup_members(ClusterName),
[Leader | lists:delete(Leader, Servers)];
undefined ->
%% TODO: Dynamic membership.
get_shard_servers(DB, Shard)
end.
get_server_local_preferred(DB, Shard) ->
%% NOTE: Contact random replica that is not a known leader.
%% TODO: Replica may be down, so we may need to retry.
%% NOTE: Contact either the local server or a random replica.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
@ -94,15 +111,21 @@ get_server_local_preferred(DB, Shard) ->
%% TODO
%% Leader is unknown if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
%% TODO: Dynamic membership.
pick_random(get_shard_servers(DB, Shard))
end.
lookup_leader(DB, Shard) ->
%% NOTE
%% Does not block, but the result may be outdated or even unknown when there
%% are no servers on the local node.
ClusterName = get_cluster_name(DB, Shard),
ra_leaderboard:lookup_leader(ClusterName).
pick_local(Servers) ->
case lists:dropwhile(fun({_Name, Node}) -> Node =/= node() end, Servers) of
[Local | _] ->
case lists:keyfind(node(), 2, Servers) of
Local when is_tuple(Local) ->
Local;
[] ->
false ->
pick_random(Servers)
end.
@ -118,11 +141,166 @@ get_local_server(DB, Shard) ->
get_shard_servers(DB, Shard) ->
maps:get(servers, emqx_ds_replication_shard_allocator:shard_meta(DB, Shard)).
local_site() ->
emqx_ds_replication_layer_meta:this_site().
%%
%% @doc Add a local server to the shard cluster.
%% It's recommended to have the local server running before calling this function.
%% This function is idempotent.
-spec add_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
ok | emqx_ds:error(_Reason).
add_local_server(DB, Shard) ->
%% NOTE
%% Adding the local server as a "promotable" member of the cluster, which
%% means it won't affect quorum until it is promoted to a voter, which in
%% turn happens when the server has caught up sufficiently with the log.
%% We also rely on this membership status to understand when the server
%% is ready.
ShardServers = shard_servers(DB, Shard),
LocalServer = local_server(DB, Shard),
case server_info(uid, LocalServer) of
UID when is_binary(UID) ->
ServerRecord = #{
id => LocalServer,
membership => promotable,
uid => UID
};
unknown ->
ServerRecord = #{
id => LocalServer,
membership => voter
}
end,
Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT,
case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of
{ok, _, _Leader} ->
ok;
{error, already_member} ->
ok;
Error ->
{error, recoverable, Error}
end.
%% @doc Remove a local server from the shard cluster and clean up on-disk data.
%% It's required to have the local server running before calling this function.
%% This function is idempotent.
-spec drop_local_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
ok | emqx_ds:error(_Reason).
drop_local_server(DB, Shard) ->
ShardServers = shard_servers(DB, Shard),
LocalServer = local_server(DB, Shard),
case lookup_leader(DB, Shard) of
LocalServer ->
%% NOTE
%% Trigger leadership transfer *and* force to wait until the new leader
%% is elected and updated in the leaderboard. This should help to avoid
%% edge cases where entries appended right before removal are duplicated
%% due to client retries.
%% Timeouts are ignored, it's a best effort attempt.
[Candidate | _] = lists:delete(LocalServer, ShardServers),
_ = ra:transfer_leadership(LocalServer, Candidate),
_ = wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end);
_Another ->
ok
end,
case remove_server(DB, Shard, LocalServer) of
ok ->
ra:force_delete_server(DB, LocalServer);
{error, _, _Reason} = Error ->
Error
end.
%% @doc Remove a (remote) server from the shard cluster.
%% The server might not be running when calling this function, e.g. the node
%% might be offline. Because of this, on-disk data will not be cleaned up.
%% This function is idempotent.
-spec remove_server(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), server()) ->
ok | emqx_ds:error(_Reason).
remove_server(DB, Shard, Server) ->
ShardServers = shard_servers(DB, Shard),
Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT,
case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of
{ok, _, _Leader} ->
ok;
{error, not_member} ->
ok;
Error ->
{error, recoverable, Error}
end.
-spec server_info
(readiness, server()) -> ready | {unready, _Status, _Membership} | unknown;
(leader, server()) -> server() | unknown;
(uid, server()) -> _UID :: binary() | unknown.
server_info(readiness, Server) ->
%% NOTE
%% Server is ready if it's either the leader or a follower with voter "membership"
%% status (meaning it was promoted after catching up with the log).
case current_leader(Server) of
Server ->
ready;
Leader when Leader /= unknown ->
member_info(readiness, Server, Leader);
unknown ->
unknown
end;
server_info(leader, Server) ->
current_leader(Server);
server_info(uid, Server) ->
maps:get(uid, ra_overview(Server), unknown).
member_info(readiness, Server, Leader) ->
Cluster = maps:get(cluster, ra_overview(Leader), #{}),
member_readiness(maps:get(Server, Cluster, #{})).
current_leader(Server) ->
%% NOTE: This call will block until the leader is known, or until the timeout.
case ra:members(Server) of
{ok, _Servers, Leader} ->
Leader;
_Error ->
unknown
end.
member_readiness(#{status := Status, voter_status := #{membership := Membership}}) ->
case Status of
normal when Membership =:= voter ->
ready;
_Other ->
{unready, Status, Membership}
end;
member_readiness(#{}) ->
unknown.
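Based only on the clauses above, readiness is derived from the member entry found in the leader's ra:member_overview/1 cluster map. A hypothetical unit test illustrating the three outcomes (the map shapes are assumed from the pattern matches above, not taken from ra documentation):

member_readiness_examples_test() ->
    %% A voter in normal state is ready.
    ready = member_readiness(#{status => normal, voter_status => #{membership => voter}}),
    %% A promotable member is still catching up with the log.
    {unready, normal, promotable} =
        member_readiness(#{status => normal, voter_status => #{membership => promotable}}),
    %% An empty entry carries no readiness information.
    unknown = member_readiness(#{}).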
%%
ra_try_servers([Server | Rest], Fun, Args) ->
case erlang:apply(Fun, [Server | Args]) of
{ok, R, Leader} ->
{ok, R, Leader};
{error, Reason} when Reason == noproc; Reason == nodedown ->
ra_try_servers(Rest, Fun, Args);
ErrorOrTimeout ->
ErrorOrTimeout
end;
ra_try_servers([], _Fun, _Args) ->
{error, servers_unreachable}.
ra_overview(Server) ->
case ra:member_overview(Server) of
{ok, Overview, _Leader} ->
Overview;
_Error ->
#{}
end.
%%
init({DB, Shard, Opts}) ->
_ = process_flag(trap_exit, true),
_Meta = start_shard(DB, Shard, Opts),
ok = start_shard(DB, Shard, Opts),
{ok, {DB, Shard}}.
handle_call(_Call, _From, State) ->
@ -138,7 +316,6 @@ terminate(_Reason, {DB, Shard}) ->
%%
start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
Site = emqx_ds_replication_layer_meta:this_site(),
ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard),
Servers = shard_servers(DB, Shard),
@ -157,7 +334,7 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
),
ok = ra:start_server(DB, #{
id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>,
uid => server_uid(DB, Shard),
cluster_name => ClusterName,
initial_members => Servers,
machine => Machine,
@ -188,12 +365,17 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
end;
_ ->
ok
end,
#{
cluster_name => ClusterName,
servers => Servers,
local_server => LocalServer
}.
end.
server_uid(_DB, Shard) ->
%% NOTE
%% Each new "instance" of a server should have a unique identifier. Otherwise,
%% if some server migrates to another node during rebalancing, and then comes
%% back, `ra` will be very confused by it having the same UID as before.
%% Keeping the shard ID as a prefix makes it easier to identify the server
%% in the filesystem / logs / etc.
Ts = integer_to_binary(erlang:system_time(microsecond)),
<<Shard/binary, "_", Ts/binary>>.
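For example, assuming a shard ID of <<"3">>, the generated UID looks like the following (the timestamp value is made up):

%% server_uid(DB, <<"3">>) -> <<"3_1712651191000000">>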
%%
@ -207,3 +389,24 @@ memoize(Fun, Args) ->
Result ->
Result
end.
wait_until(Fun) ->
wait_until(Fun, 5_000, 250).
wait_until(Fun, Timeout, Sleep) ->
Deadline = erlang:monotonic_time(millisecond) + Timeout,
loop_until(Fun, Deadline, Sleep).
loop_until(Fun, Deadline, Sleep) ->
case Fun() of
true ->
ok;
false ->
case erlang:monotonic_time(millisecond) of
Now when Now < Deadline ->
timer:sleep(Sleep),
loop_until(Fun, Deadline, Sleep);
_ ->
timeout
end
end.
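wait_until/1 polls the condition every 250 ms for up to 5 seconds and returns either ok or timeout; drop_local_server/2 above deliberately ignores the result. A sketch of a caller using the 3-arity form with explicit limits (the function name is hypothetical):

await_leadership(DB, Shard, Candidate) ->
    %% Poll the local leaderboard once a second, for at most 30 seconds.
    case wait_until(fun() -> lookup_leader(DB, Shard) == Candidate end, 30_000, 1_000) of
        ok -> ok;
        timeout -> {error, leadership_transfer_timed_out}
    end.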

View File

@ -16,11 +16,16 @@
-module(emqx_ds_replication_shard_allocator).
-include_lib("snabbkaffe/include/trace.hrl").
-export([start_link/1]).
-export([n_shards/1]).
-export([shard_meta/2]).
%% Maintenance purposes:
-export([trigger_transitions/1]).
-behaviour(gen_server).
-export([
init/1,
@ -30,14 +35,37 @@
terminate/2
]).
-export([handle_transition/4]).
-define(db_meta(DB), {?MODULE, DB}).
-define(shard_meta(DB, SHARD), {?MODULE, DB, SHARD}).
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
-define(TRANS_RETRY_TIMEOUT, 5_000).
-define(CRASH_RETRY_DELAY, 20_000).
-define(REMOVE_REPLICA_DELAY, {10_000, 5_000}).
-ifdef(TEST).
-undef(TRANS_RETRY_TIMEOUT).
-undef(REMOVE_REPLICA_DELAY).
-define(TRANS_RETRY_TIMEOUT, 1_000).
-define(REMOVE_REPLICA_DELAY, {3_000, 2_000}).
-endif.
%%
-record(trigger_transitions, {}).
-spec start_link(emqx_ds:db()) -> {ok, pid()}.
start_link(DB) ->
gen_server:start_link(?MODULE, DB, []).
-spec trigger_transitions(pid()) -> ok.
trigger_transitions(Pid) ->
gen_server:cast(Pid, #trigger_transitions{}).
-spec n_shards(emqx_ds:db()) -> non_neg_integer().
n_shards(DB) ->
Meta = persistent_term:get(?db_meta(DB)),
maps:get(n_shards, Meta).
@ -47,26 +75,60 @@ shard_meta(DB, Shard) ->
%%
-define(ALLOCATE_RETRY_TIMEOUT, 1_000).
-record(transhdl, {
shard :: emqx_ds_replication_layer:shard_id(),
trans :: emqx_ds_replication_layer_meta:transition(),
pid :: pid()
}).
-type state() :: #{
db := emqx_ds:db(),
shards := [emqx_ds_replication_layer:shard_id()],
status := allocating | ready,
transitions := #{_Track => #transhdl{}}
}.
-spec init(emqx_ds:db()) -> {ok, state()}.
init(DB) ->
_ = erlang:process_flag(trap_exit, true),
_ = logger:set_process_metadata(#{db => DB, domain => [ds, db, shard_allocator]}),
State = #{db => DB, status => allocating},
handle_allocate_shards(State, ok).
_ = logger:set_process_metadata(#{db => DB, domain => [emqx, ds, DB, shard_allocator]}),
State = #{
db => DB,
shards => [],
status => allocating,
transitions => #{}
},
{ok, handle_allocate_shards(State)}.
-spec handle_call(_Call, _From, state()) -> {reply, ignored, state()}.
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
-spec handle_cast(_Cast, state()) -> {noreply, state()}.
handle_cast(#trigger_transitions{}, State) ->
{noreply, handle_pending_transitions(State)};
handle_cast(_Cast, State) ->
{noreply, State}.
handle_info(timeout, State) ->
handle_allocate_shards(State, noreply);
-spec handle_info(Info, state()) -> {noreply, state()} when
Info ::
emqx_ds_replication_layer_meta:subscription_event()
| {timeout, reference(), allocate}
| {'EXIT', pid(), _Reason}.
handle_info({timeout, _TRef, allocate}, State) ->
{noreply, handle_allocate_shards(State)};
handle_info({changed, {shard, DB, Shard}}, State = #{db := DB}) ->
{noreply, handle_shard_changed(Shard, State)};
handle_info({changed, _}, State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, State) ->
{noreply, handle_exit(Pid, Reason, State)};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #{db := DB, shards := Shards}) ->
-spec terminate(_Reason, state()) -> _Ok.
terminate(_Reason, State = #{db := DB, shards := Shards}) ->
unsubscribe_db_changes(State),
erase_db_meta(DB),
erase_shards_meta(DB, Shards);
terminate(_Reason, #{}) ->
@ -74,10 +136,14 @@ terminate(_Reason, #{}) ->
%%
handle_allocate_shards(State, Ret) ->
case allocate_shards(State) of
{ok, NState} ->
{Ret, NState};
handle_allocate_shards(State0) ->
case allocate_shards(State0) of
{ok, State} ->
%% NOTE
%% Subscribe to shard changes and trigger any yet unhandled transitions.
ok = subscribe_db_changes(State),
ok = trigger_transitions(self()),
State;
{error, Data} ->
_ = logger:notice(
Data#{
@ -85,7 +151,235 @@ handle_allocate_shards(State, Ret) ->
retry_in => ?ALLOCATE_RETRY_TIMEOUT
}
),
{Ret, State, ?ALLOCATE_RETRY_TIMEOUT}
_TRef = erlang:start_timer(?ALLOCATE_RETRY_TIMEOUT, self(), allocate),
State0
end.
subscribe_db_changes(#{db := DB}) ->
emqx_ds_replication_layer_meta:subscribe(self(), DB).
unsubscribe_db_changes(_State) ->
emqx_ds_replication_layer_meta:unsubscribe(self()).
%%
handle_shard_changed(Shard, State = #{db := DB}) ->
ok = save_shard_meta(DB, Shard),
handle_shard_transitions(Shard, next_transitions(DB, Shard), State).
handle_pending_transitions(State = #{db := DB, shards := Shards}) ->
lists:foldl(
fun(Shard, StateAcc) ->
handle_shard_transitions(Shard, next_transitions(DB, Shard), StateAcc)
end,
State,
Shards
).
next_transitions(DB, Shard) ->
emqx_ds_replication_layer_meta:replica_set_transitions(DB, Shard).
handle_shard_transitions(_Shard, [], State) ->
%% We reached the target allocation.
State;
handle_shard_transitions(Shard, [Trans | _Rest], State) ->
case transition_handler(Shard, Trans, State) of
{Track, Handler} ->
ensure_transition_handler(Track, Shard, Trans, Handler, State);
undefined ->
State
end.
transition_handler(Shard, Trans, _State = #{db := DB}) ->
ThisSite = emqx_ds_replication_layer_meta:this_site(),
case Trans of
{add, ThisSite} ->
{Shard, fun trans_add_local/3};
{del, ThisSite} ->
{Shard, fun trans_drop_local/3};
{del, Site} ->
ReplicaSet = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
case lists:member(Site, ReplicaSet) of
true ->
%% NOTE
%% Let the replica handle its own removal first, but still set
%% up a removal handler after a delay, in case the replica is
%% unresponsive.
Handler = {fun trans_delay/5, [
?REMOVE_REPLICA_DELAY,
fun trans_rm_unresponsive/3
]},
%% NOTE
%% Putting this transition handler on a separate "track" so that it
%% won't block any changes with higher priority (e.g. managing
%% local replicas).
{{unresp, Shard}, Handler};
false ->
undefined
end;
_NotOurs ->
%% This site is not involved in the next queued transition.
undefined
end.
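The track returned by transition_handler/3 keys the transitions map in the allocator state: local add/drop handlers use the shard itself, while the delayed removal of an unresponsive remote replica uses {unresp, Shard}. An illustrative value of that map in which both kinds of handler transiently coexist for one shard (shard ID, sites and pids are made up):

%% #{
%%     <<"3">>           => #transhdl{shard = <<"3">>, trans = {add, ThisSite}, pid = Pid1},
%%     {unresp, <<"3">>} => #transhdl{shard = <<"3">>, trans = {del, DeadSite}, pid = Pid2}
%% }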
handle_transition(DB, Shard, Trans, Handler) ->
logger:set_process_metadata(#{
db => DB,
shard => Shard,
domain => [emqx, ds, DB, shard_transition]
}),
?tp(
dsrepl_shard_transition_begin,
#{shard => Shard, db => DB, transition => Trans, pid => self()}
),
apply_handler(Handler, DB, Shard, Trans).
apply_handler({Fun, Args}, DB, Shard, Trans) ->
erlang:apply(Fun, [DB, Shard, Trans | Args]);
apply_handler(Fun, DB, Shard, Trans) ->
erlang:apply(Fun, [DB, Shard, Trans]).
trans_add_local(DB, Shard, {add, Site}) ->
logger:info(#{msg => "Adding new local shard replica", site => Site}),
do_add_local(membership, DB, Shard).
do_add_local(membership = Stage, DB, Shard) ->
ok = start_shard(DB, Shard),
case emqx_ds_replication_layer_shard:add_local_server(DB, Shard) of
ok ->
do_add_local(readiness, DB, Shard);
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_add_local(Stage, DB, Shard)
end;
do_add_local(readiness = Stage, DB, Shard) ->
LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of
ready ->
logger:info(#{msg => "Local shard replica ready"});
Status ->
logger:warning(#{
msg => "Still waiting for local shard replica to be ready",
status => Status,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_add_local(Stage, DB, Shard)
end.
trans_drop_local(DB, Shard, {del, Site}) ->
logger:info(#{msg => "Dropping local shard replica", site => Site}),
do_drop_local(DB, Shard).
do_drop_local(DB, Shard) ->
case emqx_ds_replication_layer_shard:drop_local_server(DB, Shard) of
ok ->
ok = emqx_ds_builtin_db_sup:stop_shard({DB, Shard}),
ok = emqx_ds_storage_layer:drop_shard({DB, Shard}),
logger:info(#{msg => "Local shard replica dropped"});
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_drop_local(DB, Shard)
end.
trans_rm_unresponsive(DB, Shard, {del, Site}) ->
logger:info(#{msg => "Removing unresponsive shard replica", site => Site}),
do_rm_unresponsive(DB, Shard, Site).
do_rm_unresponsive(DB, Shard, Site) ->
Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of
ok ->
logger:info(#{msg => "Unresponsive shard replica removed"});
{error, recoverable, Reason} ->
logger:warning(#{
msg => "Shard membership change failed",
reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT
}),
ok = timer:sleep(?TRANS_RETRY_TIMEOUT),
do_rm_unresponsive(DB, Shard, Site)
end.
trans_delay(DB, Shard, Trans, Delay, NextHandler) ->
ok = delay(Delay),
%% NOTE: Proceed only if the transition we are going to handle is still desired.
case next_transitions(DB, Shard) of
[Trans | _] ->
apply_handler(NextHandler, DB, Shard, Trans);
_Outdated ->
exit({shutdown, skipped})
end.
%%
ensure_transition_handler(Track, Shard, Trans, Handler, State = #{transitions := Ts}) ->
case maps:get(Track, Ts, undefined) of
undefined ->
Pid = start_transition_handler(Shard, Trans, Handler, State),
Record = #transhdl{shard = Shard, trans = Trans, pid = Pid},
State#{transitions := Ts#{Track => Record}};
_AlreadyRunning ->
%% NOTE: Avoiding multiple transition handlers for the same shard for safety.
State
end.
start_transition_handler(Shard, Trans, Handler, #{db := DB}) ->
proc_lib:spawn_link(?MODULE, handle_transition, [DB, Shard, Trans, Handler]).
handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of
[{Track, #transhdl{shard = Shard, trans = Trans}}] ->
?tp(
dsrepl_shard_transition_end,
#{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason}
),
State = State0#{transitions := maps:remove(Track, Ts)},
handle_transition_exit(Shard, Trans, Reason, State);
[] ->
%% NOTE
%% Actually, it's sort of expected to have a portion of exit signals here,
%% because of `mria:with_middleman/3`. But it's impossible to tell them apart
%% from other signals.
logger:warning(#{msg => "Unexpected exit signal", pid => Pid, reason => Reason}),
State0
end.
handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
%% NOTE: This will trigger the next transition if any.
ok = emqx_ds_replication_layer_meta:update_replica_set(DB, Shard, Trans),
State;
handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
State;
handle_transition_exit(Shard, Trans, Reason, State) ->
logger:warning(#{
msg => "Shard membership transition failed",
shard => Shard,
transition => Trans,
reason => Reason,
retry_in => ?CRASH_RETRY_DELAY
}),
%% NOTE
%% In case of `{add, Site}` transition failure, we have no choice but to retry:
%% no other node can perform the transition and make progress towards the desired
%% state.
case Trans of
{add, _ThisSite} ->
{Track, Handler} = transition_handler(Shard, Trans, State),
RetryHandler = {fun trans_delay/5, [?CRASH_RETRY_DELAY, Handler]},
ensure_transition_handler(Track, Shard, Trans, RetryHandler, State);
_Another ->
State
end.
%%
@ -93,7 +387,7 @@ handle_allocate_shards(State, Ret) ->
allocate_shards(State = #{db := DB}) ->
case emqx_ds_replication_layer_meta:allocate_shards(DB) of
{ok, Shards} ->
logger:notice(#{msg => "Shards allocated", shards => Shards}),
logger:info(#{msg => "Shards allocated", shards => Shards}),
ok = start_shards(DB, emqx_ds_replication_layer_meta:my_shards(DB)),
ok = start_egresses(DB, Shards),
ok = save_db_meta(DB, Shards),
@ -104,25 +398,23 @@ allocate_shards(State = #{db := DB}) ->
end.
start_shards(DB, Shards) ->
ok = lists:foreach(
fun(Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard})
end,
Shards
),
ok = logger:info(#{msg => "Shards started", shards => Shards}),
lists:foreach(fun(Shard) -> start_shard(DB, Shard) end, Shards).
start_shard(DB, Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_shard({DB, Shard}),
ok = logger:info(#{msg => "Shard started", shard => Shard}),
ok.
start_egresses(DB, Shards) ->
ok = lists:foreach(
fun(Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard})
end,
Shards
),
logger:info(#{msg => "Egresses started", shards => Shards}),
lists:foreach(fun(Shard) -> start_egress(DB, Shard) end, Shards).
start_egress(DB, Shard) ->
ok = emqx_ds_builtin_db_sup:ensure_egress({DB, Shard}),
ok = logger:info(#{msg => "Egress started", shard => Shard}),
ok.
%%
save_db_meta(DB, Shards) ->
persistent_term:put(?db_meta(DB), #{
shards => Shards,
@ -146,3 +438,10 @@ erase_shards_meta(DB, Shards) ->
erase_shard_meta(DB, Shard) ->
persistent_term:erase(?shard_meta(DB, Shard)).
%%
delay({MinDelay, Variance}) ->
timer:sleep(MinDelay + rand:uniform(Variance));
delay(Delay) ->
timer:sleep(Delay).
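delay/1 accepts either a fixed sleep in milliseconds or a {MinDelay, Variance} pair; with the non-test ?REMOVE_REPLICA_DELAY of {10_000, 5_000}, removal of an unresponsive replica is therefore attempted after roughly 10-15 seconds:

%% delay({10_000, 5_000})  %% sleeps between ~10 and 15 seconds
%% delay(1_000)            %% sleeps exactly 1 second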

View File

@ -26,18 +26,45 @@
-define(DB, testdb).
opts() ->
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 1,
n_sites => 3,
replication_factor => 3,
replication_options => #{
wal_max_size_bytes => 128 * 1024,
wal_max_batch_size => 1024,
snapshot_interval => 128
}
}.
opts(#{}).
opts(Overrides) ->
maps:merge(
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 16,
n_sites => 1,
replication_factor => 3,
replication_options => #{
wal_max_size_bytes => 64 * 1024,
wal_max_batch_size => 1024,
snapshot_interval => 128
}
},
Overrides
).
appspec(emqx_durable_storage) ->
{emqx_durable_storage, #{
before_start => fun snabbkaffe:fix_ct_logging/0,
override_env => [{egress_flush_interval, 1}]
}}.
t_replication_transfers_snapshots(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
[
{t_replication_transfers_snapshots1, #{apps => Apps}},
{t_replication_transfers_snapshots2, #{apps => Apps}},
{t_replication_transfers_snapshots3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
Nodes = emqx_cth_cluster:start(NodeSpecs),
[{nodes, Nodes}, {specs, NodeSpecs} | Config];
t_replication_transfers_snapshots('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_replication_transfers_snapshots(Config) ->
NMsgs = 4000,
@ -45,9 +72,10 @@ t_replication_transfers_snapshots(Config) ->
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
%% Initialize DB on all nodes and wait for it to be online.
Opts = opts(#{n_shards => 1, n_sites => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()])
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
),
?retry(
500,
@ -88,7 +116,7 @@ t_replication_transfers_snapshots(Config) ->
Shard = hd(shards(NodeOffline, ?DB)),
MessagesOffline = lists:keysort(
#message.timestamp,
consume(NodeOffline, ?DB, Shard, ['#'], 0)
consume_shard(NodeOffline, ?DB, Shard, ['#'], 0)
),
?assertEqual(
sample(40, Messages),
@ -99,26 +127,391 @@ t_replication_transfers_snapshots(Config) ->
MessagesOffline
).
t_rebalance(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Nodes = emqx_cth_cluster:start(
[
{t_rebalance1, #{apps => Apps}},
{t_rebalance2, #{apps => Apps}},
{t_rebalance3, #{apps => Apps}},
{t_rebalance4, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
[{nodes, Nodes} | Config];
t_rebalance('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_rebalance(Config) ->
%% This testcase verifies that the storage rebalancing works correctly:
%% 1. Join/leave operations are applied successfully.
%% 2. Message data survives the rebalancing.
%% 3. Shard cluster membership converges to the target replica allocation.
%% 4. Replication factor is respected.
NMsgs = 800,
NClients = 5,
Nodes = [N1, N2, N3, N4] = ?config(nodes, Config),
%% Initialize DB on the first node.
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])),
?assertMatch(
Shards when length(Shards) == 16,
shards_online(N1, ?DB)
),
%% Open DB on the rest of the nodes.
?assertEqual(
[{ok, ok} || _ <- [N2, N3, N4]],
erpc:multicall([N2, N3, N4], emqx_ds, open_db, [?DB, Opts])
),
Sites = [S1, S2 | _Rest] = [ds_repl_meta(N, this_site) || N <- Nodes],
ct:pal("Sites: ~p~n", [Sites]),
%% Only N1 should be responsible for all shards initially.
?assertEqual(
[[S1] || _ <- Nodes],
[ds_repl_meta(N, db_sites, [?DB]) || N <- Nodes]
),
%% Fill the storage with messages and a few additional generations.
%% This will force shards to trigger snapshot transfers during rebalance.
ClientMessages = emqx_utils:pmap(
fun(CID) ->
N = lists:nth(1 + (CID rem length(Nodes)), Nodes),
fill_storage(N, ?DB, NMsgs, #{client_id => integer_to_binary(CID)})
end,
lists:seq(1, NClients),
infinity
),
Messages1 = lists:sort(fun compare_message/2, lists:append(ClientMessages)),
%% Join the second site to the DB replication sites.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, join_db_site, [?DB, S2])),
ct:pal("Transitions (~p -> ~p): ~p~n", [[S1], [S1, S2], transitions(N1, ?DB)]),
%% Fill in some more messages *during* the rebalance.
MessagesRB1 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB1">>}),
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
%% Now join the rest of the sites.
?assertEqual(ok, ds_repl_meta(N2, assign_db_sites, [?DB, Sites])),
ct:pal("Transitions (~p -> ~p): ~p~n", [[S1, S2], Sites, transitions(N1, ?DB)]),
%% Fill in some more messages *during* the rebalance.
MessagesRB2 = fill_storage(N4, ?DB, NMsgs, #{client_id => <<"RB2">>}),
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
%% Verify that each node is now responsible for 3/4 of the shards.
?assertEqual(
[(16 * 3) div length(Nodes) || _ <- Nodes],
[n_shards_online(N, ?DB) || N <- Nodes]
),
%% Verify that the set of shard servers matches the target allocation.
Allocation = [ds_repl_meta(N, my_shards, [?DB]) || N <- Nodes],
ShardServers = [
shard_server_info(N, ?DB, Shard, Site, readiness)
|| {N, Site, Shards} <- lists:zip3(Nodes, Sites, Allocation),
Shard <- Shards
],
?assert(
lists:all(fun({_Server, Status}) -> Status == ready end, ShardServers),
ShardServers
),
%% Verify that the messages are preserved after the rebalance.
Messages = Messages1 ++ MessagesRB1 ++ MessagesRB2,
MessagesN4 = lists:sort(fun compare_message/2, consume(N4, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesN4)),
?assertEqual(Messages, MessagesN4),
%% Scale down the cluster by removing the first node.
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S1])),
ct:pal("Transitions (~p -> ~p): ~p~n", [Sites, tl(Sites), transitions(N1, ?DB)]),
?retry(1000, 10, ?assertEqual([], transitions(N2, ?DB))),
%% Verify that the first node no longer hosts any shards, while each remaining node hosts all of them.
?assertEqual(
[0, 16, 16, 16],
[n_shards_online(N, ?DB) || N <- Nodes]
),
%% Verify that the messages are once again preserved after the rebalance.
MessagesN3 = lists:sort(fun compare_message/2, consume(N3, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesN3)),
?assertEqual(Messages, MessagesN3).
t_join_leave_errors(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Nodes = emqx_cth_cluster:start(
[
{t_join_leave_errors1, #{apps => Apps}},
{t_join_leave_errors2, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
[{nodes, Nodes} | Config];
t_join_leave_errors('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_join_leave_errors(Config) ->
%% This testcase verifies that logical errors arising during handling of
%% join/leave operations are reported correctly.
[N1, N2] = ?config(nodes, Config),
Opts = opts(#{n_shards => 16, n_sites => 1, replication_factor => 3}),
?assertEqual(ok, erpc:call(N1, emqx_ds, open_db, [?DB, Opts])),
?assertEqual(ok, erpc:call(N2, emqx_ds, open_db, [?DB, Opts])),
[S1, S2] = [ds_repl_meta(N, this_site) || N <- [N1, N2]],
?assertEqual([S1], ds_repl_meta(N1, db_sites, [?DB])),
%% Attempts to join a nonexistent DB / site.
?assertEqual(
{error, {nonexistent_db, boo}},
ds_repl_meta(N1, join_db_site, [_DB = boo, S1])
),
?assertEqual(
{error, {nonexistent_sites, [<<"NO-MANS-SITE">>]}},
ds_repl_meta(N1, join_db_site, [?DB, <<"NO-MANS-SITE">>])
),
%% NOTE: Leaving a non-existent site is not an error.
?assertEqual(
ok,
ds_repl_meta(N1, leave_db_site, [?DB, <<"NO-MANS-SITE">>])
),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)),
%% Impossible to leave the last site.
?assertEqual(
{error, {too_few_sites, []}},
ds_repl_meta(N1, leave_db_site, [?DB, S1])
),
%% "Move" the DB to the other node.
?assertEqual(ok, ds_repl_meta(N1, join_db_site, [?DB, S2])),
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertMatch([_ | _], transitions(N1, ?DB)),
?retry(1000, 10, ?assertEqual([], transitions(N1, ?DB))),
%% Should be no-op.
?assertEqual(ok, ds_repl_meta(N2, leave_db_site, [?DB, S1])),
?assertEqual([], transitions(N1, ?DB)).
t_rebalance_chaotic_converges(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Nodes = emqx_cth_cluster:start(
[
{t_rebalance_chaotic_converges1, #{apps => Apps}},
{t_rebalance_chaotic_converges2, #{apps => Apps}},
{t_rebalance_chaotic_converges3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
[{nodes, Nodes} | Config];
t_rebalance_chaotic_converges('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_rebalance_chaotic_converges(Config) ->
%% This testcase verifies that even a very chaotic sequence of join/leave
%% operations will still be handled consistently, and that the shard
%% allocation will converge to the target state.
NMsgs = 500,
Nodes = [N1, N2, N3] = ?config(nodes, Config),
%% Initialize DB on first two nodes.
Opts = opts(#{n_shards => 16, n_sites => 2, replication_factor => 3}),
?assertEqual(
[{ok, ok}, {ok, ok}],
erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
),
%% Open DB on the last node.
?assertEqual(
ok,
erpc:call(N3, emqx_ds, open_db, [?DB, Opts])
),
%% Find out which sites there are.
Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
ct:pal("Sites: ~p~n", [Sites]),
%% Initially, the DB is assigned to [S1, S2].
?retry(500, 10, ?assertEqual([16, 16], [n_shards_online(N, ?DB) || N <- [N1, N2]])),
?assertEqual(
lists:sort([S1, S2]),
ds_repl_meta(N1, db_sites, [?DB])
),
%% Fill the storage with messages and a few additional generations.
Messages0 = lists:append([
fill_storage(N1, ?DB, NMsgs, #{client_id => <<"C1">>}),
fill_storage(N2, ?DB, NMsgs, #{client_id => <<"C2">>}),
fill_storage(N3, ?DB, NMsgs, #{client_id => <<"C3">>})
]),
%% Construct a chaotic transition sequence that changes assignment to [S2, S3].
Sequence = [
{N1, join_db_site, S3},
{N2, leave_db_site, S2},
{N3, leave_db_site, S1},
{N1, join_db_site, S2},
{N2, join_db_site, S1},
{N3, leave_db_site, S3},
{N1, leave_db_site, S1},
{N2, join_db_site, S3}
],
%% Apply the sequence while also filling the storage with messages.
TransitionMessages = lists:map(
fun({N, Operation, Site}) ->
%% Apply the transition.
?assertEqual(ok, ds_repl_meta(N, Operation, [?DB, Site])),
%% Give some time for at least one transition to complete.
Transitions = transitions(N, ?DB),
ct:pal("Transitions after ~p: ~p", [Operation, Transitions]),
?retry(200, 10, ?assertNotEqual(Transitions, transitions(N, ?DB))),
%% Fill the storage with messages.
CID = integer_to_binary(erlang:system_time()),
fill_storage(N, ?DB, NMsgs, #{client_id => CID})
end,
Sequence
),
%% Wait for the last transition to complete.
?retry(500, 20, ?assertEqual([], transitions(N1, ?DB))),
?assertEqual(
lists:sort([S2, S3]),
ds_repl_meta(N1, db_sites, [?DB])
),
%% Check that all messages are still there.
Messages = lists:append(TransitionMessages) ++ Messages0,
MessagesDB = lists:sort(fun compare_message/2, consume(N1, ?DB, ['#'], 0)),
?assertEqual(sample(20, Messages), sample(20, MessagesDB)),
?assertEqual(Messages, MessagesDB).
t_rebalance_offline_restarts(init, Config) ->
Apps = [appspec(emqx_durable_storage)],
Specs = emqx_cth_cluster:mk_nodespecs(
[
{t_rebalance_offline_restarts1, #{apps => Apps}},
{t_rebalance_offline_restarts2, #{apps => Apps}},
{t_rebalance_offline_restarts3, #{apps => Apps}}
],
#{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
),
Nodes = emqx_cth_cluster:start(Specs),
[{nodes, Nodes}, {nodespecs, Specs} | Config];
t_rebalance_offline_restarts('end', Config) ->
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
t_rebalance_offline_restarts(Config) ->
%% This testcase verifies that rebalancing progresses if nodes restart or
%% go offline and never come back.
Nodes = [N1, N2, N3] = ?config(nodes, Config),
_Specs = [NS1, NS2, _] = ?config(nodespecs, Config),
%% Initialize DB on all 3 nodes.
Opts = opts(#{n_shards => 8, n_sites => 3, replication_factor => 3}),
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, Opts])
),
?retry(
500,
10,
?assertEqual([8 || _ <- Nodes], [n_shards_online(N, ?DB) || N <- Nodes])
),
%% Find out which sites are there.
Sites = [S1, S2, S3] = [ds_repl_meta(N, this_site) || N <- Nodes],
ct:pal("Sites: ~p~n", [Sites]),
%% Shut down N3 and then remove it from the DB.
ok = emqx_cth_cluster:stop_node(N3),
?assertEqual(ok, ds_repl_meta(N1, leave_db_site, [?DB, S3])),
Transitions = transitions(N1, ?DB),
ct:pal("Transitions: ~p~n", [Transitions]),
%% Wait until at least one transition completes.
?block_until(#{?snk_kind := dsrepl_shard_transition_end}),
%% Restart N1 and N2.
[N1] = emqx_cth_cluster:restart(NS1),
[N2] = emqx_cth_cluster:restart(NS2),
?assertEqual(
[{ok, ok}, {ok, ok}],
erpc:multicall([N1, N2], emqx_ds, open_db, [?DB, Opts])
),
%% Target state should still be reached eventually.
?retry(1000, 20, ?assertEqual([], transitions(N1, ?DB))),
?assertEqual(lists:sort([S1, S2]), ds_repl_meta(N1, db_sites, [?DB])).
%%
shard_server_info(Node, DB, Shard, Site, Info) ->
Server = shard_server(Node, DB, Shard, Site),
{Server, ds_repl_shard(Node, server_info, [Info, Server])}.
shard_server(Node, DB, Shard, Site) ->
ds_repl_shard(Node, shard_server, [DB, Shard, Site]).
ds_repl_meta(Node, Fun) ->
ds_repl_meta(Node, Fun, []).
ds_repl_meta(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_meta, Fun, Args).
ds_repl_shard(Node, Fun, Args) ->
erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args).
transitions(Node, DB) ->
Shards = shards(Node, DB),
[{S, T} || S <- Shards, T <- ds_repl_meta(Node, replica_set_transitions, [DB, S])].
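transitions/2 flattens the per-shard pending transitions into {Shard, Transition} pairs, which the test cases poll until the list is empty. An illustrative return value (shard and site identifiers are made up):

%% transitions(N1, ?DB) ->
%%     [{<<"1">>, {add, <<"5C6028D6CE9459C7">>}},
%%      {<<"1">>, {del, <<"D63D0DF44A23D0B1">>}}]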
shards(Node, DB) ->
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
shards_online(Node, DB) ->
erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
n_shards_online(Node, DB) ->
length(shards_online(Node, DB)).
fill_storage(Node, DB, NMsgs, Opts) ->
fill_storage(Node, DB, NMsgs, 0, Opts).
fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs ->
R1 = push_message(Node, DB, I),
fill_storage(Node, DB, NMsgs, I, Opts) when I < NMsgs ->
PAddGen = maps:get(p_addgen, Opts, 0.001),
R1 = push_message(Node, DB, I, Opts),
R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
[].
push_message(Node, DB, I) ->
push_message(Node, DB, I, Opts) ->
Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
{Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
Message = message(Topic, Bytes, I * 100),
ClientId = maps:get(client_id, Opts, <<?MODULE_STRING>>),
Message = message(ClientId, Topic, Bytes, I * 100),
ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
[Message].
@ -126,16 +519,22 @@ add_generation(Node, DB) ->
ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
[].
message(Topic, Payload, PublishedAt) ->
message(ClientId, Topic, Payload, PublishedAt) ->
#message{
from = <<?MODULE_STRING>>,
from = ClientId,
topic = Topic,
payload = Payload,
timestamp = PublishedAt,
id = emqx_guid:gen()
}.
consume(Node, DB, Shard, TopicFilter, StartTime) ->
compare_message(M1, M2) ->
{M1#message.from, M1#message.timestamp} < {M2#message.from, M2#message.timestamp}.
consume(Node, DB, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_test_helpers, consume, [DB, TopicFilter, StartTime]).
consume_shard(Node, DB, Shard, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_test_helpers, storage_consume, [{DB, Shard}, TopicFilter, StartTime]).
probably(P, Fun) ->
@ -156,26 +555,11 @@ suite() -> [{timetrap, {seconds, 60}}].
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(TCName, Config) ->
Apps = [
{emqx_durable_storage, #{
before_start => fun snabbkaffe:fix_ct_logging/0,
override_env => [{egress_flush_interval, 1}]
}}
],
WorkDir = emqx_cth_suite:work_dir(TCName, Config),
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
[
{emqx_ds_replication_SUITE1, #{apps => Apps}},
{emqx_ds_replication_SUITE2, #{apps => Apps}},
{emqx_ds_replication_SUITE3, #{apps => Apps}}
],
#{work_dir => WorkDir}
),
Nodes = emqx_cth_cluster:start(NodeSpecs),
init_per_testcase(TCName, Config0) ->
Config = emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config0),
ok = snabbkaffe:start_trace(),
[{nodes, Nodes}, {specs, NodeSpecs} | Config].
Config.
end_per_testcase(_TCName, Config) ->
end_per_testcase(TCName, Config) ->
ok = snabbkaffe:stop(),
ok = emqx_cth_cluster:stop(?config(nodes, Config)).
emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config).