diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index a93a94168..79e2f6120 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -21,7 +21,16 @@ -behaviour(supervisor). %% API: --export([start_db/2, start_shard/1, start_egress/1, stop_shard/1, ensure_shard/1, ensure_egress/1]). +-export([ + start_db/2, + start_shard/1, + start_egress/1, + stop_shard/1, + terminate_storage/1, + restart_storage/1, + ensure_shard/1, + ensure_egress/1 +]). -export([which_shards/1]). %% behaviour callbacks: @@ -64,12 +73,22 @@ start_shard({DB, Shard}) -> start_egress({DB, Shard}) -> supervisor:start_child(?via(#?egress_sup{db = DB}), egress_spec(DB, Shard)). --spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}. +-spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok. stop_shard(Shard = {DB, _}) -> Sup = ?via(#?shards_sup{db = DB}), ok = supervisor:terminate_child(Sup, Shard), ok = supervisor:delete_child(Sup, Shard). +-spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}. +terminate_storage({DB, Shard}) -> + Sup = ?via(#?shard_sup{db = DB, shard = Shard}), + supervisor:terminate_child(Sup, {Shard, storage}). + +-spec restart_storage(emqx_ds_storage_layer:shard_id()) -> {ok, _Child} | {error, _Reason}. +restart_storage({DB, Shard}) -> + Sup = ?via(#?shard_sup{db = DB, shard = Shard}), + supervisor:restart_child(Sup, {Shard, storage}). + -spec ensure_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}. 
ensure_shard(Shard) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 72f142b8f..05f75a765 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -65,7 +65,9 @@ -export([ init/1, - apply/3 + apply/3, + + snapshot_module/0 ]). -export_type([ @@ -80,6 +82,10 @@ batch/0 ]). +-export_type([ + ra_state/0 +]). + -include_lib("emqx_utils/include/emqx_message.hrl"). -include("emqx_ds_replication_layer.hrl"). @@ -140,6 +146,20 @@ -type generation_rank() :: {shard_id(), term()}. +%% Core state of the replication, i.e. the state of ra machine. +-type ra_state() :: #{ + db_shard := {emqx_ds:db(), shard_id()}, + latest := timestamp_us() +}. + +%% Command. Each command is an entry in the replication log. +-type ra_command() :: #{ + ?tag := ?BATCH | add_generation | update_config | drop_generation, + _ => _ +}. + +-type timestamp_us() :: non_neg_integer(). + %%================================================================================ %% API functions %%================================================================================ @@ -589,9 +609,12 @@ ra_drop_shard(DB, Shard) -> %% +-spec init(_Args :: map()) -> ra_state(). init(#{db := DB, shard := Shard}) -> #{db_shard => {DB, Shard}, latest => 0}. +-spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) -> + {ra_state(), _Reply, _Effects}. apply( #{index := RaftIdx}, #{ @@ -671,3 +694,6 @@ timestamp_to_timeus(TimestampMs) -> timeus_to_timestamp(TimestampUs) -> TimestampUs div 1000. + +snapshot_module() -> + emqx_ds_replication_snapshot. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 7540e01bb..62a6edab2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -147,19 +147,21 @@ start_shard(DB, Shard, #{replication_options := ReplicationOpts}) -> Bootstrap = false; {error, name_not_registered} -> Bootstrap = true, + Machine = {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, + LogOpts = maps:with( + [ + snapshot_interval, + resend_window + ], + ReplicationOpts + ), ok = ra:start_server(DB, #{ id => LocalServer, uid => <>, cluster_name => ClusterName, initial_members => Servers, - machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, - log_init_args => maps:with( - [ - snapshot_interval, - resend_window - ], - ReplicationOpts - ) + machine => Machine, + log_init_args => LogOpts }) end, case Servers of diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_snapshot.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_snapshot.erl new file mode 100644 index 000000000..ab06dff53 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_snapshot.erl @@ -0,0 +1,229 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_replication_snapshot). + +-include_lib("snabbkaffe/include/trace.hrl"). + +-behaviour(ra_snapshot). +-export([ + prepare/2, + write/3, + + begin_read/2, + read_chunk/3, + + begin_accept/2, + accept_chunk/2, + complete_accept/2, + + recover/1, + validate/1, + read_meta/1 +]). + +%% Read state. +-record(rs, { + phase :: machine_state | storage_snapshot, + started_at :: _Time :: integer(), + state :: emqx_ds_replication_layer:ra_state() | undefined, + reader :: emqx_ds_storage_snapshot:reader() | undefined +}). + +%% Write state. +-record(ws, { + phase :: machine_state | storage_snapshot, + started_at :: _Time :: integer(), + dir :: file:filename(), + meta :: ra_snapshot:meta(), + state :: emqx_ds_replication_layer:ra_state() | undefined, + writer :: emqx_ds_storage_snapshot:writer() | undefined +}). + +-type rs() :: #rs{}. +-type ws() :: #ws{}. + +-type ra_state() :: emqx_ds_replication_layer:ra_state(). + +%% Writing a snapshot. +%% This process is exactly the same as writing a ra log snapshot: store the +%% log meta and the machine state in a single snapshot file. + +-spec prepare(_RaftIndex, ra_state()) -> _State :: ra_state(). +prepare(Index, State) -> + ra_log_snapshot:prepare(Index, State). + +-spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) -> + ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}. +write(Dir, Meta, MachineState) -> + ra_log_snapshot:write(Dir, Meta, MachineState). + +%% Reading a snapshot. +%% This is triggered by the leader when it finds out that a follower is +%% behind so much that there are no log segments covering the gap anymore. 
+%% This process, on the other hand, MUST involve reading the storage snapshot, +%% (in addition to the log snapshot) to reconstruct the storage state on the +%% target node. + +-spec begin_read(_SnapshotDir :: file:filename(), _Context :: #{}) -> + {ok, ra_snapshot:meta(), rs()} | {error, _Reason :: term()}. +begin_read(Dir, _Context) -> + RS = #rs{ + phase = machine_state, + started_at = erlang:monotonic_time(millisecond) + }, + case ra_log_snapshot:recover(Dir) of + {ok, Meta, MachineState} -> + start_snapshot_reader(Meta, RS#rs{state = MachineState}); + Error -> + Error + end. + +start_snapshot_reader(Meta, RS) -> + ShardId = shard_id(RS), + logger:info(#{ + msg => "dsrepl_snapshot_read_started", + shard => ShardId + }), + {ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(ShardId), + {ok, Meta, RS#rs{reader = SnapReader}}. + +-spec read_chunk(rs(), _Size :: non_neg_integer(), _SnapshotDir :: file:filename()) -> + {ok, binary(), {next, rs()} | last} | {error, _Reason :: term()}. +read_chunk(RS = #rs{phase = machine_state, state = MachineState}, _Size, _Dir) -> + Chunk = term_to_binary(MachineState), + {ok, Chunk, {next, RS#rs{phase = storage_snapshot}}}; +read_chunk(RS = #rs{phase = storage_snapshot, reader = SnapReader0}, Size, _Dir) -> + case emqx_ds_storage_snapshot:read_chunk(SnapReader0, Size) of + {next, Chunk, SnapReader} -> + {ok, Chunk, {next, RS#rs{reader = SnapReader}}}; + {last, Chunk, SnapReader} -> + %% TODO: idempotence? + ?tp(dsrepl_snapshot_read_complete, #{reader => SnapReader}), + _ = complete_read(RS#rs{reader = SnapReader}), + {ok, Chunk, last}; + {error, Reason} -> + ?tp(dsrepl_snapshot_read_error, #{reason => Reason, reader => SnapReader0}), + _ = emqx_ds_storage_snapshot:release_reader(SnapReader0), + error(Reason) + end. 
+ +complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) -> + _ = emqx_ds_storage_snapshot:release_reader(SnapReader), + logger:info(#{ + msg => "dsrepl_snapshot_read_complete", + shard => shard_id(RS), + duration_ms => erlang:monotonic_time(millisecond) - StartedAt, + read_bytes => emqx_ds_storage_snapshot:reader_info(bytes_read, SnapReader) + }). + +%% Accepting a snapshot. +%% This process is triggered by the target server, when the leader finds out +%% that the target server is severely lagging behind. This is receiving side of +%% `begin_read/2` and `read_chunk/3`. + +-spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) -> + {ok, ws()}. +begin_accept(Dir, Meta) -> + WS = #ws{ + phase = machine_state, + started_at = erlang:monotonic_time(millisecond), + dir = Dir, + meta = Meta + }, + {ok, WS}. + +-spec accept_chunk(binary(), ws()) -> + {ok, ws()} | {error, _Reason :: term()}. +accept_chunk(Chunk, WS = #ws{phase = machine_state}) -> + MachineState = binary_to_term(Chunk), + start_snapshot_writer(WS#ws{state = MachineState}); +accept_chunk(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) -> + %% TODO: idempotence? + case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of + {next, SnapWriter} -> + {ok, WS#ws{writer = SnapWriter}}; + {error, Reason} -> + ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}), + _ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0), + error(Reason) + end. + +start_snapshot_writer(WS) -> + ShardId = shard_id(WS), + logger:info(#{ + msg => "dsrepl_snapshot_write_started", + shard => ShardId + }), + _ = emqx_ds_builtin_db_sup:terminate_storage(ShardId), + {ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(ShardId), + {ok, WS#ws{phase = storage_snapshot, writer = SnapWriter}}. + +-spec complete_accept(ws()) -> ok | {error, ra_snapshot:file_err()}. 
+complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) -> + %% TODO: idempotence? + case emqx_ds_storage_snapshot:write_chunk(SnapWriter0, Chunk) of + {last, SnapWriter} -> + ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}), + _ = emqx_ds_storage_snapshot:release_writer(SnapWriter), + Result = complete_accept(WS#ws{writer = SnapWriter}), + ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}), + Result; + {error, Reason} -> + ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}), + _ = emqx_ds_storage_snapshot:abort_writer(SnapWriter0), + error(Reason) + end. + +complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) -> + ShardId = shard_id(WS), + logger:info(#{ + msg => "dsrepl_snapshot_write_complete", + shard => ShardId, + duration_ms => erlang:monotonic_time(millisecond) - StartedAt, + bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter) + }), + {ok, _} = emqx_ds_builtin_db_sup:restart_storage(ShardId), + write_machine_snapshot(WS). + +write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) -> + write(Dir, Meta, MachineState). + +%% Restoring machine state from a snapshot. +%% This is equivalent to restoring from a log snapshot. + +-spec recover(_SnapshotDir :: file:filename()) -> + {ok, ra_snapshot:meta(), ra_state()} | {error, _Reason}. +recover(Dir) -> + %% TODO: Verify that storage layer is online? + ra_log_snapshot:recover(Dir). + +-spec validate(_SnapshotDir :: file:filename()) -> + ok | {error, _Reason}. +validate(Dir) -> + ra_log_snapshot:validate(Dir). + +-spec read_meta(_SnapshotDir :: file:filename()) -> + {ok, ra_snapshot:meta()} | {error, _Reason}. +read_meta(Dir) -> + ra_log_snapshot:read_meta(Dir). + +shard_id(#rs{state = MachineState}) -> + shard_id(MachineState); +shard_id(#ws{state = MachineState}) -> + shard_id(MachineState); +shard_id(MachineState) -> + maps:get(db_shard, MachineState). 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 521467d8c..130fea3ec 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -19,8 +19,11 @@ %% Replication layer API: -export([ - open_shard/2, + %% Lifecycle + start_link/2, drop_shard/1, + + %% Data store_batch/3, get_streams/3, get_delete_streams/3, @@ -29,14 +32,20 @@ update_iterator/3, next/3, delete_next/4, + + %% Generations update_config/3, add_generation/2, list_generations_with_lifetimes/1, - drop_generation/2 + drop_generation/2, + + %% Snapshotting + take_snapshot/1, + accept_snapshot/1 ]). %% gen_server --export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: -export([db_dir/1]). @@ -229,10 +238,7 @@ -record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}). -record(call_list_generations_with_lifetimes, {}). -record(call_drop_generation, {gen_id :: gen_id()}). - --spec open_shard(shard_id(), options()) -> ok. -open_shard(Shard, Options) -> - emqx_ds_storage_layer_sup:ensure_shard(Shard, Options). +-record(call_take_snapshot, {}). -spec drop_shard(shard_id()) -> ok. drop_shard(Shard) -> @@ -244,12 +250,24 @@ drop_shard(Shard) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). 
-store_batch(Shard, Messages, Options) -> +store_batch(Shard, Messages0, Options) -> %% We always store messages in the current generation: GenId = generation_current(Shard), - #{module := Mod, data := GenData} = generation_get(Shard, GenId), + #{module := Mod, data := GenData, since := Since} = generation_get(Shard, GenId), + case Messages0 of + [{Time, _Msg} | Rest] when Time < Since -> + %% FIXME: log / feedback + Messages = skip_outdated_messages(Since, Rest); + _ -> + Messages = Messages0 + end, Mod:store_batch(Shard, GenData, Messages, Options). +skip_outdated_messages(Since, [{Time, _Msg} | Rest]) when Time < Since -> + skip_outdated_messages(Since, Rest); +skip_outdated_messages(_Since, Messages) -> + Messages. + -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{integer(), stream()}]. get_streams(Shard, TopicFilter, StartTime) -> @@ -436,6 +454,20 @@ list_generations_with_lifetimes(ShardId) -> drop_generation(ShardId, GenId) -> gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity). +-spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}. +take_snapshot(ShardId) -> + case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of + {ok, Dir} -> + emqx_ds_storage_snapshot:new_reader(Dir); + Error -> + Error + end. + +-spec accept_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:writer()} | {error, _Reason}. +accept_snapshot(ShardId) -> + ok = drop_shard(ShardId), + handle_accept_snapshot(ShardId). 
+ %%================================================================================ %% gen_server for the shard %%================================================================================ @@ -505,6 +537,9 @@ handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> {Reply, S} = handle_drop_generation(S0, GenId), commit_metadata(S), {reply, Reply, S}; +handle_call(#call_take_snapshot{}, _From, S) -> + Snapshot = handle_take_snapshot(S), + {reply, Snapshot, S}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -726,7 +761,11 @@ rocksdb_open(Shard, Options) -> -spec db_dir(shard_id()) -> file:filename(). db_dir({DB, ShardId}) -> - filename:join([emqx_ds:base_dir(), atom_to_list(DB), binary_to_list(ShardId)]). + filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]). + +-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename(). +checkpoint_dir({DB, ShardId}, Name) -> + filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name]). -spec update_last_until(Schema, emqx_ds:time()) -> Schema | {error, exists | overlaps_existing_generations} @@ -759,6 +798,21 @@ run_post_creation_actions(#{new_gen_runtime_data := NewGenData}) -> %% Different implementation modules NewGenData. +handle_take_snapshot(#s{db = DB, shard_id = ShardId}) -> + Name = integer_to_list(erlang:system_time(millisecond)), + Dir = checkpoint_dir(ShardId, Name), + _ = filelib:ensure_dir(Dir), + case rocksdb:checkpoint(DB, Dir) of + ok -> + {ok, Dir}; + {error, _} = Error -> + Error + end. + +handle_accept_snapshot(ShardId) -> + Dir = db_dir(ShardId), + emqx_ds_storage_snapshot:new_writer(Dir). 
+ %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl deleted file mode 100644 index 136669ed2..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ /dev/null @@ -1,88 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_ds_storage_layer_sup). - --behaviour(supervisor). - -%% API: --export([start_link/0, start_shard/2, stop_shard/1, ensure_shard/2]). - -%% behaviour callbacks: --export([init/1]). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --define(SUP, ?MODULE). - -%%================================================================================ -%% API funcions -%%================================================================================ - --spec start_link() -> {ok, pid()}. -start_link() -> - supervisor:start_link(?MODULE, []). 
- --spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) -> - supervisor:startchild_ret(). -start_shard(Shard, Options) -> - supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). - --spec stop_shard(emqx_ds_storage_layer:shard_id()) -> ok | {error, _}. -stop_shard(Shard) -> - ok = supervisor:terminate_child(?SUP, Shard), - ok = supervisor:delete_child(?SUP, Shard). - --spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) -> - ok | {error, _Reason}. -ensure_shard(Shard, Options) -> - case start_shard(Shard, Options) of - {ok, _Pid} -> - ok; - {error, {already_started, _Pid}} -> - ok; - {error, Reason} -> - {error, Reason} - end. - -%%================================================================================ -%% behaviour callbacks -%%================================================================================ - -init([]) -> - Children = [], - SupFlags = #{ - strategy => one_for_one, - intensity => 10, - period => 10 - }, - {ok, {SupFlags, Children}}. - -%%================================================================================ -%% Internal functions -%%================================================================================ - --spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) -> - supervisor:child_spec(). -shard_child_spec(Shard, Options) -> - #{ - id => Shard, - start => {emqx_ds_storage_layer, start_link, [Shard, Options]}, - shutdown => 5_000, - restart => permanent, - type => worker - }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_snapshot.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_snapshot.erl new file mode 100644 index 000000000..ab605e40e --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_snapshot.erl @@ -0,0 +1,325 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_storage_snapshot). + +-include_lib("kernel/include/file.hrl"). + +-export([ + new_reader/1, + read_chunk/2, + abort_reader/1, + release_reader/1, + reader_info/2 +]). + +-export([ + new_writer/1, + write_chunk/2, + abort_writer/1, + release_writer/1, + writer_info/2 +]). + +-export_type([ + reader/0, + writer/0 +]). + +%% + +-define(FILECHUNK(RELPATH, POS, MORE), #{ + '$' => chunk, + rp => RELPATH, + pos => POS, + more => MORE +}). +-define(PAT_FILECHUNK(RELPATH, POS, MORE), #{ + '$' := chunk, + rp := RELPATH, + pos := POS, + more := MORE +}). + +-define(EOS(), #{ + '$' => eos +}). +-define(PAT_EOS(), #{ + '$' := eos +}). + +-define(PAT_HEADER(), #{'$' := _}). + +%% + +-record(reader, { + dirpath :: file:filename(), + files :: #{_RelPath => reader_file()}, + queue :: [_RelPath :: file:filename()] +}). + +-record(rfile, { + abspath :: file:filename(), + fd :: file:io_device() | eof, + pos :: non_neg_integer() +}). + +-opaque reader() :: #reader{}. +-type reader_file() :: #rfile{}. + +-type reason() :: {atom(), _AbsPath :: file:filename(), _Details :: term()}. + +%% @doc Initialize a reader for a snapshot directory. +%% Snapshot directory is a directory containing arbitrary number of regular +%% files in arbitrary subdirectory structure. Files are read in indeterminate +%% order. 
It's an error to have non-regular files in the directory (e.g. symlinks). +-spec new_reader(_Dir :: file:filename()) -> {ok, reader()}. +new_reader(DirPath) -> + %% NOTE + %% Opening all files at once, so there would be less error handling later + %% during transfer. + %% TODO + %% Beware of how errors are handled: if one file fails to open, the whole + %% process will exit. This is fine for the purpose of replication (because + %% ra spawns separate process for each transfer), but may not be suitable + %% for other use cases. + Files = emqx_utils_fs:traverse_dir( + fun(Path, Info, Acc) -> new_reader_file(Path, Info, DirPath, Acc) end, + #{}, + DirPath + ), + {ok, #reader{ + dirpath = DirPath, + files = Files, + queue = maps:keys(Files) + }}. + +new_reader_file(Path, #file_info{type = regular}, DirPath, Acc) -> + case file:open(Path, [read, binary, raw]) of + {ok, IoDev} -> + RelPath = emqx_utils_fs:find_relpath(Path, DirPath), + File = #rfile{abspath = Path, fd = IoDev, pos = 0}, + Acc#{RelPath => File}; + {error, Reason} -> + error({open_failed, Path, Reason}) + end; +new_reader_file(Path, #file_info{type = Type}, _, _Acc) -> + error({bad_file_type, Path, Type}); +new_reader_file(Path, {error, Reason}, _, _Acc) -> + error({inaccessible, Path, Reason}). + +%% @doc Read a chunk of data from the snapshot. +%% Returns `{last, Chunk, Reader}` when the last chunk is read. After that, one +%% should call `release_reader/1` to finalize the process (or `abort_reader/1` if +%% keeping the snapshot is desired). +-spec read_chunk(reader(), _Size :: non_neg_integer()) -> + {last | next, _Chunk :: iodata(), reader()} | {error, reason()}. 
+read_chunk(R = #reader{files = Files, queue = [RelPath | Rest]}, Size) -> + File = maps:get(RelPath, Files), + case read_chunk_file(RelPath, File, Size) of + {last, Chunk, FileRest} -> + {next, Chunk, R#reader{files = Files#{RelPath := FileRest}, queue = Rest}}; + {next, Chunk, FileRest} -> + {next, Chunk, R#reader{files = Files#{RelPath := FileRest}}}; + Error -> + Error + end; +read_chunk(R = #reader{queue = []}, _Size) -> + {last, make_packet(?EOS()), R}. + +read_chunk_file(RelPath, RFile0 = #rfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Size) -> + case file:read(IoDev, Size) of + {ok, Chunk} -> + ChunkSize = byte_size(Chunk), + HasMore = ChunkSize div Size, + RFile1 = RFile0#rfile{pos = Pos + ChunkSize}, + case ChunkSize < Size of + false -> + Status = next, + RFile = RFile1; + true -> + Status = last, + RFile = release_reader_file(RFile1) + end, + Packet = make_packet(?FILECHUNK(RelPath, Pos, HasMore), Chunk), + {Status, Packet, RFile}; + eof -> + Packet = make_packet(?FILECHUNK(RelPath, Pos, 0)), + {last, Packet, release_reader_file(RFile0)}; + {error, Reason} -> + {error, {read_failed, AbsPath, Reason}} + end. + +%% @doc Aborts the snapshot reader, but does not release the snapshot files. +-spec abort_reader(reader()) -> ok. +abort_reader(#reader{files = Files}) -> + lists:foreach(fun release_reader_file/1, maps:values(Files)). + +%% @doc Aborts the snapshot reader and deletes the snapshot files. +-spec release_reader(reader()) -> ok. +release_reader(R = #reader{dirpath = DirPath}) -> + ok = abort_reader(R), + file:del_dir_r(DirPath). + +release_reader_file(RFile = #rfile{fd = eof}) -> + RFile; +release_reader_file(RFile = #rfile{fd = IoDev}) -> + _ = file:close(IoDev), + RFile#rfile{fd = eof}. + +-spec reader_info(bytes_read, reader()) -> _Bytes :: non_neg_integer(). +reader_info(bytes_read, #reader{files = Files}) -> + maps:fold(fun(_, RFile, Sum) -> Sum + RFile#rfile.pos end, 0, Files). 
+ +%% + +-record(writer, { + dirpath :: file:filename(), + files :: #{_RelPath :: file:filename() => writer_file()} +}). + +-record(wfile, { + abspath :: file:filename(), + fd :: file:io_device() | eof, + pos :: non_neg_integer() +}). + +-opaque writer() :: #writer{}. +-type writer_file() :: #wfile{}. + +%% @doc Initialize a writer into a snapshot directory. +%% The directory needs not to exist, it will be created if it doesn't. +%% Having non-empty directory is not an error, existing files will be +%% overwritten. +-spec new_writer(_Dir :: file:filename()) -> {ok, writer()} | {error, reason()}. +new_writer(DirPath) -> + case filelib:ensure_path(DirPath) of + ok -> + {ok, #writer{dirpath = DirPath, files = #{}}}; + {error, Reason} -> + {error, {mkdir_failed, DirPath, Reason}} + end. + +%% @doc Write a chunk of data to the snapshot. +%% Returns `{last, Writer}` when the last chunk is written. After that, one +%% should call `release_writer/1` to finalize the process. +-spec write_chunk(writer(), _Chunk :: binary()) -> + {last | next, writer()} | {error, _Reason}. +write_chunk(W, Packet) -> + case parse_packet(Packet) of + {?PAT_FILECHUNK(RelPath, Pos, More), Chunk} -> + write_chunk(W, RelPath, Pos, More, Chunk); + {?PAT_EOS(), _Rest} -> + %% TODO: Verify all files are `eof` at this point? + {last, W}; + Error -> + Error + end. + +write_chunk(W = #writer{files = Files}, RelPath, Pos, More, Chunk) -> + case Files of + #{RelPath := WFile} -> + write_chunk(W, WFile, RelPath, Pos, More, Chunk); + #{} when Pos == 0 -> + case new_writer_file(W, RelPath) of + WFile = #wfile{} -> + write_chunk(W, WFile, RelPath, Pos, More, Chunk); + Error -> + Error + end; + #{} -> + {error, {bad_chunk, RelPath, Pos}} + end. + +write_chunk(W = #writer{files = Files}, WFile0, RelPath, Pos, More, Chunk) -> + case write_chunk_file(WFile0, Pos, More, Chunk) of + WFile = #wfile{} -> + {next, W#writer{files = Files#{RelPath => WFile}}}; + Error -> + Error + end. 
+ +new_writer_file(#writer{dirpath = DirPath}, RelPath) -> + AbsPath = filename:join(DirPath, RelPath), + _ = filelib:ensure_dir(AbsPath), + case file:open(AbsPath, [write, binary, raw]) of + {ok, IoDev} -> + #wfile{ + abspath = AbsPath, + fd = IoDev, + pos = 0 + }; + {error, Reason} -> + {error, {open_failed, AbsPath, Reason}} + end. + +write_chunk_file(WFile0 = #wfile{fd = IoDev, pos = Pos, abspath = AbsPath}, Pos, More, Chunk) -> + ChunkSize = byte_size(Chunk), + case file:write(IoDev, Chunk) of + ok -> + WFile1 = WFile0#wfile{pos = Pos + ChunkSize}, + case More of + 0 -> release_writer_file(WFile1); + _ -> WFile1 + end; + {error, Reason} -> + {error, {write_failed, AbsPath, Reason}} + end; +write_chunk_file(WFile = #wfile{pos = WPos}, Pos, _More, _Chunk) when Pos < WPos -> + WFile; +write_chunk_file(#wfile{abspath = AbsPath}, Pos, _More, _Chunk) -> + {error, {bad_chunk, AbsPath, Pos}}. + +%% @doc Abort the writer and clean up unfinished snapshot files. +-spec abort_writer(writer()) -> ok | {error, file:posix()}. +abort_writer(W = #writer{dirpath = DirPath}) -> + ok = release_writer(W), + file:del_dir_r(DirPath). + +%% @doc Release the writer and close all snapshot files. +-spec release_writer(writer()) -> ok. +release_writer(#writer{files = Files}) -> + ok = lists:foreach(fun release_writer_file/1, maps:values(Files)). + +release_writer_file(WFile = #wfile{fd = eof}) -> + WFile; +release_writer_file(WFile = #wfile{fd = IoDev}) -> + _ = file:close(IoDev), + WFile#wfile{fd = eof}. + +-spec writer_info(bytes_written, writer()) -> _Bytes :: non_neg_integer(). +writer_info(bytes_written, #writer{files = Files}) -> + maps:fold(fun(_, WFile, Sum) -> Sum + WFile#wfile.pos end, 0, Files). + +%% + +make_packet(Header) -> + term_to_binary(Header). + +make_packet(Header, Rest) -> + HeaderBytes = term_to_binary(Header), + <<HeaderBytes/binary, Rest/binary>>. 
+ +parse_packet(Packet) -> + try binary_to_term(Packet, [safe, used]) of + {Header = ?PAT_HEADER(), Length} -> + Rest = binary:part(Packet, Length, byte_size(Packet) - Length), + {Header, Rest}; + {Header, _} -> + {error, {bad_header, Header}} + catch + error:badarg -> + {error, bad_packet} + end. diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl new file mode 100644 index 000000000..d1f98b3c3 --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -0,0 +1,202 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_replication_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +-define(DB, testdb). + +opts() -> + #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}}, + n_shards => 1, + n_sites => 3, + replication_factor => 3, + replication_options => #{ + wal_max_size_bytes => 128 * 1024, + wal_max_batch_size => 1024, + snapshot_interval => 128 + } + }. 
%% Take one node offline, write messages and add generations through another
%% node, bring the offline node back and check that, after it reports
%% `dsrepl_snapshot_accepted', it serves exactly the messages that were written.
t_replication_transfers_snapshots(Config) ->
    MsgCount = 4000,
    AllNodes = [NodeOnline, NodeOffline | _] = ?config(nodes, Config),
    _AllSpecs = [_, SpecOffline | _] = ?config(specs, Config),

    %% Open the DB everywhere and wait until the single shard is up.
    ExpectedOpen = [{ok, ok} || _ <- AllNodes],
    ?assertEqual(
        ExpectedOpen,
        erpc:multicall(AllNodes, emqx_ds, open_db, [?DB, opts()])
    ),
    ?retry(500, 10, ?assertMatch([_], shards_online(NodeOnline, ?DB))),

    %% Take the second node down.
    ok = emqx_cth_cluster:stop_node(NodeOffline),

    %% Write the messages, adding a generation every now and then.
    Messages = fill_storage(NodeOnline, ?DB, MsgCount, #{p_addgen => 0.01}),

    %% Bring the node back and subscribe to the snapshot event before the DB
    %% is reopened on it.
    [NodeOffline] = emqx_cth_cluster:restart(SpecOffline),
    {ok, SubRef} = snabbkaffe:subscribe(
        ?match_event(#{
            ?snk_kind := dsrepl_snapshot_accepted,
            ?snk_meta := #{node := NodeOffline}
        })
    ),
    ?assertEqual(ok, erpc:call(NodeOffline, emqx_ds, open_db, [?DB, opts()])),

    %% Issue one more storage operation and wait for the replica to accept a snapshot.
    _ = add_generation(NodeOnline, ?DB),
    ?assertMatch({ok, _}, snabbkaffe:receive_events(SubRef)),

    %% Give pending replication activity (e.g. Raft log entries) time to finish.
    ok = timer:sleep(3_000),

    %% The restored replica must hold the same messages.
    Shard = hd(shards(NodeOffline, ?DB)),
    Consumed = consume(NodeOffline, ?DB, Shard, ['#'], 0),
    MessagesOffline = lists:keysort(#message.timestamp, Consumed),
    %% Compare a small sample first, then the complete lists.
    ?assertEqual(sample(40, Messages), sample(40, MessagesOffline)),
    ?assertEqual(Messages, MessagesOffline).

%% Shards of `DB' as reported by the replication layer metadata on `Node'.
shards(Node, DB) ->
    erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]).

%% Shards of `DB' as reported by the DB supervisor on `Node'.
shards_online(Node, DB) ->
    erpc:call(Node, emqx_ds_builtin_db_sup, which_shards, [DB]).

%% Push messages number 0 to `NMsgs - 1' through `Node'; see `fill_storage/5'.
fill_storage(Node, DB, NMsgs, Opts) ->
    FirstIndex = 0,
    fill_storage(Node, DB, NMsgs, FirstIndex, Opts).
%% Push messages number `I' to `NMsgs - 1' through `Node', adding a generation
%% after each message with probability `p_addgen'. Returns the pushed messages
%% in the order they were pushed.
fill_storage(Node, DB, NMsgs, I, Opts = #{p_addgen := PAddGen}) when I < NMsgs ->
    R1 = push_message(Node, DB, I),
    R2 = probably(PAddGen, fun() -> add_generation(Node, DB) end),
    R1 ++ R2 ++ fill_storage(Node, DB, NMsgs, I + 1, Opts);
fill_storage(_Node, _DB, NMsgs, NMsgs, _Opts) ->
    [].

%% Build message number `I' and store it synchronously through `Node'.
%% The payload is 120 bytes from a generator seeded with `I', the timestamp is
%% `I * 100'. Returns the stored message in a one-element list.
push_message(Node, DB, I) ->
    Topic = emqx_topic:join([<<"topic">>, <<"foo">>, integer_to_binary(I)]),
    {Bytes, _} = rand:bytes_s(120, rand:seed_s(default, I)),
    Message = message(Topic, Bytes, I * 100),
    ok = erpc:call(Node, emqx_ds, store_batch, [DB, [Message], #{sync => true}]),
    [Message].

%% Add a generation to `DB' through `Node'. Returns `[]' so the result can be
%% concatenated with message lists in `fill_storage/5'.
add_generation(Node, DB) ->
    ok = erpc:call(Node, emqx_ds, add_generation, [DB]),
    [].

%% Build a `#message{}' with a freshly generated id.
message(Topic, Payload, PublishedAt) ->
    #message{
        %% NOTE(review): the binary expression here was lost when this file was
        %% extracted (only `<>' was left); `?MODULE_STRING' is assumed -- confirm.
        from = <<?MODULE_STRING>>,
        topic = Topic,
        payload = Payload,
        timestamp = PublishedAt,
        id = emqx_guid:gen()
    }.

%% Read all messages matching `TopicFilter' from every stream of the shard,
%% calling the storage layer on `Node'.
consume(Node, DB, Shard, TopicFilter, StartTime) ->
    Streams = erpc:call(Node, emqx_ds_storage_layer, get_streams, [
        {DB, Shard}, TopicFilter, StartTime
    ]),
    lists:flatmap(
        fun({_Rank, Stream}) ->
            {ok, It} = erpc:call(Node, emqx_ds_storage_layer, make_iterator, [
                {DB, Shard}, Stream, TopicFilter, StartTime
            ]),
            consume_stream(Node, DB, Shard, It)
        end,
        Streams
    ).

%% Read the stream in batches of up to 100 until an empty batch or
%% `end_of_stream' is returned.
consume_stream(Node, DB, Shard, It) ->
    case erpc:call(Node, emqx_ds_storage_layer, next, [{DB, Shard}, It, 100]) of
        {ok, _NIt, _Msgs = []} ->
            [];
        {ok, NIt, Batch} ->
            [Msg || {_Key, Msg} <- Batch] ++ consume_stream(Node, DB, Shard, NIt);
        {ok, end_of_stream} ->
            []
    end.

%% Call `Fun()' when a uniformly distributed random float is below `P',
%% otherwise return `[]'.
probably(P, Fun) ->
    case rand:uniform() of
        X when X < P -> Fun();
        _ -> []
    end.

%% Shorten `List' to at most `N' of its elements so that assertion failures
%% stay readable: the first `N div 2' elements, a "<K> more" marker, then the
%% last `N - N div 2' elements, where K is the number of omitted elements.
%% Lists with at most `N' elements are returned unchanged. The previous
%% version took one extra trailing element and raised `function_clause' on
%% short lists, hiding the actual assertion failure.
sample(N, List) ->
    L = length(List),
    case L =< N of
        true ->
            List;
        false ->
            H = N div 2,
            T = N - H,
            Filler = integer_to_list(L - N) ++ " more",
            lists:sublist(List, H) ++ [Filler] ++ lists:nthtail(L - T, List)
    end.

%%

suite() -> [{timetrap, {seconds, 60}}].

all() -> emqx_common_test_helpers:all(?MODULE).
%% Start a three-node cluster running `emqx_durable_storage' and begin a
%% snabbkaffe trace. The node names and node specs are stored in the config
%% under `nodes' and `specs'.
init_per_testcase(TCName, Config) ->
    DSApp =
        {emqx_durable_storage, #{
            before_start => fun snabbkaffe:fix_ct_logging/0,
            override_env => [{egress_flush_interval, 1}]
        }},
    NodeOpts = #{apps => [DSApp]},
    NodeNames = [
        emqx_ds_replication_SUITE1,
        emqx_ds_replication_SUITE2,
        emqx_ds_replication_SUITE3
    ],
    WorkDir = emqx_cth_suite:work_dir(TCName, Config),
    NodeSpecs = emqx_cth_cluster:mk_nodespecs(
        [{Name, NodeOpts} || Name <- NodeNames],
        #{work_dir => WorkDir}
    ),
    Nodes = emqx_cth_cluster:start(NodeSpecs),
    ok = snabbkaffe:start_trace(),
    [{nodes, Nodes}, {specs, NodeSpecs} | Config].

%% Stop snabbkaffe, then stop the cluster nodes started for the test case.
end_per_testcase(_TCName, Config) ->
    ok = snabbkaffe:stop(),
    Nodes = ?config(nodes, Config),
    ok = emqx_cth_cluster:stop(Nodes).
diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_snapshot_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_snapshot_SUITE.erl
new file mode 100644
index 000000000..a081267bc
--- /dev/null
+++ b/apps/emqx_durable_storage/test/emqx_ds_storage_snapshot_SUITE.erl
@@ -0,0 +1,149 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_snapshot_SUITE).

-compile(export_all).
-compile(nowarn_export_all).

-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

%% Options used to start the storage layer of the test shard.
opts() ->
    Storage = {emqx_ds_storage_bitfield_lts, #{}},
    #{storage => Storage}.

%%

%% Take a snapshot of a shard, destroy the shard, restore it from the snapshot
%% and check that it holds exactly the messages stored before the snapshot.
t_snapshot_take_restore(_Config) ->
    Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
    {ok, ShardPid} = emqx_ds_storage_layer:start_link(Shard, opts()),

    %% First batch of messages.
    MsgsBefore1 = lists:map(fun gen_message/1, lists:seq(1000, 2000)),
    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(MsgsBefore1), #{})),

    %% A new generation, a second batch, and one more generation.
    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 3000)),
    MsgsBefore2 = lists:map(fun gen_message/1, lists:seq(4000, 5000)),
    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(MsgsBefore2), #{})),
    ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, 6000)),

    %% Snapshot the shard at this point.
    {ok, SnapReader} = emqx_ds_storage_layer:take_snapshot(Shard),

    %% Messages stored AFTER the snapshot; they must not reappear after the restore.
    MsgsAfter = lists:map(fun gen_message/1, lists:seq(7000, 8000)),
    ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, mk_batch(MsgsAfter), #{})),

    %% Stop the shard process (unlinked first, so the test process is not taken
    %% down with it) and drop the shard.
    _ = unlink(ShardPid),
    ok = proc_lib:stop(ShardPid, shutdown, infinity),
    ok = emqx_ds_storage_layer:drop_shard(Shard),

    %% Feed the snapshot back into a fresh writer.
    {ok, SnapWriter} = emqx_ds_storage_layer:accept_snapshot(Shard),
    ?assertEqual(ok, transfer_snapshot(SnapReader, SnapWriter)),

    %% Start the shard again and compare its contents with the pre-snapshot messages.
    {ok, _RestoredPid} = emqx_ds_storage_layer:start_link(Shard, opts()),
    ?assertEqual(
        MsgsBefore1 ++ MsgsBefore2,
        lists:keysort(#message.timestamp, consume(Shard, ['#']))
    ).

%% Pair every message with its timestamp in microseconds, as passed to
%% `emqx_ds_storage_layer:store_batch/3' above.
mk_batch(Msgs) ->
    lists:map(
        fun(Msg) -> {emqx_message:timestamp(Msg, microsecond), Msg} end,
        Msgs
    ).

%% Message number `N': topic `foo/bar/N', payload `N' as text, timestamp `N * 100'.
gen_message(N) ->
    NBin = integer_to_binary(N),
    Topic = emqx_topic:join([<<"foo">>, <<"bar">>, NBin]),
    message(Topic, NBin, N * 100).
%% Build a `#message{}' with a freshly generated id.
message(Topic, Payload, PublishedAt) ->
    #message{
        %% NOTE(review): the binary expression here was lost when this file was
        %% extracted (only `<>' was left); `?MODULE_STRING' is assumed -- confirm.
        from = <<?MODULE_STRING>>,
        topic = Topic,
        payload = Payload,
        timestamp = PublishedAt,
        id = emqx_guid:gen()
    }.

