feat(dsrepl): transfer storage snapshot during ra snapshot recovery

This commit is contained in:
Andrew Mayorov 2024-03-19 20:05:12 +01:00
parent 9e34fcd1d7
commit cc9926f159
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
9 changed files with 1027 additions and 109 deletions

View File

@ -21,7 +21,16 @@
-behaviour(supervisor).
%% API:
-export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]).
-export([
start_db/2,
start_shard/1,
start_egress/1,
stop_shard/1,
terminate_storage/1,
restart_storage/1,
ensure_shard/1,
ensure_egress/1
]).
-export([which_shards/1]).
%% behaviour callbacks:
@ -64,12 +73,22 @@ start_shard({DB, Shard}) ->
start_egress({DB, Shard}) ->
supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)).
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok.
stop_shard(Shard = {DB, _}) ->
Sup = ?via(#?shards_sup{db = DB}),
ok = supervisor:terminate_child(Sup, Shard),
ok = supervisor:delete_child(Sup, Shard).
-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
terminate_storage({DB, Shard}) ->
Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
supervisor:terminate_child(Sup, {Shard, storage}).
-spec restart_storage(emqx_ds_storage_layer:shard_id()) -> {ok, _Child} | {error, _Reason}.
restart_storage({DB, Shard}) ->
Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
supervisor:restart_child(Sup, {Shard, storage}).
-spec ensure_shard(emqx_ds_storage_layer:shard_id()) ->
ok | {error, _Reason}.
ensure_shard(Shard) ->

View File

@ -65,7 +65,9 @@
-export([
init/1,
apply/3
apply/3,
snapshot_module/0
]).
-export_type([
@ -80,6 +82,10 @@
batch/0
]).
-export_type([
ra_state/0
]).
-include_lib("emqx_utils/include/emqx_message.hrl").
-include("emqx_ds_replication_layer.hrl").
@ -140,6 +146,20 @@
-type generation_rank() :: {shard_id(), term()}.
%% Core state of the replication, i.e. the state of ra machine.
-type ra_state() :: #{
db_shard := {emqx_ds:db(), shard_id()},
latest := timestamp_us()
}.
%% Command. Each command is an entry in the replication log.
-type ra_command() :: #{
?tag := ?BATCH | add_generation | update_config | drop_generation,
_ => _
}.
-type timestamp_us() :: non_neg_integer().
%%================================================================================
%% API functions
%%================================================================================
@ -589,9 +609,12 @@ ra_drop_shard(DB, Shard) ->
%%
-spec init(_Args :: map()) -> ra_state().
init(#{db := DB, shard := Shard}) ->
#{db_shard => {DB, Shard}, latest => 0}.
-spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) ->
{ra_state(), _Reply, _Effects}.
apply(
#{index := RaftIdx},
#{
@ -671,3 +694,6 @@ timestamp_to_timeus(TimestampMs) ->
timeus_to_timestamp(TimestampUs) ->
TimestampUs div 1000.
snapshot_module() ->
emqx_ds_replication_snapshot.

View File

@ -147,19 +147,21 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) ->
Bootstrap = false;
{error, name_not_registered} ->
Bootstrap = true,
ok = ra:start_server(DB, #{
id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>,
cluster_name => ClusterName,
initial_members => Servers,
machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
log_init_args => maps:with(
Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
LogOpts = maps:with(
[
snapshot_interval,
resend_window
],
ReplicationOpts
)
),
ok = ra:start_server(DB, #{
id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>,
cluster_name => ClusterName,
initial_members => Servers,
machine => Machine,
log_init_args => LogOpts
})
end,
case Servers of

View File

