From 0c0757b8c20ce939b1e5fdd1faf30568906b87dd Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Mon, 10 Jun 2024 18:27:02 +0200
Subject: [PATCH] feat(dsrepl): enable WAL-less batch writes

---
Review note: handle_flush/1 now skips generations that are missing from the
schema instead of crashing the shard server on a failed match. The post-image
blob hash on the emqx_ds_storage_layer.erl `index` line was not recomputed.

 .../src/emqx_ds_replication_snapshot.erl      |  3 +-
 .../src/emqx_ds_storage_bitfield_lts.erl      |  2 +-
 .../src/emqx_ds_storage_layer.erl             | 50 +++++++++++++++++++
 .../src/emqx_ds_storage_reference.erl         |  2 +-
 4 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl
index 58b8e4f0a..3a62b3b0f 100644
--- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl
+++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl
@@ -70,6 +70,7 @@ prepare(Index, State) ->
     ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
 write(Dir, Meta, MachineState) ->
     ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}),
+    ok = emqx_ds_storage_layer:flush(shard_id(MachineState)),
     ra_log_snapshot:write(Dir, Meta, MachineState).
 
 %% Reading a snapshot.
@@ -229,7 +230,7 @@ complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
     write_machine_snapshot(WS).
 
 write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
-    write(Dir, Meta, MachineState).
+    ra_log_snapshot:write(Dir, Meta, MachineState).
 
 %% Restoring machine state from a snapshot.
 %% This is equivalent to restoring from a log snapshot.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
index c978e416f..d73f3f2ec 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
@@ -326,7 +326,7 @@ commit_batch(
         end,
         Payloads
     ),
-    Result = rocksdb:write_batch(DB, Batch, []),
+    Result = rocksdb:write_batch(DB, Batch, [{disable_wal, true}]),
     rocksdb:release_batch(Batch),
     ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}),
     %% NOTE
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
index c31a6bc52..df1c822c4 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
@@ -44,6 +44,7 @@
     drop_generation/2,
 
     %% Snapshotting
+    flush/1,
     take_snapshot/1,
     accept_snapshot/1,
 
@@ -279,6 +280,7 @@
 -record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
 -record(call_list_generations_with_lifetimes, {}).
 -record(call_drop_generation, {gen_id :: gen_id()}).
+-record(call_flush, {}).
 -record(call_take_snapshot, {}).
 
 -spec drop_shard(shard_id()) -> ok.
@@ -539,6 +541,10 @@ shard_info(ShardId, status) ->
         error:badarg -> down
     end.
 
+-spec flush(shard_id()) -> ok | {error, _}.
+flush(ShardId) ->
+    gen_server:call(?REF(ShardId), #call_flush{}, infinity).
+
 -spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}.
 take_snapshot(ShardId) ->
     case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of
@@ -566,6 +572,7 @@ start_link(Shard = {_, _}, Options) ->
     shard_id :: shard_id(),
     db :: rocksdb:db_handle(),
     cf_refs :: cf_refs(),
+    cf_need_flush :: gen_id(),
     schema :: shard_schema(),
     shard :: shard()
 }).
@@ -591,10 +598,12 @@ init({ShardId, Options}) ->
                 {Scm, CFRefs0}
         end,
     Shard = open_shard(ShardId, DB, CFRefs, Schema),
+    CurrentGenId = maps:get(current_generation, Schema),
     S = #s{
         shard_id = ShardId,
         db = DB,
         cf_refs = CFRefs,
+        cf_need_flush = CurrentGenId,
         schema = Schema,
         shard = Shard
     },
@@ -635,6 +644,9 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
 handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
     {Reply, S} = handle_drop_generation(S0, GenId),
     {reply, Reply, S};
+handle_call(#call_flush{}, _From, S0) ->
+    {Reply, S} = handle_flush(S0),
+    {reply, Reply, S};
 handle_call(#call_take_snapshot{}, _From, S) ->
     Snapshot = handle_take_snapshot(S),
     {reply, Snapshot, S};
@@ -866,6 +878,10 @@ rocksdb_open(Shard, Options) ->
     DBOptions = [
         {create_if_missing, true},
         {create_missing_column_families, true},
+        %% NOTE
+        %% With WAL-less writes, it's important to have CFs flushed atomically.
+        %% For example, bitfield-lts backend needs data + trie CFs to be consistent.
+        {atomic_flush, true},
         {enable_write_thread_adaptive_yield, false}
         | maps:get(db_options, Options, [])
     ],
@@ -921,6 +937,40 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
             {error, overlaps_existing_generations}
     end.
 
+%% Flushes column families of the generations from `cf_need_flush` up to the
+%% current one with a single `rocksdb:flush/3` call.
+%% Returns `{Reply, State}`: `Reply` is `ok` (and `cf_need_flush` is advanced to
+%% the current generation) or `{error, _}` (and the state is left unchanged).
+handle_flush(S = #s{db = DB, cf_need_flush = NeedFlushGenId, schema = Schema}) ->
+    %% NOTE
+    %% There could have been a few generations added since the last time `flush/1`
+    %% was called. Strictly speaking, we don't need to flush them all at once as
+    %% part of a single atomic flush, but the error handling is a bit easier this way.
+    CurrentGenId = maps:get(current_generation, Schema),
+    GenIds = lists:seq(NeedFlushGenId, CurrentGenId),
+    CFHandles = lists:flatmap(
+        fun(GenId) ->
+            case Schema of
+                #{?GEN_KEY(GenId) := #{cf_refs := CFRefs}} ->
+                    {_, GenCFHandles} = lists:unzip(CFRefs),
+                    GenCFHandles;
+                #{} ->
+                    %% No such generation in the schema anymore (presumably it was
+                    %% dropped since the last flush): nothing left to flush for it.
+                    []
+            end
+        end,
+        GenIds
+    ),
+    case rocksdb:flush(DB, CFHandles, [{wait, true}]) of
+        ok ->
+            %% Current generation will always need a flush.
+            ?tp(ds_storage_flush_complete, #{gens => GenIds, cfs => CFHandles}),
+            {ok, S#s{cf_need_flush = CurrentGenId}};
+        {error, _} = Error ->
+            {Error, S}
+    end.
+
 handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
     Name = integer_to_list(erlang:system_time(millisecond)),
     Dir = checkpoint_dir(ShardId, Name),
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
index b4c3ade3f..c70b80e68 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl
@@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
         end,
         Messages
     ),
-    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = [{disable_wal, true}]),
     rocksdb:release_batch(Batch),
     Res.