%% Move the snapshot from `Reader' to `Writer' chunk by chunk, picking a random
%% chunk size between 1 and 1024 for every read. Each chunk is written twice to
%% check that a repeated write gives the same result, and the reader and writer
%% must agree on whether the chunk was the last one. Returns `ok' once both
%% sides report `last' and have been released, or `{error, Reason, Reader}' if
%% reading fails.
transfer_snapshot(Reader, Writer) ->
    ChunkSize = rand:uniform(1024),
    case emqx_ds_storage_snapshot:read_chunk(Reader, ChunkSize) of
        {RStatus, Chunk, NReader} ->
            Data = iolist_to_binary(Chunk),
            {WStatus, NWriter} = emqx_ds_storage_snapshot:write_chunk(Writer, Data),
            %% Verify idempotency.
            ?assertEqual(
                {WStatus, NWriter},
                emqx_ds_storage_snapshot:write_chunk(Writer, Data)
            ),
            %% Verify convergence.
            ?assertEqual(
                RStatus,
                WStatus,
                #{reader => NReader, writer => NWriter}
            ),
            case WStatus of
                last ->
                    ?assertEqual(ok, emqx_ds_storage_snapshot:release_reader(NReader)),
                    ?assertEqual(ok, emqx_ds_storage_snapshot:release_writer(NWriter)),
                    ok;
                next ->
                    transfer_snapshot(NReader, NWriter)
            end;
        {error, Reason} ->
            {error, Reason, Reader}
    end.