@ -0,0 +1,229 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_replication_snapshot).
-include_lib("snabbkaffe/include/trace.hrl").
-behaviour(ra_snapshot).
-export([
prepare/2,
write/3,
begin_read/2,
read_chunk/3,
begin_accept/2,
accept_chunk/2,
complete_accept/2,
recover/1,
validate/1,
read_meta/1
]).
%% Read state.
-record(rs, {
phase :: machine_state | storage_snapshot,
started_at :: _Time :: integer(),
state :: emqx_ds_replication_layer:ra_state() | undefined,
reader :: emqx_ds_storage_snapshot:reader() | undefined
}).
%% Write state.
-record(ws, {
phase :: machine_state | storage_snapshot,
started_at :: _Time :: integer(),
dir :: file:filename(),
meta :: ra_snapshot:meta(),
state :: emqx_ds_replication_layer:ra_state() | undefined,
writer :: emqx_ds_storage_snapshot:writer() | undefined
}).
-type rs() :: #rs{}.
-type ws() :: #ws{}.
-type ra_state() :: emqx_ds_replication_layer:ra_state().
%% Writing a snapshot.
%% This process is exactly the same as writing a ra log snapshot: store the
%% log meta and the machine state in a single snapshot file.
-spec prepare(_RaftIndex, ra_state()) -> _State :: ra_state().
prepare(Index, State) ->
ra_log_snapshot:prepare(Index, State).
-spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) ->
ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
write(Dir, Meta, MachineState) ->
ra_log_snapshot:write(Dir, Meta, MachineState).
%% Reading a snapshot.
%% This is triggered by the leader when it finds out that a follower is
%% behind so much that there are no log segments covering the gap anymore.
%% This process, on the other hand, MUST involve reading the storage snapshot,
%% (in addition to the log snapshot) to reconstruct the storage state on the
%% target node.
-spec begin_read(_SnapshotDir :: file:filename(), _Context :: #{}) ->
{ok, ra_snapshot:meta(), rs()} | {error, _Reason :: term()}.
begin_read(Dir, _Context) ->
RS = #rs{
phase = machine_state,
started_at = erlang:monotonic_time(millisecond)
},
case ra_log_snapshot:recover(Dir) of
{ok, Meta, MachineState} ->
start_snapshot_reader(Meta, RS#rs{state = MachineState});
Error ->
Error
end.
start_snapshot_reader(Meta, RS) ->
ShardId = shard_id(RS),
logger:info(#{
msg => "dsrepl_snapshot_read_started",
shard => ShardId
}),
{ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(ShardId),
{ok, Meta, RS#rs{reader = SnapReader}}.
-spec read_chunk(rs(), _Size :: non_neg_integer(), _SnapshotDir :: file:filename()) ->
{ok, binary(), {next, rs()} | last} | {error, _Reason :: term()}.
read_chunk(RS = #rs{phase = machine_state, state = MachineState}, _Size, _Dir) ->
Chunk = term_to_binary(MachineState),
{ok, Chunk, {next, RS#rs{phase = storage_snapshot}}};
read_chunk(RS = #rs{phase = storage_snapshot, reader = SnapReader0}, Size, _Dir) ->
case emqx_ds_storage_snapshot:read_chunk(SnapReader0, Size) of
{next, Chunk, SnapReader} ->
{ok, Chunk, {next, RS#rs{reader = SnapReader}}};
{last, Chunk, SnapReader} ->
%% TODO: idempotence?
?tp(dsrepl_snapshot_read_complete, #{reader => SnapReader}),
_ = complete_read(RS#rs{reader = SnapReader}),
{ok, Chunk, last};
{error, Reason} ->
?tp(dsrepl_snapshot_read_error, #{reason => Reason, reader => SnapReader0}),
_ = emqx_ds_storage_snapshot:release_reader(SnapReader0),
error(Reason)
end.
complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) ->
_ = emqx_ds_storage_snapshot:release_reader(SnapReader),
logger:info(#{
msg => "dsrepl_snapshot_read_complete",
shard => shard_id(RS),
duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
read_bytes => emqx_ds_storage_snapshot:reader_info(bytes_read, SnapReader)
}).
%% Accepting a snapshot.
%% This process is triggered by the target server, when the leader finds out
%% that the target server is severely lagging behind. This is receiving side of
%% `begin_read/2` and `read_chunk/3`.
-spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) ->
{ok, ws()}.
begin_accept(Dir, Meta) ->
WS = #ws{
phase = machine_state,
started_at = erlang:monotonic_time(millisecond),
dir = Dir,
meta = Meta
},
{ok, WS}.
-spec accept_chunk(binary(), ws()) ->
{ok, ws()} | {error, _Reason :: term()}.
accept_chunk(Chunk, WS = #ws{phase = machine_state}) ->
MachineState = binary_to_term(Chunk),
start_snapshot_writer(WS#ws{state = MachineState});
accept_chunk(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
%% TODO: idempotence?
case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
{next, SnapWriter} ->
{ok, WS#ws{writer = SnapWriter}};
{error, Reason} ->
?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
_ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
error(Reason)
end.
start_snapshot_writer(WS) ->
ShardId = shard_id(WS),
logger:info(#{
msg => "dsrepl_snapshot_write_started",
shard => ShardId
}),
_ = emqx_ds_builtin_db_sup:terminate_storage(ShardId),
{ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(ShardId),
{ok, WS#ws{phase = storage_snapshot, writer = SnapWriter}}.
-spec complete_accept(ws()) -> ok | {error, ra_snapshot:file_err()}.
complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ->
%% TODO: idempotence?
case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of
{last, SnapWriter} ->
?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}),
_ = emqx_ds_storage_snapshot:release_writer(SnapWriter),
Result = complete_accept(WS#ws{writer = SnapWriter}),
?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}),
Result;
{error, Reason} ->
?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
_ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0),
error(Reason)
end.
complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
ShardId = shard_id(WS),
logger:info(#{
msg => "dsrepl_snapshot_read_complete",
shard => ShardId,
duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter)
}),
{ok, _} = emqx_ds_builtin_db_sup:restart_storage(ShardId),
write_machine_snapshot(WS).
write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
write(Dir, Meta, MachineState).
%% Restoring machine state from a snapshot.
%% This is equivalent to restoring from a log snapshot.
-spec recover(_SnapshotDir :: file:filename()) ->
{ok, ra_snapshot:meta(), ra_state()} | {error, _Reason}.
recover(Dir) ->
%% TODO: Verify that storage layer is online?
ra_log_snapshot:recover(Dir).
-spec validate(_SnapshotDir :: file:filename()) ->
ok | {error, _Reason}.
validate(Dir) ->
ra_log_snapshot:validate(Dir).
-spec read_meta(_SnapshotDir :: file:filename()) ->
{ok, ra_snapshot:meta()} | {error, _Reason}.
read_meta(Dir) ->
ra_log_snapshot:read_meta(Dir).
shard_id(#rs{state = MachineState}) ->
shard_id(MachineState);
shard_id(#ws{state = MachineState}) ->
shard_id(MachineState);
shard_id(MachineState) ->
maps:get(db_shard, MachineState).

View File

@ -19,8 +19,11 @@
%% Replication layer API:
-export([
open_shard/2,
%% Lifecycle
start_link/2,
drop_shard/1,
%% Data
store_batch/3,
get_streams/3,
get_delete_streams/3,
@ -29,14 +32,20 @@
update_iterator/3,
next/3,
delete_next/4,
%% Generations
update_config/3,
add_generation/2,
list_generations_with_lifetimes/1,
drop_generation/2
drop_generation/2,
%% Snapshotting
take_snapshot/1,
accept_snapshot/1
]).
%% gen_server
-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports:
-export([db_dir/1]).
@ -229,10 +238,7 @@
-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
-record(call_list_generations_with_lifetimes, {}).
-record(call_drop_generation, {gen_id :: gen_id()}).
-spec open_shard(shard_id(), options()) -> ok.
open_shard(Shard, Options) ->
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
-record(call_take_snapshot, {}).
-spec drop_shard(shard_id()) -> ok.
drop_shard(Shard) ->
@ -244,12 +250,24 @@ drop_shard(Shard) ->
emqx_ds:message_store_opts()
) ->
emqx_ds:store_batch_result().
store_batch(Shard, Messages, Options) ->
store_batch(Shard, Messages0, Options) ->
%% We always store messages in the current generation:
GenId = generation_current(Shard),
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
#{module := Mod, data := GenData, since := Since} = generation_get(Shard, GenId),
case Messages0 of
[{Time, _Msg} | Rest] when Time < Since ->
%% FIXME: log / feedback
Messages = skip_outdated_messages(Since, Rest);
_ ->
Messages = Messages0
end,
Mod:store_batch(Shard, GenData, Messages, Options).
skip_outdated_messages(Since, [{Time, _Msg} | Rest]) when Time < Since ->
skip_outdated_messages(Since, Rest);
skip_outdated_messages(_Since, Messages) ->
Messages.
-spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{integer(), stream()}].
get_streams(Shard, TopicFilter, StartTime) ->
@ -436,6 +454,20 @@ list_generations_with_lifetimes(ShardId) ->
drop_generation(ShardId, GenId) ->
gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
-spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}.
take_snapshot(ShardId) ->
case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of
{ok, Dir} ->
emqx_ds_storage_snapshot:new_reader(Dir);
Error ->
Error
end.
-spec accept_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:writer()} | {error, _Reason}.
accept_snapshot(ShardId) ->
ok = drop_shard(ShardId),
handle_accept_snapshot(ShardId).
%%================================================================================
%% gen_server for the shard
%%================================================================================
@ -505,6 +537,9 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
{Reply, S} = handle_drop_generation(S0, GenId),
commit_metadata(S),
{reply, Reply, S};
handle_call(#call_take_snapshot{}, _From, S) ->
Snapshot = handle_take_snapshot(S),
{reply, Snapshot, S};
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
@ -726,7 +761,11 @@ rocksdb_open(Shard, Options) ->
-spec db_dir(shard_id()) -> file:filename().
db_dir({DB, ShardId}) ->
filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
checkpoint_dir({DB, ShardId}, Name) ->
filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name]).
-spec update_last_until(Schema, emqx_ds:time()) ->
Schema | {error, exists | overlaps_existing_generations}
@ -759,6 +798,21 @@ run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) ->
%% Different implementation modules
NewGenData.
handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
Name = integer_to_list(erlang:system_time(millisecond)),
Dir = checkpoint_dir(ShardId, Name),
_ = filelib:ensure_dir(Dir),
case rocksdb:checkpoint(DB, Dir) of
ok ->
{ok, Dir};
{error, _} = Error ->
Error
end.
handle_accept_snapshot(ShardId) ->
Dir = db_dir(ShardId),
emqx_ds_storage_snapshot:new_writer(Dir).
%%--------------------------------------------------------------------------------
%% Schema access
%%--------------------------------------------------------------------------------

View File

@ -1,88 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_layer_sup).
-behaviour(supervisor).
%% API:
-export([start_link/0, start_shard/2, stop_shard/1, ensure_shard/2]).
%% behaviour callbacks:
-export([init/1]).
%%================================================================================
%% Type declarations
%%================================================================================
-define(SUP, ?MODULE).
%%================================================================================
%% API funcions
%%================================================================================
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor:start_link(?MODULE, []).
-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
supervisor:startchild_ret().
start_shard(Shard, Options) ->
supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
stop_shard(Shard) ->
ok = supervisor:terminate_child(?SUP, Shard),
ok = supervisor:delete_child(?SUP, Shard).
-spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) ->
ok | {error, _Reason}.
ensure_shard(Shard, Options) ->
case start_shard(Shard, Options) of
{ok, _Pid} ->
ok;
{error, {already_started, _Pid}} ->
ok;
{error, Reason} ->
{error, Reason}
end.
%%================================================================================
%% behaviour callbacks
%%================================================================================
init([]) ->
Children = [],
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 10
},
{ok, {SupFlags, Children}}.
%%================================================================================
%% Internal functions
%%================================================================================
-spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
supervisor:child_spec().
shard_child_spec(Shard, Options) ->
#{
id => Shard,
start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
shutdown => 5_000,
restart => permanent,
type => worker
}.

