From b6894c18fa034a636d683a84f6b9eee9cb495043 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 10 Jun 2024 18:21:47 +0200 Subject: [PATCH 01/19] chore(dsrepl): improve tracepoints usability a bit --- .../src/emqx_ds_replication_layer.erl | 13 +++++++------ .../src/emqx_ds_replication_snapshot.erl | 6 ++++-- .../src/emqx_ds_storage_layer.erl | 6 +++--- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 0a1173e70..34cb5fc4d 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -143,7 +143,12 @@ %% Core state of the replication, i.e. the state of ra machine. -type ra_state() :: #{ + %% Shard ID. db_shard := {emqx_ds:db(), shard_id()}, + + %% Unique timestamp tracking real time closely. + %% With microsecond granularity it should be nearly impossible for it to run + %% too far ahead of the real time clock. latest := timestamp_us() }. @@ -755,11 +760,7 @@ apply( }, #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> - %% NOTE - %% Unique timestamp tracking real time closely. - %% With microsecond granularity it should be nearly impossible for it to run - %% too far ahead than the real time clock. - ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}), + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), State = State0#{latest := Latest}, @@ -839,7 +840,7 @@ apply( tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), {Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest), - ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), + ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}), handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl index 9267aee77..58b8e4f0a 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl @@ -69,6 +69,7 @@ prepare(Index, State) -> -spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) -> ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}. write(Dir, Meta, MachineState) -> + ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}), ra_log_snapshot:write(Dir, Meta, MachineState). %% Reading a snapshot. @@ -165,6 +166,7 @@ complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) -> -spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) -> {ok, ws()}. 
begin_accept(Dir, Meta) -> + ?tp(dsrepl_snapshot_accept_started, #{meta => Meta}), WS = #ws{ phase = machine_state, started_at = erlang:monotonic_time(millisecond), @@ -207,7 +209,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}), _ = emqx_ds_storage_snapshot:release_writer(SnapWriter), Result = complete_accept(WS#ws{writer = SnapWriter}), - ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}), + ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS), state => WS#ws.state}), Result; {error, Reason} -> ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}), @@ -218,7 +220,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) -> ShardId = shard_id(WS), logger:info(#{ - msg => "dsrepl_snapshot_read_complete", + msg => "dsrepl_snapshot_write_complete", shard => ShardId, duration_ms => erlang:monotonic_time(millisecond) - StartedAt, bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 818d0bcb7..c31a6bc52 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -312,9 +312,6 @@ store_batch(Shard, Messages, Options) -> prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. - ?tp(emqx_ds_storage_layer_prepare_batch, #{ - shard => Shard, messages => Messages, options => Options - }), %% FIXME: always store messages in the current generation case generation_at(Shard, Time) of {GenId, #{module := Mod, data := GenData}} -> @@ -322,6 +319,9 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> Result = case Mod:prepare_batch(Shard, GenData, Messages, Options) of {ok, CookedBatch} -> + ?tp(emqx_ds_storage_layer_batch_cooked, #{ + shard => Shard, gen => GenId, batch => CookedBatch + }), {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; Error = {error, _, _} -> Error From 2705226eb51676618b3d81e7fdf8f878ee4b8e45 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 10 Jun 2024 18:23:34 +0200 Subject: [PATCH 02/19] feat(dsrepl): release log entries occasionally Also make tracepoints in `apply/3` callback implementation more uniform. --- .../src/emqx_ds_replication_layer.erl | 72 +++++++++++++++---- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 34cb5fc4d..fe7da3f8f 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -579,6 +579,10 @@ list_nodes() -> %% Too large for normal operation, need better backpressure mechanism. -define(RA_TIMEOUT, 60 * 1000). +%% How often to release Raft logs? +%% Each N log entries mark everything up to the last N entries "releasable". +-define(RA_RELEASE_LOG_FREQ, 1000). + -define(SAFE_ERPC(EXPR), try EXPR @@ -746,6 +750,8 @@ ra_drop_shard(DB, Shard) -> %% +-define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release'). + -spec init(_Args :: map()) -> ra_state(). init(#{db := DB, shard := Shard}) -> #{db_shard => {DB, Shard}, latest => 0}. 
@@ -753,7 +759,7 @@ init(#{db := DB, shard := Shard}) -> -spec apply(ra_machine:command_meta_data(), ra_command(), ra_state()) -> {ra_state(), _Reply, _Effects}. apply( - #{index := RaftIdx}, + RaftMeta, #{ ?tag := ?BATCH, ?batch_messages := MessagesIn @@ -765,17 +771,17 @@ apply( Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - %% TODO: Need to measure effects of changing frequency of `release_cursor`. - Effect = {release_cursor, RaftIdx, State}, - {State, Result, Effect}; + Effects = try_release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( - _RaftMeta, + RaftMeta, #{?tag := add_generation, ?since := Since}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> ?tp( info, - ds_replication_layer_add_generation, + ds_ra_add_generation, #{ shard => DBShard, since => Since @@ -785,15 +791,17 @@ apply( Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - {State, Result}; + Effects = release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( - _RaftMeta, + RaftMeta, #{?tag := update_config, ?since := Since, ?config := Opts}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> ?tp( notice, - ds_replication_layer_update_config, + ds_ra_update_config, #{ shard => DBShard, config => Opts, @@ -803,7 +811,9 @@ apply( {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), State = State0#{latest := Latest}, - {State, Result}; + Effects = release_log(RaftMeta, State), + Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), + {State, Result, Effects}; apply( _RaftMeta, #{?tag := drop_generation, ?generation := GenId}, @@ -811,7 +821,7 @@ apply( ) -> ?tp( info, - ds_replication_layer_drop_generation, + ds_ra_drop_generation, #{ shard => DBShard, generation => GenId @@ -828,7 +838,7 @@ apply( set_ts(DBShard, Latest), ?tp( debug, - emqx_ds_replication_layer_storage_event, + ds_ra_storage_event, #{ shard => DBShard, payload => CustomEvent, latest => Latest } @@ -836,6 +846,44 @@ apply( Effects = handle_custom_event(DBShard, Latest, CustomEvent), {State#{latest => Latest}, ok, Effects}. +try_release_log(RaftMeta = #{index := CurrentIdx}, State) -> + %% NOTE + %% Release everything up to the last log entry, but only if there were more than + %% `?RA_RELEASE_LOG_FREQ` new entries since the last release. + case get_log_need_release(RaftMeta) of + undefined -> + []; + PrevIdx when ?RA_RELEASE_LOG_FREQ > CurrentIdx - PrevIdx -> + []; + _PrevIdx -> + %% TODO + %% Number of log entries is not the best metric. Because cursor release + %% means storage flush (see `emqx_ds_replication_snapshot:write/3`), we + %% should do that not too often (so the storage is happy with L0 SST size) + %% and not too rarely (so we don't accumulate huge Raft logs). + release_log(RaftMeta, State) + end. + +release_log(RaftMeta = #{index := CurrentIdx}, State) -> + %% NOTE + %% Release everything up to the last log entry. This is important: any log entries + %% following `CurrentIdx` should not contribute to `State` (that will be recovered + %% from a snapshot). + update_log_need_release(RaftMeta), + {release_cursor, CurrentIdx, State}. 
+ +get_log_need_release(RaftMeta) -> + case erlang:get(?pd_ra_idx_need_release) of + undefined -> + update_log_need_release(RaftMeta), + undefined; + LastIdx -> + LastIdx + end. + +update_log_need_release(#{index := CurrentIdx}) -> + erlang:put(?pd_ra_idx_need_release, CurrentIdx). + -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), From 0c0757b8c20ce939b1e5fdd1faf30568906b87dd Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 10 Jun 2024 18:27:02 +0200 Subject: [PATCH 03/19] feat(dsrepl): enable WAL-less batch writes --- .../src/emqx_ds_replication_snapshot.erl | 3 +- .../src/emqx_ds_storage_bitfield_lts.erl | 2 +- .../src/emqx_ds_storage_layer.erl | 40 +++++++++++++++++++ .../src/emqx_ds_storage_reference.erl | 2 +- 4 files changed, 44 insertions(+), 3 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl index 58b8e4f0a..3a62b3b0f 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl @@ -70,6 +70,7 @@ prepare(Index, State) -> ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}. write(Dir, Meta, MachineState) -> ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}), + ok = emqx_ds_storage_layer:flush(shard_id(MachineState)), ra_log_snapshot:write(Dir, Meta, MachineState). %% Reading a snapshot. @@ -229,7 +230,7 @@ complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) -> write_machine_snapshot(WS). write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) -> - write(Dir, Meta, MachineState). + ra_log_snapshot:write(Dir, Meta, MachineState). %% Restoring machine state from a snapshot. %% This is equivalent to restoring from a log snapshot. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index c978e416f..d73f3f2ec 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -326,7 +326,7 @@ commit_batch( end, Payloads ), - Result = rocksdb:write_batch(DB, Batch, []), + Result = rocksdb:write_batch(DB, Batch, [{disable_wal, true}]), rocksdb:release_batch(Batch), ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index c31a6bc52..df1c822c4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -44,6 +44,7 @@ drop_generation/2, %% Snapshotting + flush/1, take_snapshot/1, accept_snapshot/1, @@ -279,6 +280,7 @@ -record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}). -record(call_list_generations_with_lifetimes, {}). -record(call_drop_generation, {gen_id :: gen_id()}). +-record(call_flush, {}). -record(call_take_snapshot, {}). -spec drop_shard(shard_id()) -> ok. @@ -539,6 +541,10 @@ shard_info(ShardId, status) -> error:badarg -> down end. +-spec flush(shard_id()) -> ok | {error, _}. +flush(ShardId) -> + gen_server:call(?REF(ShardId), #call_flush{}, infinity). + -spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}. 
take_snapshot(ShardId) -> case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of @@ -566,6 +572,7 @@ start_link(Shard = {_, _}, Options) -> shard_id :: shard_id(), db :: rocksdb:db_handle(), cf_refs :: cf_refs(), + cf_need_flush :: gen_id(), schema :: shard_schema(), shard :: shard() }). @@ -591,10 +598,12 @@ init({ShardId, Options}) -> {Scm, CFRefs0} end, Shard = open_shard(ShardId, DB, CFRefs, Schema), + CurrentGenId = maps:get(current_generation, Schema), S = #s{ shard_id = ShardId, db = DB, cf_refs = CFRefs, + cf_need_flush = CurrentGenId, schema = Schema, shard = Shard }, @@ -635,6 +644,9 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) -> handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) -> {Reply, S} = handle_drop_generation(S0, GenId), {reply, Reply, S}; +handle_call(#call_flush{}, _From, S0) -> + {Reply, S} = handle_flush(S0), + {reply, Reply, S}; handle_call(#call_take_snapshot{}, _From, S) -> Snapshot = handle_take_snapshot(S), {reply, Snapshot, S}; @@ -866,6 +878,10 @@ rocksdb_open(Shard, Options) -> DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true}, + %% NOTE + %% With WAL-less writes, it's important to have CFs flushed atomically. + %% For example, bitfield-lts backend needs data + trie CFs to be consistent. + {atomic_flush, true}, {enable_write_thread_adaptive_yield, false} | maps:get(db_options, Options, []) ], @@ -921,6 +937,30 @@ update_last_until(Schema = #{current_generation := GenId}, Until) -> {error, overlaps_existing_generations} end. +handle_flush(S = #s{db = DB, cf_need_flush = NeedFlushGenId, schema = Schema}) -> + %% NOTE + %% There could have been few generations added since the last time `flush/1` was + %% called. Strictly speaking, we don't need to flush them all at once as part of + %% a single atomic flush, but the error handling is a bit easier this way. + CurrentGenId = maps:get(current_generation, Schema), + GenIds = lists:seq(NeedFlushGenId, CurrentGenId), + CFHandles = lists:flatmap( + fun(GenId) -> + #{?GEN_KEY(GenId) := #{cf_refs := CFRefs}} = Schema, + {_, CFHandles} = lists:unzip(CFRefs), + CFHandles + end, + GenIds + ), + case rocksdb:flush(DB, CFHandles, [{wait, true}]) of + ok -> + %% Current generation will always need a flush. + ?tp(ds_storage_flush_complete, #{gens => GenIds, cfs => CFHandles}), + {ok, S#s{cf_need_flush = CurrentGenId}}; + {error, _} = Error -> + {Error, S} + end. + handle_take_snapshot(#s{db = DB, shard_id = ShardId}) -> Name = integer_to_list(erlang:system_time(millisecond)), Dir = checkpoint_dir(ShardId, Name), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index b4c3ade3f..c70b80e68 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> end, Messages ), - Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), + Res = rocksdb:write_batch(DB, Batch, _WriteOptions = [{disable_wal, true}]), rocksdb:release_batch(Batch), Res. From 7895e9cc45934ddeff1b50add6b11f62ff0d3c1c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 11 Jun 2024 14:58:34 +0200 Subject: [PATCH 04/19] feat(dsstore): make WAL-less mode optional And make the upper layer choose when to use it. 
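For illustration, a sketch of how a caller chooses (based on the
`batch_store_opts()` introduced below; `durable` defaults to `true`):

    %% Raft replication layer: the Raft log already provides durability,
    %% so the storage write can skip the RocksDB WAL:
    emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false})

    %% Other callers keep the default, fully durable behaviour:
    emqx_ds_storage_layer:store_batch(Shard, Batch, #{})
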
--- .../src/emqx_ds_replication_layer.erl | 3 +- .../src/emqx_ds_storage_bitfield_lts.erl | 20 +++--- .../src/emqx_ds_storage_layer.erl | 62 ++++++++++++++----- .../src/emqx_ds_storage_reference.erl | 10 +-- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 16 ++--- 5 files changed, 74 insertions(+), 37 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index fe7da3f8f..af0aeec7d 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -57,6 +57,7 @@ ra_store_batch/3 ]). +-behaviour(ra_machine). -export([ init/1, apply/3, @@ -768,7 +769,7 @@ apply( ) -> ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), - Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), + Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}), State = State0#{latest := Latest}, set_ts(DBShard, Latest), Effects = try_release_log(RaftMeta, State), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d73f3f2ec..9c3a930eb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -28,8 +28,8 @@ create/5, open/5, drop/5, - prepare_batch/4, - commit_batch/3, + prepare_batch/3, + commit_batch/4, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -269,11 +269,10 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> -spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}, ...], - emqx_ds:message_store_opts() + [{emqx_ds:time(), emqx_types:message()}, ...] ) -> {ok, cooked_batch()}. -prepare_batch(_ShardId, S, Messages, _Options) -> +prepare_batch(_ShardId, S, Messages) -> _ = erase(?lts_persist_ops), {Payloads, MaxTs} = lists:mapfoldl( @@ -294,12 +293,14 @@ prepare_batch(_ShardId, S, Messages, _Options) -> -spec commit_batch( emqx_ds_storage_layer:shard_id(), s(), - cooked_batch() + cooked_batch(), + emqx_ds_storage_layer:db_write_opts() ) -> ok | emqx_ds:error(_). 
commit_batch( _ShardId, _Data, - #{?cooked_payloads := [], ?cooked_lts_ops := LTS} + #{?cooked_payloads := [], ?cooked_lts_ops := LTS}, + _WriteOpts ) -> %% Assert: [] = LTS, @@ -307,7 +308,8 @@ commit_batch( commit_batch( _ShardId, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, - #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs} + #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}, + WriteOpts ) -> {ok, Batch} = rocksdb:batch(), %% Commit LTS trie to the storage: @@ -326,7 +328,7 @@ commit_batch( end, Payloads ), - Result = rocksdb:write_batch(DB, Batch, [{disable_wal, true}]), + Result = rocksdb:write_batch(DB, Batch, WriteOpts), rocksdb:release_batch(Batch), ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index df1c822c4..d8624f56b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -27,7 +27,7 @@ %% Data store_batch/3, prepare_batch/3, - commit_batch/2, + commit_batch/3, get_streams/3, get_delete_streams/3, @@ -70,7 +70,9 @@ shard_id/0, options/0, prototype/0, - cooked_batch/0 + cooked_batch/0, + batch_store_opts/0, + db_write_opts/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -113,6 +115,23 @@ -type gen_id() :: 0..16#ffff. +%% Options affecting how batches should be stored. +%% See also: `emqx_ds:message_store_opts()'. +-type batch_store_opts() :: + #{ + %% Whether the whole batch given to `store_batch' should be inserted atomically as + %% a unit. Default: `false'. + atomic => boolean(), + %% Should the storage make sure that the batch is written durably? Non-durable + %% writes are in general unsafe but require much less resources, i.e. with RocksDB + %% non-durable (WAL-less) writes do not usually involve _any_ disk I/O. + %% Default: `true'. + durable => boolean() + }. + +%% Options affecting how batches should be prepared. +-type batch_prepare_opts() :: #{}. + %% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6 -opaque stream_v1() :: #{ @@ -203,6 +222,9 @@ %% Generation callbacks %%================================================================================ +%% See: `rocksdb:write_options()'. +-type db_write_opts() :: [_Option]. + %% Create the new schema given generation id and the options. %% Create rocksdb column families. -callback create( @@ -225,15 +247,15 @@ -callback prepare_batch( shard_id(), generation_data(), - [{emqx_ds:time(), emqx_types:message()}, ...], - emqx_ds:message_store_opts() + [{emqx_ds:time(), emqx_types:message()}, ...] ) -> {ok, term()} | emqx_ds:error(_). -callback commit_batch( shard_id(), generation_data(), - _CookedBatch + _CookedBatch, + db_write_opts() ) -> ok | emqx_ds:error(_). -callback get_streams( @@ -290,16 +312,16 @@ drop_shard(Shard) -> -spec store_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], - emqx_ds:message_store_opts() + batch_store_opts() ) -> emqx_ds:store_batch_result(). 
store_batch(Shard, Messages, Options) -> ?tp(emqx_ds_storage_layer_store_batch, #{ shard => Shard, messages => Messages, options => Options }), - case prepare_batch(Shard, Messages, Options) of + case prepare_batch(Shard, Messages, #{}) of {ok, CookedBatch} -> - commit_batch(Shard, CookedBatch); + commit_batch(Shard, CookedBatch, Options); ignore -> ok; Error = {error, _, _} -> @@ -309,9 +331,9 @@ store_batch(Shard, Messages, Options) -> -spec prepare_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], - emqx_ds:message_store_opts() + batch_prepare_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> +prepare_batch(Shard, Messages = [{Time, _} | _], _Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. %% FIXME: always store messages in the current generation @@ -319,7 +341,7 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> {GenId, #{module := Mod, data := GenData}} -> T0 = erlang:monotonic_time(microsecond), Result = - case Mod:prepare_batch(Shard, GenData, Messages, Options) of + case Mod:prepare_batch(Shard, GenData, Messages) of {ok, CookedBatch} -> ?tp(emqx_ds_storage_layer_batch_cooked, #{ shard => Shard, gen => GenId, batch => CookedBatch @@ -338,11 +360,16 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> prepare_batch(_Shard, [], _Options) -> ignore. --spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result(). -commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> +-spec commit_batch( + shard_id(), + cooked_batch(), + batch_store_opts() +) -> emqx_ds:store_batch_result(). +commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) -> #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), + WriteOptions = mk_write_options(Options), T0 = erlang:monotonic_time(microsecond), - Result = Mod:commit_batch(Shard, GenData, CookedBatch), + Result = Mod:commit_batch(Shard, GenData, CookedBatch, WriteOptions), T1 = erlang:monotonic_time(microsecond), emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result. @@ -994,6 +1021,13 @@ handle_event(Shard, Time, Event) -> GenId = generation_current(Shard), handle_event(Shard, Time, ?mk_storage_event(GenId, Event)). +%%-------------------------------------------------------------------------------- + +mk_write_options(#{durable := false}) -> + [{disable_wal, true}]; +mk_write_options(#{}) -> + []. + %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index c70b80e68..026ae92b6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -31,8 +31,8 @@ create/5, open/5, drop/5, - prepare_batch/4, - commit_batch/3, + prepare_batch/3, + commit_batch/4, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -102,10 +102,10 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -prepare_batch(_ShardId, _Data, Messages, _Options) -> +prepare_batch(_ShardId, _Data, Messages) -> {ok, Messages}. 
-commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> +commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) -> {ok, Batch} = rocksdb:batch(), lists:foreach( fun({TS, Msg}) -> @@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> end, Messages ), - Res = rocksdb:write_batch(DB, Batch, _WriteOptions = [{disable_wal, true}]), + Res = rocksdb:write_batch(DB, Batch, WriteOpts), rocksdb:release_batch(Batch), Res. diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index bd0f382b2..866b4d381 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -64,7 +64,7 @@ t_iterate(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), %% Iterate through individual topics: [ begin @@ -94,7 +94,7 @@ t_delete(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), %% Iterate through topics: StartTime = 0, @@ -125,7 +125,7 @@ t_get_streams(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), GetStream = fun(Topic) -> StartTime = 0, emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime) @@ -152,7 +152,7 @@ t_get_streams(_Config) -> end || I <- lists:seq(1, 200) ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}), %% Check that "foo/bar/baz" topic now appears in two streams: %% "foo/bar/baz" and "foo/bar/+": NewStreams = lists:sort(GetStream("foo/bar/baz")), @@ -180,7 +180,7 @@ t_new_generation_inherit_trie(_Config) -> || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), @@ -194,7 +194,7 @@ t_new_generation_inherit_trie(_Config) -> || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), %% We should get only two streams for wildcard query, for "foo" and for "bar". 
?assertMatch( [_Foo, _Bar], @@ -217,13 +217,13 @@ t_replay(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), %% Check various topic filters: Messages = [M || {_TS, M} <- Batch1 ++ Batch2], %% Missing topics (no ghost messages): From 80ea2e62f7e59e87607f10fce58a70e3adfa7623 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 13 Jun 2024 14:50:43 +0200 Subject: [PATCH 05/19] fix(stream): ensure that `chain/1` preserves the order --- apps/emqx_utils/src/emqx_utils_stream.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 510b3e377..bab09b6b3 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -190,7 +190,7 @@ transpose_tail(S, Tail) -> %% @doc Make a stream by concatenating multiple streams. -spec chain([stream(X)]) -> stream(X). chain(L) -> - lists:foldl(fun chain/2, empty(), L). + lists:foldr(fun chain/2, empty(), L). %% @doc Make a stream by chaining (concatenating) two streams. %% The second stream begins to produce values only after the first one is exhausted. From 2180cc7c269c73be1ce31864d888335d80438136 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 13 Jun 2024 15:26:34 +0200 Subject: [PATCH 06/19] fix(dsstore): avoid storing `cf_refs()` in the RocksDB itself This is both pointless and confusing. --- .../src/emqx_ds_storage_layer.erl | 50 +++++++++++++------ 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d8624f56b..e0e0256c4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -111,7 +111,8 @@ -type shard_id() :: {emqx_ds:db(), binary()}. --type cf_refs() :: [{string(), rocksdb:cf_handle()}]. +-type cf_ref() :: {string(), rocksdb:cf_handle()}. +-type cf_refs() :: [cf_ref()]. -type gen_id() :: 0..16#ffff. @@ -179,7 +180,7 @@ %% Module-specific data defined at generation creation time: data := Data, %% Column families used by this generation - cf_refs := cf_refs(), + cf_names := [string()], %% Time at which this was created. Might differ from `since', in particular for the %% first generation. created_at := emqx_message:timestamp(), @@ -789,9 +790,9 @@ handle_drop_generation(S0, GenId) -> #s{ shard_id = ShardId, db = DB, - schema = #{?GEN_KEY(GenId) := GenSchema} = OldSchema, - shard = OldShard, - cf_refs = OldCFRefs + schema = #{?GEN_KEY(GenId) := GenSchema} = Schema0, + shard = #{?GEN_KEY(GenId) := #{data := RuntimeData}} = Shard0, + cf_refs = CFRefs0 } = S0, %% 1. Commit the metadata first, so other functions are less %% likely to see stale data, and replicas don't end up @@ -800,16 +801,16 @@ handle_drop_generation(S0, GenId) -> %% %% Note: in theory, this operation may be interrupted in the %% middle. 
This will leave column families hanging. - Shard = maps:remove(?GEN_KEY(GenId), OldShard), - Schema = maps:remove(?GEN_KEY(GenId), OldSchema), + Shard = maps:remove(?GEN_KEY(GenId), Shard0), + Schema = maps:remove(?GEN_KEY(GenId), Schema0), S1 = S0#s{ shard = Shard, schema = Schema }, commit_metadata(S1), %% 2. Now, actually drop the data from RocksDB: - #{module := Mod, cf_refs := GenCFRefs} = GenSchema, - #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, + #{module := Mod, cf_names := GenCFNames} = GenSchema, + GenCFRefs = [cf_ref(Name, CFRefs0) || Name <- GenCFNames], try Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) catch @@ -826,7 +827,7 @@ handle_drop_generation(S0, GenId) -> } ) end, - CFRefs = OldCFRefs -- GenCFRefs, + CFRefs = CFRefs0 -- GenCFRefs, S = S1#s{cf_refs = CFRefs}, {ok, S}. @@ -878,7 +879,7 @@ new_generation(ShardId, DB, Schema0, Shard0, Since) -> GenSchema = #{ module => Mod, data => GenData, - cf_refs => NewCFRefs, + cf_names => cf_names(NewCFRefs), created_at => erlang:system_time(millisecond), since => Since, until => undefined @@ -964,18 +965,22 @@ update_last_until(Schema = #{current_generation := GenId}, Until) -> {error, overlaps_existing_generations} end. -handle_flush(S = #s{db = DB, cf_need_flush = NeedFlushGenId, schema = Schema}) -> +handle_flush(S = #s{db = DB, cf_refs = CFRefs, cf_need_flush = NeedFlushGenId, shard = Shard}) -> %% NOTE %% There could have been few generations added since the last time `flush/1` was %% called. Strictly speaking, we don't need to flush them all at once as part of %% a single atomic flush, but the error handling is a bit easier this way. - CurrentGenId = maps:get(current_generation, Schema), + CurrentGenId = maps:get(current_generation, Shard), GenIds = lists:seq(NeedFlushGenId, CurrentGenId), CFHandles = lists:flatmap( fun(GenId) -> - #{?GEN_KEY(GenId) := #{cf_refs := CFRefs}} = Schema, - {_, CFHandles} = lists:unzip(CFRefs), - CFHandles + case Shard of + #{?GEN_KEY(GenId) := #{cf_names := CFNames}} -> + [cf_handle(N, CFRefs) || N <- CFNames]; + #{} -> + %% Generation was probably dropped. + [] + end end, GenIds ), @@ -1028,6 +1033,19 @@ mk_write_options(#{durable := false}) -> mk_write_options(#{}) -> []. +-spec cf_names(cf_refs()) -> [string()]. +cf_names(CFRefs) -> + {CFNames, _CFHandles} = lists:unzip(CFRefs), + CFNames. + +-spec cf_ref(_Name :: string(), cf_refs()) -> cf_ref(). +cf_ref(Name, CFRefs) -> + lists:keyfind(Name, 1, CFRefs). + +-spec cf_handle(_Name :: string(), cf_refs()) -> rocksdb:cf_handle(). +cf_handle(Name, CFRefs) -> + element(2, cf_ref(Name, CFRefs)). + %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- From cd0663074e03ff44ecc24bf1757f60077c679936 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 13 Jun 2024 15:29:26 +0200 Subject: [PATCH 07/19] test(dsrepl): add `add_generation` events into the mix They usually cause storage layer to perform flushes, and thus enable testing `handle_flush/1` codepath in different circumstances. 
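For context, a rough sketch of why they do (as wired up in the previous
patches, nothing new here): applying `add_generation` releases the Raft log
unconditionally, and the resulting snapshot write flushes the storage layer:

    %% emqx_ds_replication_layer:apply/3, `add_generation` clause:
    Effects = release_log(RaftMeta, State)   %% -> {release_cursor, Idx, State}

    %% ra reacts by writing a machine snapshot, and
    %% emqx_ds_replication_snapshot:write/3 then does:
    ok = emqx_ds_storage_layer:flush(shard_id(MachineState))
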
--- .../emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index abe154807..2276dfb03 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -232,14 +232,14 @@ t_rebalance(Config) -> ], Stream1 = emqx_utils_stream:interleave( [ - {10, Stream0}, + {20, Stream0}, emqx_utils_stream:const(add_generation) ], false ), Stream = emqx_utils_stream:interleave( [ - {50, Stream0}, + {50, Stream1}, emqx_utils_stream:list(Sequence) ], true @@ -604,7 +604,7 @@ t_drop_generation(Config) -> after emqx_cth_cluster:stop(Nodes) end, - fun(Trace) -> + fun(_Trace) -> %% TODO: some idempotency errors still happen %% ?assertMatch([], ?of_kind(ds_storage_layer_failed_to_drop_generation, Trace)), true From 19072414cbf09a8354cbc5d4b5d2f3e15aaa3232 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 13 Jun 2024 18:10:22 +0200 Subject: [PATCH 08/19] chore: bump `erlang-rocksdb` to 1.8.0-emqx-6 --- mix.exs | 2 +- rebar.config | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index 6a7e6bda7..cc8f1682e 100644 --- a/mix.exs +++ b/mix.exs @@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}, - {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true}, + {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-6", override: true}, {:ekka, github: "emqx/ekka", tag: "0.19.4", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true}, diff --git a/rebar.config b/rebar.config index e1d8d23f3..55ea83fdd 100644 --- a/rebar.config +++ b/rebar.config @@ -82,7 +82,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, - {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}}, + {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.4"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}}, From 8538a5a5b61f767b0f883eeee4e5fb2eff402e44 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 13 Jun 2024 18:32:57 +0200 Subject: [PATCH 09/19] test(dsrepl): anticipate transitionless membership changes E.g. when a membership change is applied twice in a row. --- apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index ba9589e97..c8162e42a 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -207,7 +207,12 @@ apply_stream(DB, NodeStream0, Stream0, N) -> %% Give some time for at least one transition to complete. 
Transitions = transitions(Node, DB), ct:pal("Transitions after ~p: ~p", [Operation, Transitions]), - ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))), + case Transitions of + [_ | _] -> + ?retry(200, 10, ?assertNotEqual(Transitions, transitions(Node, DB))); + [] -> + ok + end, apply_stream(DB, NodeStream0, Stream, N); [Fun | Stream] when is_function(Fun) -> Fun(), From 5fd5fc76e515cad1e2187a47cb12896f88ab1f1a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 14 Jun 2024 14:16:05 +0200 Subject: [PATCH 10/19] fix(dsstore): ensure backward compatibility --- .../src/emqx_ds_storage_layer.erl | 99 +++++++++++++++++-- 1 file changed, 91 insertions(+), 8 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e0e0256c4..6d1744a07 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -1133,23 +1133,106 @@ erase_schema_runtime(Shard) -> -undef(PERSISTENT_TERM). --define(ROCKSDB_SCHEMA_KEY, <<"schema_v1">>). +-define(ROCKSDB_SCHEMA_KEY(V), <<"schema_", V>>). + +-define(ROCKSDB_SCHEMA_KEY, ?ROCKSDB_SCHEMA_KEY("v2")). +-define(ROCKSDB_SCHEMA_KEYS, [ + ?ROCKSDB_SCHEMA_KEY, + ?ROCKSDB_SCHEMA_KEY("v1") +]). -spec get_schema_persistent(rocksdb:db_handle()) -> shard_schema() | not_found. get_schema_persistent(DB) -> - case rocksdb:get(DB, ?ROCKSDB_SCHEMA_KEY, []) of + get_schema_persistent(DB, ?ROCKSDB_SCHEMA_KEYS). + +get_schema_persistent(DB, [Key | Rest]) -> + case rocksdb:get(DB, Key, []) of {ok, Blob} -> - Schema = binary_to_term(Blob), - %% Sanity check: - #{current_generation := _, prototype := _} = Schema, - Schema; + deserialize_schema(Key, Blob); not_found -> - not_found - end. + get_schema_persistent(DB, Rest) + end; +get_schema_persistent(_DB, []) -> + not_found. -spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok. put_schema_persistent(DB, Schema) -> Blob = term_to_binary(Schema), rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []). +-spec deserialize_schema(_SchemaVsn :: binary(), binary()) -> shard_schema(). +deserialize_schema(SchemaVsn, Blob) -> + %% Sanity check: + Schema = #{current_generation := _, prototype := _} = binary_to_term(Blob), + decode_schema(SchemaVsn, Schema). + +decode_schema(?ROCKSDB_SCHEMA_KEY, Schema) -> + Schema; +decode_schema(?ROCKSDB_SCHEMA_KEY("v1"), Schema) -> + maps:map(fun decode_schema_v1/2, Schema). + +decode_schema_v1(?GEN_KEY(_), Generation = #{}) -> + decode_generation_schema_v1(Generation); +decode_schema_v1(_, V) -> + V. + +decode_generation_schema_v1(SchemaV1 = #{cf_refs := CFRefs}) -> + %% Drop potentially dead CF references from the time generation was created. + Schema = maps:remove(cf_refs, SchemaV1), + Schema#{cf_names => cf_names(CFRefs)}; +decode_generation_schema_v1(Schema = #{}) -> + Schema. + +%%-------------------------------------------------------------------------------- + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). 
+ +decode_schema_v1_test() -> + SchemaV1 = #{ + current_generation => 42, + prototype => {emqx_ds_storage_reference, #{}}, + ?GEN_KEY(41) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_refs => [{"emqx_ds_storage_reference41", erlang:make_ref()}], + created_at => 12345, + since => 0, + until => 123456 + }, + ?GEN_KEY(42) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_refs => [{"emqx_ds_storage_reference42", erlang:make_ref()}], + created_at => 54321, + since => 123456, + until => undefined + } + }, + ?assertEqual( + #{ + current_generation => 42, + prototype => {emqx_ds_storage_reference, #{}}, + ?GEN_KEY(41) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_names => ["emqx_ds_storage_reference41"], + created_at => 12345, + since => 0, + until => 123456 + }, + ?GEN_KEY(42) => #{ + module => emqx_ds_storage_reference, + data => {schema}, + cf_names => ["emqx_ds_storage_reference42"], + created_at => 54321, + since => 123456, + until => undefined + } + }, + deserialize_schema(?ROCKSDB_SCHEMA_KEY("v1"), term_to_binary(SchemaV1)) + ). + +-endif. + -undef(ROCKSDB_SCHEMA_KEY). From ae89b61af05660507205e438c7331cb69429e250 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 14 Jun 2024 14:16:50 +0200 Subject: [PATCH 11/19] feat(cth-cluster): make `restart/1` more generic --- .../emqx_persistent_session_ds_SUITE.erl | 2 +- apps/emqx/test/emqx_cth_cluster.erl | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 920e2528f..15391de6e 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -163,7 +163,7 @@ mk_clientid(Prefix, ID) -> restart_node(Node, NodeSpec) -> ?tp(will_restart_node, #{}), - emqx_cth_cluster:restart(Node, NodeSpec), + emqx_cth_cluster:restart(NodeSpec), wait_nodeup(Node), ?tp(restarted_node, #{}), ok. diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 981b2e5eb..20400a1c4 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -38,7 +38,7 @@ %% in `end_per_suite/1` or `end_per_group/2`) with the result from step 2. -module(emqx_cth_cluster). --export([start/1, start/2, restart/1, restart/2]). +-export([start/1, start/2, restart/1]). -export([stop/1, stop_node/1]). -export([start_bare_nodes/1, start_bare_nodes/2]). @@ -163,13 +163,13 @@ wait_clustered([Node | Nodes] = All, Check, Deadline) -> wait_clustered(All, Check, Deadline) end. -restart(NodeSpec) -> - restart(maps:get(name, NodeSpec), NodeSpec). - -restart(Node, Spec) -> - ct:pal("Stopping peer node ~p", [Node]), - ok = emqx_cth_peer:stop(Node), - start([Spec#{boot_type => restart}]). +restart(NodeSpecs = [_ | _]) -> + Nodes = [maps:get(name, Spec) || Spec <- NodeSpecs], + ct:pal("Stopping peer nodes: ~p", [Nodes]), + ok = stop(Nodes), + start([Spec#{boot_type => restart} || Spec <- NodeSpecs]); +restart(NodeSpec = #{}) -> + restart([NodeSpec]). mk_nodespecs(Nodes, ClusterOpts) -> NodeSpecs = lists:zipwith( From 8db70b5bbccf4f03060e2a7feaffc752d7574f0c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 14 Jun 2024 16:43:33 +0200 Subject: [PATCH 12/19] test(dsrepl): add crash-restart-recover testcase That verifies nothing is lost in the event of abrupt node failures. 
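Note: `emqx_cth_peer:kill/1` used below is introduced in the next patch; it
SIGKILLs the peer node's OS process, so the node gets no chance at a graceful
shutdown. Roughly:

    ok = emqx_cth_peer:kill(Node),        %% kill -SIGKILL <os pid>, no graceful stop
    _ = emqx_cth_cluster:restart(NodeSpec),
    ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts])
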
--- .../test/emqx_ds_replication_SUITE.erl | 71 +++++++++++++++++++ .../test/emqx_ds_test_helpers.erl | 6 +- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index 2276dfb03..a6ca86f7e 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -794,6 +794,77 @@ t_store_batch_fail(_Config) -> ] ). +t_crash_restart_recover(init, Config) -> + Apps = [appspec(emqx_durable_storage), emqx_ds_builtin_raft], + Specs = emqx_cth_cluster:mk_nodespecs( + [ + {t_crash_stop_recover1, #{apps => Apps}}, + {t_crash_stop_recover2, #{apps => Apps}}, + {t_crash_stop_recover3, #{apps => Apps}} + ], + #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)} + ), + Nodes = emqx_cth_cluster:start(Specs), + [{nodes, Nodes}, {nodespecs, Specs} | Config]; +t_crash_restart_recover('end', Config) -> + ok = emqx_cth_cluster:stop(?config(nodes, Config)). + +t_crash_restart_recover(Config) -> + %% This testcase verifies that in the event of abrupt site failure message data is + %% correctly preserved. + Nodes = [N1, N2, N3] = ?config(nodes, Config), + _Specs = [_, NS2, NS3] = ?config(nodespecs, Config), + DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}), + + %% Prepare test event stream. + {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( + ?FUNCTION_NAME, _NClients = 8, _NMsgs = 400 + ), + Stream1 = emqx_utils_stream:interleave( + [ + {300, Stream0}, + emqx_utils_stream:const(add_generation) + ], + false + ), + Stream = emqx_utils_stream:interleave( + [ + {1000, Stream1}, + emqx_utils_stream:list([ + fun() -> kill_restart_node_async(N2, NS2, DBOpts) end, + fun() -> kill_restart_node_async(N3, NS3, DBOpts) end + ]) + ], + true + ), + + ?check_trace( + begin + %% Initialize DB on all nodes. + ?assertEqual( + [{ok, ok} || _ <- Nodes], + erpc:multicall(Nodes, emqx_ds, open_db, [?DB, DBOpts]) + ), + + %% Apply the test events, including simulated node crashes. + NodeStream = emqx_utils_stream:const(N1), + emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0), + timer:sleep(5000), + + %% Verify that all the data is there. + emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) + end, + [] + ). + +kill_restart_node_async(Node, Spec, DBOpts) -> + erlang:spawn_link(?MODULE, kill_restart_node, [Node, Spec, DBOpts]). + +kill_restart_node(Node, Spec, DBOpts) -> + ok = emqx_cth_peer:kill(Node), + _ = emqx_cth_cluster:restart(Spec), + ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts]). 
+ %% shard_server_info(Node, DB, Shard, Site, Info) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index c8162e42a..0b6b634c5 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -188,12 +188,14 @@ apply_stream(DB, NodeStream0, Stream0, N) -> ?ON(Node, emqx_ds:store_batch(DB, [Msg], #{sync => true})), apply_stream(DB, NodeStream, Stream, N + 1); [add_generation | Stream] -> - %% FIXME: + ?tp(notice, test_add_generation, #{}), [Node | NodeStream] = emqx_utils_stream:next(NodeStream0), ?ON(Node, emqx_ds:add_generation(DB)), apply_stream(DB, NodeStream, Stream, N); [{Node, Operation, Arg} | Stream] when - Operation =:= join_db_site; Operation =:= leave_db_site; Operation =:= assign_db_sites + Operation =:= join_db_site; + Operation =:= leave_db_site; + Operation =:= assign_db_sites -> ?tp(notice, test_apply_operation, #{node => Node, operation => Operation, arg => Arg}), %% Apply the transition. From b6a249baa9a03a77d549028101c2a4e46838aa43 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 14 Jun 2024 17:32:49 +0200 Subject: [PATCH 13/19] feat(cth-peer): add brutal `kill/1` facility --- apps/emqx/test/emqx_cth_peer.erl | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/apps/emqx/test/emqx_cth_peer.erl b/apps/emqx/test/emqx_cth_peer.erl index b3849739a..9db595d73 100644 --- a/apps/emqx/test/emqx_cth_peer.erl +++ b/apps/emqx/test/emqx_cth_peer.erl @@ -22,6 +22,7 @@ -export([start/2, start/3, start/4]). -export([start_link/2, start_link/3, start_link/4]). -export([stop/1]). +-export([kill/1]). start(Name, Args) -> start(Name, Args, []). @@ -66,6 +67,32 @@ stop(Node) when is_atom(Node) -> ok end. +%% @doc Kill a node abruptly, through mechanisms provided by OS. +%% Relies on POSIX `kill`. +kill(Node) -> + try erpc:call(Node, os, getpid, []) of + OSPid -> + Pid = whereis(Node), + _ = is_pid(Pid) andalso unlink(Pid), + Result = kill_os_process(OSPid), + %% Either ensure control process stops, or try to stop if not killed. + _ = is_pid(Pid) andalso catch peer:stop(Pid), + Result + catch + error:{erpc, _} = Reason -> + {error, Reason} + end. + +kill_os_process(OSPid) -> + Cmd = "kill -SIGKILL " ++ OSPid, + Port = erlang:open_port({spawn, Cmd}, [binary, exit_status, hide]), + receive + {Port, {exit_status, 0}} -> + ok; + {Port, {exit_status, EC}} -> + {error, EC} + end. + parse_node_name(NodeName) -> case string:tokens(atom_to_list(NodeName), "@") of [Name, Host] -> From 8ff48ac5ea1ad47712b4c235f7ea1f4a1b4069bd Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 14 Jun 2024 18:41:44 +0200 Subject: [PATCH 14/19] feat(dsrepl): rely on accumulated log size to decide when to flush --- .../src/emqx_ds_replication_layer.erl | 100 ++++++++++++------ 1 file changed, 70 insertions(+), 30 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index af0aeec7d..a2476ba66 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -581,8 +581,18 @@ list_nodes() -> -define(RA_TIMEOUT, 60 * 1000). %% How often to release Raft logs? -%% Each N log entries mark everything up to the last N entries "releasable". --define(RA_RELEASE_LOG_FREQ, 1000). +%% Each time we written approximately this number of bytes. 
+%% Close to the RocksDB's default of 64 MiB. +-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000_000). +%% ...Or at least each N log entries. +-define(RA_RELEASE_LOG_MIN_FREQ, 64_000). + +-ifdef(TEST). +-undef(RA_RELEASE_LOG_APPROX_SIZE). +-undef(RA_RELEASE_LOG_MIN_FREQ). +-define(RA_RELEASE_LOG_APPROX_SIZE, 50_000). +-define(RA_RELEASE_LOG_MIN_FREQ, 1_000). +-endif. -define(SAFE_ERPC(EXPR), try @@ -752,6 +762,7 @@ ra_drop_shard(DB, Shard) -> %% -define(pd_ra_idx_need_release, '$emqx_ds_raft_idx_need_release'). +-define(pd_ra_bytes_need_release, '$emqx_ds_raft_bytes_need_release'). -spec init(_Args :: map()) -> ra_state(). init(#{db := DB, shard := Shard}) -> @@ -768,11 +779,11 @@ apply( #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), - {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), + {Stats, Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}), State = State0#{latest := Latest}, set_ts(DBShard, Latest), - Effects = try_release_log(RaftMeta, State), + Effects = try_release_log(Stats, RaftMeta, State), Effects =/= [] andalso ?tp(ds_ra_effects, #{effects => Effects, meta => RaftMeta}), {State, Result, Effects}; apply( @@ -847,22 +858,27 @@ apply( Effects = handle_custom_event(DBShard, Latest, CustomEvent), {State#{latest => Latest}, ok, Effects}. -try_release_log(RaftMeta = #{index := CurrentIdx}, State) -> +try_release_log({_N, BatchSize}, RaftMeta = #{index := CurrentIdx}, State) -> %% NOTE - %% Release everything up to the last log entry, but only if there were more than - %% `?RA_RELEASE_LOG_FREQ` new entries since the last release. - case get_log_need_release(RaftMeta) of - undefined -> - []; - PrevIdx when ?RA_RELEASE_LOG_FREQ > CurrentIdx - PrevIdx -> - []; - _PrevIdx -> - %% TODO - %% Number of log entries is not the best metric. Because cursor release - %% means storage flush (see `emqx_ds_replication_snapshot:write/3`), we - %% should do that not too often (so the storage is happy with L0 SST size) - %% and not too rarely (so we don't accumulate huge Raft logs). - release_log(RaftMeta, State) + %% Because cursor release means storage flush (see + %% `emqx_ds_replication_snapshot:write/3`), we should do that not too often + %% (so the storage is happy with L0 SST sizes) and not too rarely (so we don't + %% accumulate huge Raft logs). + case inc_bytes_need_release(BatchSize) of + AccSize when AccSize > ?RA_RELEASE_LOG_APPROX_SIZE -> + release_log(RaftMeta, State); + _NotYet -> + case get_log_need_release(RaftMeta) of + undefined -> + []; + PrevIdx when CurrentIdx - PrevIdx > ?RA_RELEASE_LOG_MIN_FREQ -> + %% Release everything up to the last log entry, but only if there were + %% more than %% `?RA_RELEASE_LOG_MIN_FREQ` new entries since the last + %% release. + release_log(RaftMeta, State); + _ -> + [] + end end. release_log(RaftMeta = #{index := CurrentIdx}, State) -> @@ -871,6 +887,7 @@ release_log(RaftMeta = #{index := CurrentIdx}, State) -> %% following `CurrentIdx` should not contribute to `State` (that will be recovered %% from a snapshot). update_log_need_release(RaftMeta), + reset_bytes_need_release(), {release_cursor, CurrentIdx, State}. get_log_need_release(RaftMeta) -> @@ -885,6 +902,17 @@ get_log_need_release(RaftMeta) -> update_log_need_release(#{index := CurrentIdx}) -> erlang:put(?pd_ra_idx_need_release, CurrentIdx). 
+get_bytes_need_release() -> + emqx_maybe:define(erlang:get(?pd_ra_bytes_need_release), 0). + +inc_bytes_need_release(Size) -> + Acc = get_bytes_need_release() + Size, + erlang:put(?pd_ra_bytes_need_release, Acc), + Acc. + +reset_bytes_need_release() -> + erlang:put(?pd_ra_bytes_need_release, 0). + -spec tick(integer(), ra_state()) -> ra_machine:effects(). tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), @@ -893,19 +921,20 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> - assign_timestamps(Latest, Messages, []). + assign_timestamps(Latest, Messages, [], 0, 0). -assign_timestamps(Latest, [MessageIn | Rest], Acc) -> - case emqx_message:timestamp(MessageIn, microsecond) of - TimestampUs when TimestampUs > Latest -> - Message = assign_timestamp(TimestampUs, MessageIn), - assign_timestamps(TimestampUs, Rest, [Message | Acc]); +assign_timestamps(Latest0, [Message0 | Rest], Acc, N, Sz) -> + case emqx_message:timestamp(Message0, microsecond) of + TimestampUs when TimestampUs > Latest0 -> + Latest = TimestampUs, + Message = assign_timestamp(TimestampUs, Message0); _Earlier -> - Message = assign_timestamp(Latest + 1, MessageIn), - assign_timestamps(Latest + 1, Rest, [Message | Acc]) - end; -assign_timestamps(Latest, [], Acc) -> - {Latest, lists:reverse(Acc)}. + Latest = Latest0 + 1, + Message = assign_timestamp(Latest, Message0) + end, + assign_timestamps(Latest, Rest, [Message | Acc], N + 1, Sz + approx_message_size(Message0)); +assign_timestamps(Latest, [], Acc, N, Size) -> + {{N, Size}, Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. @@ -938,3 +967,14 @@ handle_custom_event(DBShard, Latest, Event) -> set_ts({DB, Shard}, TS) -> emqx_ds_builtin_raft_sup:set_gvar(DB, ?gv_timestamp(Shard), TS). + +%% + +approx_message_size(#message{from = ClientID, topic = Topic, payload = Payload}) -> + MinOverhead = 40, + MinOverhead + clientid_size(ClientID) + byte_size(Topic) + byte_size(Payload). + +clientid_size(ClientID) when is_binary(ClientID) -> + byte_size(ClientID); +clientid_size(ClientID) -> + erlang:external_size(ClientID). From 5b5f33c421c75a5765e5348b7543a88a2e27413e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 24 Jun 2024 12:49:59 +0200 Subject: [PATCH 15/19] chore(dsstore): resurrect `prepare_batch` entry tracepoint --- .../src/emqx_ds_storage_layer.erl | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 6d1744a07..1ff63aa51 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -317,9 +317,6 @@ drop_shard(Shard) -> ) -> emqx_ds:store_batch_result(). store_batch(Shard, Messages, Options) -> - ?tp(emqx_ds_storage_layer_store_batch, #{ - shard => Shard, messages => Messages, options => Options - }), case prepare_batch(Shard, Messages, #{}) of {ok, CookedBatch} -> commit_batch(Shard, CookedBatch, Options); @@ -334,9 +331,12 @@ store_batch(Shard, Messages, Options) -> [{emqx_ds:time(), emqx_types:message()}], batch_prepare_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). 
-prepare_batch(Shard, Messages = [{Time, _} | _], _Options) -> +prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. + ?tp(emqx_ds_storage_layer_prepare_batch, #{ + shard => Shard, messages => Messages, options => Options + }), %% FIXME: always store messages in the current generation case generation_at(Shard, Time) of {GenId, #{module := Mod, data := GenData}} -> @@ -344,9 +344,6 @@ prepare_batch(Shard, Messages = [{Time, _} | _], _Options) -> Result = case Mod:prepare_batch(Shard, GenData, Messages) of {ok, CookedBatch} -> - ?tp(emqx_ds_storage_layer_batch_cooked, #{ - shard => Shard, gen => GenId, batch => CookedBatch - }), {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; Error = {error, _, _} -> Error From 733751fadd21fedf2940e9526a236c3ea8493a0b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 24 Jun 2024 13:04:13 +0200 Subject: [PATCH 16/19] refactor(dsstore): keep passing `Options` to both prepare + commit --- .../src/emqx_ds_storage_bitfield_lts.erl | 22 +++++++++++++------ .../src/emqx_ds_storage_layer.erl | 21 +++++------------- .../src/emqx_ds_storage_reference.erl | 15 +++++++++---- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 9c3a930eb..20c3bc087 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -28,7 +28,7 @@ create/5, open/5, drop/5, - prepare_batch/3, + prepare_batch/4, commit_batch/4, get_streams/4, get_delete_streams/4, @@ -269,10 +269,11 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> -spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}, ...] + [{emqx_ds:time(), emqx_types:message()}, ...], + emqx_ds_storage_layer:batch_store_opts() ) -> {ok, cooked_batch()}. -prepare_batch(_ShardId, S, Messages) -> +prepare_batch(_ShardId, S, Messages, _Options) -> _ = erase(?lts_persist_ops), {Payloads, MaxTs} = lists:mapfoldl( @@ -294,13 +295,13 @@ prepare_batch(_ShardId, S, Messages) -> emqx_ds_storage_layer:shard_id(), s(), cooked_batch(), - emqx_ds_storage_layer:db_write_opts() + emqx_ds_storage_layer:batch_store_opts() ) -> ok | emqx_ds:error(_). commit_batch( _ShardId, _Data, #{?cooked_payloads := [], ?cooked_lts_ops := LTS}, - _WriteOpts + _Options ) -> %% Assert: [] = LTS, @@ -309,7 +310,7 @@ commit_batch( _ShardId, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}, - WriteOpts + Options ) -> {ok, Batch} = rocksdb:batch(), %% Commit LTS trie to the storage: @@ -328,7 +329,7 @@ commit_batch( end, Payloads ), - Result = rocksdb:write_batch(DB, Batch, WriteOpts), + Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE @@ -966,6 +967,13 @@ pop_lts_persist_ops() -> L end. +-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) -> + _RocksDBOpts :: [{atom(), _}]. +write_batch_opts(#{durable := false}) -> + [{disable_wal, true}]; +write_batch_opts(#{}) -> + []. + -ifdef(TEST). 
serialize(Msg) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 1ff63aa51..fe1d36a35 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -71,8 +71,7 @@ options/0, prototype/0, cooked_batch/0, - batch_store_opts/0, - db_write_opts/0 + batch_store_opts/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -223,9 +222,6 @@ %% Generation callbacks %%================================================================================ -%% See: `rocksdb:write_options()'. --type db_write_opts() :: [_Option]. - %% Create the new schema given generation id and the options. %% Create rocksdb column families. -callback create( @@ -248,7 +244,8 @@ -callback prepare_batch( shard_id(), generation_data(), - [{emqx_ds:time(), emqx_types:message()}, ...] + [{emqx_ds:time(), emqx_types:message()}, ...], + batch_store_opts() ) -> {ok, term()} | emqx_ds:error(_). @@ -256,7 +253,7 @@ shard_id(), generation_data(), _CookedBatch, - db_write_opts() + batch_store_opts() ) -> ok | emqx_ds:error(_). -callback get_streams( @@ -342,7 +339,7 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> {GenId, #{module := Mod, data := GenData}} -> T0 = erlang:monotonic_time(microsecond), Result = - case Mod:prepare_batch(Shard, GenData, Messages) of + case Mod:prepare_batch(Shard, GenData, Messages, Options) of {ok, CookedBatch} -> {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; Error = {error, _, _} -> @@ -365,9 +362,8 @@ prepare_batch(_Shard, [], _Options) -> ) -> emqx_ds:store_batch_result(). commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) -> #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), - WriteOptions = mk_write_options(Options), T0 = erlang:monotonic_time(microsecond), - Result = Mod:commit_batch(Shard, GenData, CookedBatch, WriteOptions), + Result = Mod:commit_batch(Shard, GenData, CookedBatch, Options), T1 = erlang:monotonic_time(microsecond), emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result. @@ -1025,11 +1021,6 @@ handle_event(Shard, Time, Event) -> %%-------------------------------------------------------------------------------- -mk_write_options(#{durable := false}) -> - [{disable_wal, true}]; -mk_write_options(#{}) -> - []. - -spec cf_names(cf_refs()) -> [string()]. cf_names(CFRefs) -> {CFNames, _CFHandles} = lists:unzip(CFRefs), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 026ae92b6..ca29c11a8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -31,7 +31,7 @@ create/5, open/5, drop/5, - prepare_batch/3, + prepare_batch/4, commit_batch/4, get_streams/4, get_delete_streams/4, @@ -102,10 +102,10 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -prepare_batch(_ShardId, _Data, Messages) -> +prepare_batch(_ShardId, _Data, Messages, _Options) -> {ok, Messages}. 
-commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) -> +commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) -> {ok, Batch} = rocksdb:batch(), lists:foreach( fun({TS, Msg}) -> @@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) -> end, Messages ), - Res = rocksdb:write_batch(DB, Batch, WriteOpts), + Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), Res. @@ -284,3 +284,10 @@ do_delete_next( -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) -> "emqx_ds_storage_reference" ++ integer_to_list(GenId). + +-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) -> + _RocksDBOpts :: [{atom(), _}]. +write_batch_opts(#{durable := false}) -> + [{disable_wal, true}]; +write_batch_opts(#{}) -> + []. From 3d296abde97e833cebfe00f69cb05b45a9637bb9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 26 Jun 2024 18:40:30 +0200 Subject: [PATCH 17/19] fix(dsrepl): classify ra error conditions more carefully Most importantly: avoid automatic retries of `shutdown` and `nodedown` errors as this could easily lead to Raft log entries duplication. --- .../src/emqx_ds_replication_layer.erl | 26 +++++-- .../src/emqx_ds_replication_layer_shard.erl | 70 ++++++++++++++----- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index a2476ba66..669abdbf1 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -63,6 +63,8 @@ apply/3, tick/2, + state_enter/2, + snapshot_module/0 ]). @@ -380,7 +382,7 @@ init_buffer(_DB, _Shard, _Options) -> {ok, #bs{}}. -spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) -> - {egress_state(), ok | {error, recoverable | unrecoverable, _}}. + {egress_state(), ok | emqx_ds:error(_)}. flush_buffer(DB, Shard, Messages, State) -> case ra_store_batch(DB, Shard, Messages) of {timeout, ServerId} -> @@ -623,18 +625,20 @@ list_nodes() -> ). -spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> - ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err. + ok | {timeout, _} | {error, recoverable | unrecoverable, _Err}. ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, ?batch_messages => Messages }, Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), - case ra:process_command(Servers, Command, ?RA_TIMEOUT) of + case emqx_ds_replication_layer_shard:process_command(Servers, Command, ?RA_TIMEOUT) of {ok, Result, _Leader} -> Result; - Error -> - Error + {timeout, _} = Timeout -> + Timeout; + {error, Reason = servers_unreachable} -> + {error, recoverable, Reason} end. ra_add_generation(DB, Shard) -> @@ -970,7 +974,19 @@ set_ts({DB, Shard}, TS) -> %% +-spec state_enter(ra_server:ra_state() | eol, ra_state()) -> ra_machine:effects(). +state_enter(MemberState, #{db_shard := {DB, Shard}, latest := Latest}) -> + ?tp( + ds_ra_state_enter, + #{db => DB, shard => Shard, latest => Latest, state => MemberState} + ), + []. + +%% + approx_message_size(#message{from = ClientID, topic = Topic, payload = Payload}) -> + %% NOTE: Overhead here is basically few empty maps + 8-byte message id. + %% TODO: Probably need to ask the storage layer about the footprint. 
MinOverhead = 40, MinOverhead + clientid_size(ClientID) + byte_size(Topic) + byte_size(Payload). diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl index b43373c43..cdd62d874 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_shard.erl @@ -4,6 +4,8 @@ -module(emqx_ds_replication_layer_shard). +-include_lib("snabbkaffe/include/trace.hrl"). + %% API: -export([start_link/3]). @@ -19,6 +21,12 @@ servers/3 ]). +%% Safe Process Command API +-export([ + process_command/3, + try_servers/3 +]). + %% Membership -export([ add_local_server/2, @@ -37,6 +45,12 @@ -type server() :: ra:server_id(). +-type server_error() :: server_error(none()). +-type server_error(Reason) :: + {timeout, server()} + | {error, server(), Reason} + | {error, servers_unreachable}. + -define(MEMBERSHIP_CHANGE_TIMEOUT, 30_000). %% @@ -146,6 +160,40 @@ local_site() -> %% +-spec process_command([server()], _Command, timeout()) -> + {ok, _Result, _Leader :: server()} | server_error(). +process_command(Servers, Command, Timeout) -> + try_servers(Servers, fun ra:process_command/3, [Command, Timeout]). + +-spec try_servers([server()], function(), [_Arg]) -> + {ok, _Result, _Leader :: server()} | server_error(_Reason). +try_servers([Server | Rest], Fun, Args) -> + case is_server_online(Server) andalso erlang:apply(Fun, [Server | Args]) of + {ok, R, Leader} -> + {ok, R, Leader}; + _Online = false -> + ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => offline}), + try_servers(Rest, Fun, Args); + {error, Reason = noproc} -> + ?tp(emqx_ds_replshard_try_next_servers, #{server => Server, reason => Reason}), + try_servers(Rest, Fun, Args); + {error, Reason} when Reason =:= nodedown orelse Reason =:= shutdown -> + %% NOTE + %% Conceptually, those error conditions basically mean the same as a plain + %% timeout: "it's impossible to tell if operation has succeeded or not". + ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => Reason}), + {timeout, Server}; + {timeout, _} = Timeout -> + ?tp(emqx_ds_replshard_try_servers_timeout, #{server => Server, reason => timeout}), + Timeout; + {error, Reason} -> + {error, Server, Reason} + end; +try_servers([], _Fun, _Args) -> + {error, servers_unreachable}. + +%% + %% @doc Add a local server to the shard cluster. %% It's recommended to have the local server running before calling this function. %% This function is idempotent. 
@@ -174,10 +222,10 @@ add_local_server(DB, Shard) -> } end, Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, - case ra_try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of + case try_servers(ShardServers, fun ra:add_member/3, [ServerRecord, Timeout]) of {ok, _, _Leader} -> ok; - {error, already_member} -> + {error, _Server, already_member} -> ok; Error -> {error, recoverable, Error} @@ -208,10 +256,10 @@ drop_local_server(DB, Shard) -> remove_server(DB, Shard, Server) -> ShardServers = shard_servers(DB, Shard), Timeout = ?MEMBERSHIP_CHANGE_TIMEOUT, - case ra_try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of + case try_servers(ShardServers, fun ra:remove_member/3, [Server, Timeout]) of {ok, _, _Leader} -> ok; - {error, not_member} -> + {error, _Server, not_member} -> ok; Error -> {error, recoverable, Error} @@ -261,20 +309,6 @@ member_readiness(#{status := Status, voter_status := #{membership := Membership} member_readiness(#{}) -> unknown. -%% - -ra_try_servers([Server | Rest], Fun, Args) -> - case erlang:apply(Fun, [Server | Args]) of - {ok, R, Leader} -> - {ok, R, Leader}; - {error, Reason} when Reason == noproc; Reason == nodedown -> - ra_try_servers(Rest, Fun, Args); - ErrorOrTimeout -> - ErrorOrTimeout - end; -ra_try_servers([], _Fun, _Args) -> - {error, servers_unreachable}. - ra_overview(Server) -> case ra:member_overview(Server) of {ok, Overview, _Leader} -> From 30efa1f57e4be25da019222d075d34a348ea573b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 26 Jun 2024 18:44:07 +0200 Subject: [PATCH 18/19] test(dsrepl): relax crash-recover testcase to tolerate message loss Which is quite an expected occasion for this kind of stress test. --- .../test/emqx_ds_replication_SUITE.erl | 53 ++++++++++++++++--- .../src/emqx_ds_buffer.erl | 2 +- .../test/emqx_ds_test_helpers.erl | 9 ++-- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index a6ca86f7e..fe0a30489 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -817,9 +817,10 @@ t_crash_restart_recover(Config) -> DBOpts = opts(#{n_shards => 16, n_sites => 3, replication_factor => 3}), %% Prepare test event stream. - {Stream0, TopicStreams} = emqx_ds_test_helpers:interleaved_topic_messages( - ?FUNCTION_NAME, _NClients = 8, _NMsgs = 400 - ), + NMsgs = 400, + NClients = 8, + {Stream0, TopicStreams} = + emqx_ds_test_helpers:interleaved_topic_messages(?FUNCTION_NAME, NClients, NMsgs), Stream1 = emqx_utils_stream:interleave( [ {300, Stream0}, @@ -849,19 +850,59 @@ t_crash_restart_recover(Config) -> %% Apply the test events, including simulated node crashes. NodeStream = emqx_utils_stream:const(N1), emqx_ds_test_helpers:apply_stream(?DB, NodeStream, Stream, 0), - timer:sleep(5000), - %% Verify that all the data is there. - emqx_ds_test_helpers:verify_stream_effects(?DB, ?FUNCTION_NAME, Nodes, TopicStreams) + %% It's expected to lose few messages when leaders are abruptly killed. 
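+        %% Such messages show up in `emqx_ds_buffer_flush_failed` trace events
+        %% (which now carry the failed batch) and are excluded from the
+        %% verification below.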
+            MatchFlushFailed = ?match_event(#{?snk_kind := emqx_ds_buffer_flush_failed}),
+            {ok, SubRef} = snabbkaffe:subscribe(MatchFlushFailed, NMsgs, _Timeout = 5000, infinity),
+            {timeout, Events} = snabbkaffe:receive_events(SubRef),
+            LostMessages = [M || #{batch := Messages} <- Events, M <- Messages],
+            ct:pal("Some messages were lost: ~p", [LostMessages]),
+            ?assert(length(LostMessages) < NMsgs div 20),
+
+            %% Verify that all the successfully persisted messages are there.
+            VerifyClient = fun({ClientId, ExpectedStream}) ->
+                Topic = emqx_ds_test_helpers:client_topic(?FUNCTION_NAME, ClientId),
+                ClientNodes = nodes_of_clientid(ClientId, Nodes),
+                DSStream1 = ds_topic_stream(ClientId, Topic, hd(ClientNodes)),
+                %% Do all nodes contain the same messages for this client?
+                lists:foreach(
+                    fun(ClientNode) ->
+                        DSStream = ds_topic_stream(ClientId, Topic, ClientNode),
+                        ?defer_assert(emqx_ds_test_helpers:diff_messages(DSStream1, DSStream))
+                    end,
+                    tl(ClientNodes)
+                ),
+                %% Were any messages lost unexpectedly?
+                {_, DSMessages} = lists:unzip(emqx_utils_stream:consume(DSStream1)),
+                ExpectedMessages = emqx_utils_stream:consume(ExpectedStream),
+                MissingMessages = ExpectedMessages -- DSMessages,
+                ?defer_assert(?assertEqual([], MissingMessages -- LostMessages, DSMessages))
+            end,
+            lists:foreach(VerifyClient, TopicStreams)
         end,
         []
     ).
 
+nodes_of_clientid(ClientId, Nodes) ->
+    emqx_ds_test_helpers:nodes_of_clientid(?DB, ClientId, Nodes).
+
+ds_topic_stream(ClientId, ClientTopic, Node) ->
+    emqx_ds_test_helpers:ds_topic_stream(?DB, ClientId, ClientTopic, Node).
+
+is_message_lost(Message, MessagesLost) ->
+    lists:any(
+        fun(ML) ->
+            emqx_ds_test_helpers:message_eq([clientid, topic, payload], Message, ML)
+        end,
+        MessagesLost
+    ).
+
 kill_restart_node_async(Node, Spec, DBOpts) ->
     erlang:spawn_link(?MODULE, kill_restart_node, [Node, Spec, DBOpts]).
 
 kill_restart_node(Node, Spec, DBOpts) ->
     ok = emqx_cth_peer:kill(Node),
+    ?tp(test_cluster_node_killed, #{node => Node}),
     _ = emqx_cth_cluster:restart(Spec),
     ok = erpc:call(Node, emqx_ds, open_db, [?DB, DBOpts]).
 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl
index e93bb33be..dec9eea80 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl
@@ -314,7 +314,7 @@ do_flush(
             ?tp(
                 debug,
                 emqx_ds_buffer_flush_failed,
-                #{db => DB, shard => Shard, error => Err}
+                #{db => DB, shard => Shard, batch => Messages, error => Err}
             ),
             emqx_ds_builtin_metrics:inc_buffer_batches_failed(Metrics),
             Reply =
diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl
index 0b6b634c5..af41df1ad 100644
--- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl
+++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl
@@ -266,15 +266,18 @@ verify_stream_effects(DB, TestCase, Node, ClientId, ExpectedStream) ->
     ct:pal("Checking consistency of effects for ~p on ~p", [ClientId, Node]),
     ?defer_assert(
         begin
-            snabbkaffe_diff:assert_lists_eq(
+            diff_messages(
                 ExpectedStream,
-                ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node),
-                message_diff_options([id, qos, from, flags, headers, topic, payload, extra])
+                ds_topic_stream(DB, ClientId, client_topic(TestCase, ClientId), Node)
             ),
             ct:pal("Data for client ~p on ~p is consistent.", [ClientId, Node])
         end
     ).
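+%% Compare two message streams over the default, full set of message fields.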
+diff_messages(Expected, Got) -> + Fields = [id, qos, from, flags, headers, topic, payload, extra], + diff_messages(Fields, Expected, Got). + diff_messages(Fields, Expected, Got) -> snabbkaffe_diff:assert_lists_eq(Expected, Got, message_diff_options(Fields)). From e1de18ef102cb5da61d429f48a66dbb03ab11521 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 26 Jun 2024 20:42:59 +0200 Subject: [PATCH 19/19] test(dsrepl): await stable state before running testcase --- apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index fe0a30489..d3eed99df 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -140,6 +140,7 @@ t_replication_transfers_snapshots(Config) -> %% Stop the DB on the "offline" node. ok = emqx_cth_cluster:stop_node(NodeOffline), + _ = ?block_until(#{?snk_kind := ds_ra_state_enter, state := leader}, 500, 0), %% Fill the storage with messages and few additional generations. emqx_ds_test_helpers:apply_stream(?DB, Nodes -- [NodeOffline], Stream),