%% Read all messages matching `TopicFilter' from the shard, starting at time 0.
consume(Shard, TopicFilter) ->
    consume(Shard, TopicFilter, 0).

%% Read all messages matching `TopicFilter' from every stream of the shard.
consume(Shard, TopicFilter, StartTime) ->
    Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime),
    lists:flatmap(
        fun({_Rank, Stream}) ->
            {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime),
            consume_stream(Shard, It)
        end,
        Streams
    ).

%% Read the stream in batches of up to 100 until an empty batch or
%% `end_of_stream' is returned.
consume_stream(Shard, It) ->
    case emqx_ds_storage_layer:next(Shard, It, 100) of
        {ok, _NIt, _Msgs = []} ->
            [];
        {ok, NIt, Batch} ->
            [Msg || {_DSKey, Msg} <- Batch] ++ consume_stream(Shard, NIt);
        {ok, end_of_stream} ->
            []
    end.

%%

all() -> emqx_common_test_helpers:all(?MODULE).

%% Start `emqx_durable_storage' with its data directory pointed at the test
%% case work directory; the started apps are stored in the config under `apps'.
init_per_testcase(TCName, Config) ->
    WorkDir = emqx_cth_suite:work_dir(TCName, Config),
    Apps = emqx_cth_suite:start(
        [{emqx_durable_storage, #{override_env => [{db_data_dir, WorkDir}]}}],
        #{work_dir => WorkDir}
    ),
    [{apps, Apps} | Config].

%% Stop the apps started by `init_per_testcase/2'.
end_per_testcase(_TCName, Config) ->
    ok = emqx_cth_suite:stop(?config(apps, Config)),
    ok.