View File

@ -0,0 +1,325 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_snapshot).
-include_lib("kernel/include/file.hrl").
-export([
new_reader/1,
read_chunk/2,
abort_reader/1,
release_reader/1,
reader_info/2
]).
-export([
new_writer/1,
write_chunk/2,
abort_writer/1,
release_writer/1,
writer_info/2
]).
-export_type([
reader/0,
writer/0
]).
%%
-define(FILECHUNK(RELPATH, POS, MORE), #{
'$' => chunk,
rp => RELPATH,
pos => POS,
more => MORE
}).
-define(PAT_FILECHUNK(RELPATH, POS, MORE), #{
'$' := chunk,
rp := RELPATH,
pos := POS,
more := MORE
}).
-define(EOS(), #{
'$' => eos
}).
-define(PAT_EOS(), #{
'$' := eos
}).
-define(PAT_HEADER(), #{'$' := _}).
%%
-record(reader, {
dirpath :: file:filename(),
files :: #{_RelPath => reader_file()},
queue :: [_RelPath :: file:filename()]
}).
-record(rfile, {
abspath :: file:filename(),
fd :: file:io_device() | eof,
pos :: non_neg_integer()
}).
-opaque reader() :: #reader{}.
-type reader_file() :: #rfile{}.
-type reason() :: {atom(), _AbsPath :: file:filename(), _Details :: term()}.
%% @doc Initialize a reader for a snapshot directory.
%% Snapshot directory is a directory containing arbitrary number of regular
%% files in arbitrary subdirectory structure. Files are read in indeterminate
%% order. It's an error to have non-regular files in the directory (e.g. symlinks).
-spec new_reader(_Dir :: file:filename()) -> {ok, reader()}.
new_reader(DirPath) ->
%% NOTE
%% Opening all files at once, so there would be less error handling later
%% during transfer.
%% TODO
%% Beware of how errors are handled: if one file fails to open, the whole
%% process will exit. This is fine for the purpose of replication (because
%% ra spawns separate process for each transfer), but may not be suitable
%% for other use cases.
Files = emqx_utils_fs:traverse_dir(
fun(Path, Info, Acc) -> new_reader_file(Path, Info, DirPath, Acc) end,
#{},
DirPath
),
{ok, #reader{
dirpath = DirPath,
files = Files,
queue = maps:keys(Files)
}}.
new_reader_file(Path, #file_info{type = regular}, DirPath, Acc) ->
case file:open(Path, [read, binary, raw]) of
{ok, IoDev} ->
RelPath = emqx_utils_fs:find_relpath(Path, DirPath),
File = #rfile{abspath = Path, fd = IoDev, pos = 0},
Acc#{RelPath => File};
{error, Reason} ->
error({open_failed, Path, Reason})
end;
new_reader_file(Path, #file_info{type = Type}, _, _Acc) ->
error({bad_file_type, Path, Type});
new_reader_file(Path, {error, Reason}, _, _Acc) ->
error({inaccessible, Path, Reason}).
%% @doc Read a chunk of data from the snapshot.
%% Returns `{last, Chunk, Reader}` when the last chunk is read. After that, one
%% should call `release_reader/1` to finalize the process (or `abort_reader/1` if
%% keeping the snapshot is desired).
-spec read_chunk(reader(), _Size :: non_neg_integer()) ->
{last | next, _Chunk :: iodata(), reader()} | {error, reason()}.
read_chunk(R = #reader{files = Files, queue = [RelPath | Rest]}, Size) ->
File = maps:get(RelPath, Files),
case read_chunk_file(RelPath, File, Size) of
{last, Chunk, FileRest} ->
{next, Chunk, R#reader{files = Files#{RelPath := FileRest}, queue = Rest}};
{next, Chunk, FileRest} ->
{next, Chunk, R#reader{files = Files#{RelPath := FileRest}}};
Error ->
Error
end;
read_chunk(R = #reader{queue = []}, _Size) ->
{last, make_packet(?EOS()), R}.
read_chunk_file(RelPath, RFile0 = #rfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Size) ->
case file:read(IoDev, Size) of
{ok, Chunk} ->
ChunkSize = byte_size(Chunk),
HasMore = ChunkSize div Size,
RFile1 = RFile0#rfile{pos = Pos + ChunkSize},
case ChunkSize < Size of
false ->
Status = next,
RFile = RFile1;
true ->
Status = last,
RFile = release_reader_file(RFile1)
end,
Packet = make_packet(?FILECHUNK(RelPath, Pos, HasMore), Chunk),
{Status, Packet, RFile};
eof ->
Packet = make_packet(?FILECHUNK(RelPath, Pos, 0)),
{last, Packet, release_reader_file(RFile0)};
{error, Reason} ->
{error, {read_failed, AbsPath, Reason}}
end.
%% @doc Aborts the snapshot reader, but does not release the snapshot files.
-spec abort_reader(reader()) -> ok.
abort_reader(#reader{files = Files}) ->
lists:foreach(fun release_reader_file/1, maps:values(Files)).
%% @doc Aborts the snapshot reader and deletes the snapshot files.
-spec release_reader(reader()) -> ok.
release_reader(R = #reader{dirpath = DirPath}) ->
ok = abort_reader(R),
file:del_dir_r(DirPath).
release_reader_file(RFile = #rfile{fd = eof}) ->
RFile;
release_reader_file(RFile = #rfile{fd = IoDev}) ->
_ = file:close(IoDev),
RFile#rfile{fd = eof}.
-spec reader_info(bytes_read, reader()) -> _Bytes :: non_neg_integer().
reader_info(bytes_read, #reader{files = Files}) ->
maps:fold(fun(_, RFile, Sum) -> Sum + RFile#rfile.pos end, 0, Files).
%%
-record(writer, {
dirpath :: file:filename(),
files :: #{_RelPath :: file:filename() => writer_file()}
}).
-record(wfile, {
abspath :: file:filename(),
fd :: file:io_device() | eof,
pos :: non_neg_integer()
}).
-opaque writer() :: #writer{}.
-type writer_file() :: #wfile{}.
%% @doc Initialize a writer into a snapshot directory.
%% The directory needs not to exist, it will be created if it doesn't.
%% Having non-empty directory is not an error, existing files will be
%% overwritten.
-spec new_writer(_Dir :: file:filename()) -> {ok, writer()} | {error, reason()}.
new_writer(DirPath) ->
case filelib:ensure_path(DirPath) of
ok ->
{ok, #writer{dirpath = DirPath, files = #{}}};
{error, Reason} ->
{error, {mkdir_failed, DirPath, Reason}}
end.
%% @doc Write a chunk of data to the snapshot.
%% Returns `{last, Writer}` when the last chunk is written. After that, one
%% should call `release_writer/1` to finalize the process.
-spec write_chunk(writer(), _Chunk :: binary()) ->
{last | next, writer()} | {error, _Reason}.
write_chunk(W, Packet) ->
case parse_packet(Packet) of
{?PAT_FILECHUNK(RelPath, Pos, More), Chunk} ->
write_chunk(W, RelPath, Pos, More, Chunk);
{?PAT_EOS(), _Rest} ->
%% TODO: Verify all files are `eof` at this point?
{last, W};
Error ->
Error
end.
write_chunk(W = #writer{files = Files}, RelPath, Pos, More, Chunk) ->
case Files of
#{RelPath := WFile} ->
write_chunk(W, WFile, RelPath, Pos, More, Chunk);
#{} when Pos == 0 ->
case new_writer_file(W, RelPath) of
WFile = #wfile{} ->
write_chunk(W, WFile, RelPath, Pos, More, Chunk);
Error ->
Error
end;
#{} ->
{error, {bad_chunk, RelPath, Pos}}
end.
write_chunk(W = #writer{files = Files}, WFile0, RelPath, Pos, More, Chunk) ->
case write_chunk_file(WFile0, Pos, More, Chunk) of
WFile = #wfile{} ->
{next, W#writer{files = Files#{RelPath => WFile}}};
Error ->
Error
end.
new_writer_file(#writer{dirpath = DirPath}, RelPath) ->
AbsPath = filename:join(DirPath, RelPath),
_ = filelib:ensure_dir(AbsPath),
case file:open(AbsPath, [write, binary, raw]) of
{ok, IoDev} ->
#wfile{
abspath = AbsPath,
fd = IoDev,
pos = 0
};
{error, Reason} ->
{error, {open_failed, AbsPath, Reason}}
end.
write_chunk_file(WFile0 = #wfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Pos, More, Chunk) ->
ChunkSize = byte_size(Chunk),
case file:write(IoDev, Chunk) of
ok ->
WFile1 = WFile0#wfile{pos = Pos + ChunkSize},
case More of
0 -> release_writer_file(WFile1);
_ -> WFile1
end;
{error, Reason} ->
{error, {write_failed, AbsPath, Reason}}
end;
write_chunk_file(WFile = #wfile{pos = WPos}, Pos, _More, _Chunk) when Pos < WPos ->
WFile;
write_chunk_file(#wfile{abspath = AbsPath}, Pos, _More, _Chunk) ->
{error, {bad_chunk, AbsPath, Pos}}.
%% @doc Abort the writer and clean up unfinished snapshot files.
-spec abort_writer(writer()) -> ok | {error, file:posix()}.
abort_writer(W = #writer{dirpath = DirPath}) ->
ok = release_writer(W),
file:del_dir_r(DirPath).
%% @doc Release the writer and close all snapshot files.
-spec release_writer(writer()) -> ok.
release_writer(#writer{files = Files}) ->
ok = lists:foreach(fun release_writer_file/1, maps:values(Files)).
release_writer_file(WFile = #wfile{fd = eof}) ->
WFile;
release_writer_file(WFile = #wfile{fd = IoDev}) ->
_ = file:close(IoDev),
WFile#wfile{fd = eof}.
-spec writer_info(bytes_written, writer()) -> _Bytes :: non_neg_integer().
writer_info(bytes_written, #writer{files = Files}) ->
maps:fold(fun(_, WFile, Sum) -> Sum + WFile#wfile.pos end, 0, Files).
%%
make_packet(Header) ->
term_to_binary(Header).
make_packet(Header, Rest) ->
HeaderBytes = term_to_binary(Header),
<<HeaderBytes/binary, Rest/binary>>.
parse_packet(Packet) ->
try binary_to_term(Packet, [safe, used]) of
{Header = ?PAT_HEADER(), Length} ->
Rest = binary:part(Packet, Length, byte_size(Packet) - Length),
{Header, Rest};
{Header, _} ->
{error, {bad_header, Header}}
catch
error:badarg ->
{error, bad_packet}
end.

View File

@ -0,0 +1,202 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_replication_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").
-define(DB, testdb).
opts() ->
#{
backend => builtin,
storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => 1,
n_sites => 3,
replication_factor => 3,
replication_options => #{
wal_max_size_bytes => 128 * 1024,
wal_max_batch_size => 1024,
snapshot_interval => 128
}
}.
t_replication_transfers_snapshots(Config) ->
NMsgs = 4000,
Nodes = [Node, NodeOffline | _] = ?config(nodes, Config),
_Specs = [_, SpecOffline | _] = ?config(specs, Config),
%% Initialize DB on all nodes and wait for it to be online.
?assertEqual(
[{ok, ok} || _ <- Nodes],
erpc:multicall(Nodes, emqx_ds, open_db, [?DB, opts()])
),
?retry(
500,
10,
?assertMatch([_], shards_online(Node, ?DB))
),
%% Stop the DB on the "offline" node.
ok = emqx_cth_cluster:stop_node(NodeOffline),
%% Fill the storage with messages and few additional generations.
Messages = fill_storage(Node, ?DB, NMsgs, #{p_addgen => 0.01}),
%% Restart the node.
[NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{
?snk_kind := dsrepl_snapshot_accepted,
?snk_meta := #{node := NodeOffline}
})
),
?assertEqual(
ok,
erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])
),
%% Trigger storage operation and wait the replica to be restored.
_ = add_generation(Node, ?DB),
?assertMatch(
{ok, _},
snabbkaffe:receive_events(SRef)
),
%% Wait until any pending replication activities are finished (e.g. Raft log entries).
ok = timer:sleep(3_000),
%% Check that the DB has been restored.
Shard = hd(shards(NodeOffline, ?DB)),
MessagesOffline = lists:keysort(
#message.timestamp,
consume(NodeOffline, ?DB, Shard, ['#'], 0)
),
?assertEqual(
sample(40, Messages),
sample(40, MessagesOffline)
),
?assertEqual(
Messages,
MessagesOffline
).
shards(Node, DB) ->
erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).
shards_online(Node, DB) ->
erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).
fill_storage(Node, DB, NMsgs, Opts) ->
fill_storage(Node, DB, NMsgs, 0, Opts).
fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs ->
R1 = push_message(Node, DB, I),
R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
[].
push_message(Node, DB, I) ->
Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
{Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
Message = message(Topic, Bytes, I * 100),
ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
[Message].
add_generation(Node, DB) ->
ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
[].
message(Topic, Payload, PublishedAt) ->
#message{
from = <<?MODULE_STRING>>,
topic = Topic,
payload = Payload,
timestamp = PublishedAt,
id = emqx_guid:gen()
}.
consume(Node, DB, Shard, TopicFilter, StartTime) ->
Streams = erpc:call(Node, emqx_ds_storage_layer, get_streams, [
{DB, Shard}, TopicFilter, StartTime
]),
lists:flatmap(
fun({_Rank, Stream}) ->
{ok, It} = erpc:call(Node, emqx_ds_storage_layer, make_iterator, [
{DB, Shard}, Stream, TopicFilter, StartTime
]),
consume_stream(Node, DB, Shard, It)
end,
Streams
).
consume_stream(Node, DB, Shard, It) ->
case erpc:call(Node, emqx_ds_storage_layer, next, [{DB, Shard}, It, 100]) of
{ok, _NIt, _Msgs = []} ->
[];
{ok, NIt, Batch} ->
[Msg || {_Key, Msg} <- Batch] ++ consume_stream(Node, DB, Shard, NIt);
{ok, end_of_stream} ->
[]
end.
probably(P, Fun) ->
case rand:uniform() of
X when X < P -> Fun();
_ -> []
end.
sample(N, List) ->
L = length(List),
H = N div 2,
Filler = integer_to_list(L - N) ++ " more",
lists:sublist(List, H) ++ [Filler] ++ lists:sublist(List, L - H, L).
%%
suite() -> [{timetrap, {seconds, 60}}].
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(TCName, Config) ->
Apps = [
{emqx_durable_storage, #{
before_start => fun snabbkaffe:fix_ct_logging/0,
override_env => [{egress_flush_interval, 1}]
}}
],
WorkDir = emqx_cth_suite:work_dir(TCName, Config),
NodeSpecs = emqx_cth_cluster:mk_nodespecs(
[
{emqx_ds_replication_SUITE1, #{apps => Apps}},
{emqx_ds_replication_SUITE2, #{apps => Apps}},
{emqx_ds_replication_SUITE3, #{apps => Apps}}
],
#{work_dir => WorkDir}
),
Nodes = emqx_cth_cluster:start(NodeSpecs),
ok = snabbkaffe:start_trace(),
[{nodes, Nodes}, {specs, NodeSpecs} | Config].
end_per_testcase(_TCName, Config) ->
ok = snabbkaffe:stop(),
ok = emqx_cth_cluster:stop(?config(nodes, Config)).

View File

@ -0,0 +1,149 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_snapshot_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
opts() ->
#{storage => {emqx_ds_storage_bitfield_lts, #{}}}.
%%
t_snapshot_take_restore(_Config) ->
Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
{ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
%% Push some messages to the shard.
Msgs1 = [gen_message(N) || N <- lists:seq(1000, 2000)],
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(Msgs1), #{})),
%% Add new generation and push some more.
?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 3000)),
Msgs2 = [gen_message(N) || N <- lists:seq(4000, 5000)],
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(Msgs2), #{})),
?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 6000)),
%% Take a snapshot of the shard.
{ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(Shard),
%% Push even more messages to the shard AFTER taking the snapshot.
Msgs3 = [gen_message(N) || N <- lists:seq(7000, 8000)],
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(Msgs3), #{})),
%% Destroy the shard.
_ = unlink(Pid),
ok = proc_lib:stop(Pid, shutdown, infinity),
ok = emqx_ds_storage_layer:drop_shard(Shard),
%% Restore the shard from the snapshot.
{ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(Shard),
?assertEqual(ok, transfer_snapshot(SnapReader, SnapWriter)),
%% Verify that the restored shard contains the messages up until the snapshot.
{ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
?assertEqual(
Msgs1 ++ Msgs2,
lists:keysort(#message.timestamp, consume(Shard, ['#']))
).
mk_batch(Msgs) ->
[{emqx_message:timestamp(Msg, microsecond), Msg} || Msg <- Msgs].
gen_message(N) ->
Topic = emqx_topic:join([<<"foo">>, <<"bar">>, integer_to_binary(N)]),
message(Topic, integer_to_binary(N), N * 100).
message(Topic, Payload, PublishedAt) ->
#message{
from = <<?MODULE_STRING>>,
topic = Topic,
payload = Payload,
timestamp = PublishedAt,
id = emqx_guid:gen()
}.
transfer_snapshot(Reader, Writer) ->
ChunkSize = rand:uniform(1024),
case emqx_ds_storage_snapshot:read_chunk(Reader, ChunkSize) of
{RStatus, Chunk, NReader} ->
Data = iolist_to_binary(Chunk),
{WStatus, NWriter} = emqx_ds_storage_snapshot:write_chunk(Writer, Data),
%% Verify idempotency.
?assertEqual(
{WStatus, NWriter},
emqx_ds_storage_snapshot:write_chunk(Writer, Data)
),
%% Verify convergence.
?assertEqual(
RStatus,
WStatus,
#{reader => NReader, writer => NWriter}
),
case WStatus of
last ->
?assertEqual(ok, emqx_ds_storage_snapshot:release_reader(NReader)),
?assertEqual(ok, emqx_ds_storage_snapshot:release_writer(NWriter)),
ok;
next ->
transfer_snapshot(NReader, NWriter)
end;
{error, Reason} ->
{error, Reason, Reader}
end.
consume(Shard, TopicFilter) ->
consume(Shard, TopicFilter, 0).
consume(Shard, TopicFilter, StartTime) ->
Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime),
lists:flatmap(
fun({_Rank, Stream}) ->
{ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime),
consume_stream(Shard, It)
end,
Streams
).
consume_stream(Shard, It) ->
case emqx_ds_storage_layer:next(Shard, It, 100) of
{ok, _NIt, _Msgs = []} ->
[];
{ok, NIt, Batch} ->
[Msg || {_DSKey, Msg} <- Batch] ++ consume_stream(Shard, NIt);
{ok, end_of_stream} ->
[]
end.
%%
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(TCName, Config) ->
WorkDir = emqx_cth_suite:work_dir(TCName, Config),
Apps = emqx_cth_suite:start(
[{emqx_durable_storage, #{override_env => [{db_data_dir, WorkDir}]}}],
#{work_dir => WorkDir}
),
[{apps, Apps} | Config].
end_per_testcase(_TCName, Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok.