feat(dsrepl): enable WAL-less batch writes

This commit is contained in:
Andrew Mayorov 2024-06-10 18:27:02 +02:00
parent 2705226eb5
commit 0c0757b8c2
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 44 additions and 3 deletions

View File

@ -70,6 +70,7 @@ prepare(Index, State) ->
ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
write(Dir, Meta, MachineState) ->
%% Emit a trace point carrying the snapshot metadata and machine state.
?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}),
%% NOTE
%% Flush the shard's storage before the snapshot itself is written. The `ok =`
%% match makes any flush error crash here, so no snapshot is written in that case.
%% NOTE(review): presumably needed because batches are committed with
%% `{disable_wal, true}` and are only durable once flushed -- confirm.
ok = emqx_ds_storage_layer:flush(shard_id(MachineState)),
%% Delegate the actual snapshot write to the stock Ra log snapshot module.
ra_log_snapshot:write(Dir, Meta, MachineState).
%% Reading a snapshot.
@ -229,7 +230,7 @@ complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
write_machine_snapshot(WS).
write_machine_snapshot(#ws{dir = Dir, meta = Meta, state = MachineState}) ->
write(Dir, Meta, MachineState).
ra_log_snapshot:write(Dir, Meta, MachineState).
%% Restoring machine state from a snapshot.
%% This is equivalent to restoring from a log snapshot.

View File

@ -326,7 +326,7 @@ commit_batch(
end,
Payloads
),
Result = rocksdb:write_batch(DB, Batch, []),
Result = rocksdb:write_batch(DB, Batch, [{disable_wal, true}]),
rocksdb:release_batch(Batch),
ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}),
%% NOTE

View File

@ -44,6 +44,7 @@
drop_generation/2,
%% Snapshotting
flush/1,
take_snapshot/1,
accept_snapshot/1,
@ -279,6 +280,7 @@
%% Request records matched by the shard server's `handle_call/3`.
%% NOTE(review): the handler for `call_update_config` is not visible here -- confirm.
-record(call_update_config, {options :: emqx_ds:create_db_opts(), since :: emqx_ds:time()}).
-record(call_list_generations_with_lifetimes, {}).
%% Drop the generation identified by `gen_id`.
-record(call_drop_generation, {gen_id :: gen_id()}).
%% Flush the shard's storage; sent by `flush/1`.
-record(call_flush, {}).
%% Take a snapshot of the shard; sent by `take_snapshot/1`.
-record(call_take_snapshot, {}).
-spec drop_shard(shard_id()) -> ok.
@ -539,6 +541,10 @@ shard_info(ShardId, status) ->
error:badarg -> down
end.
%% Synchronously ask the shard's server process to flush its storage.
%%
%% The call uses an `infinity` timeout, so the caller blocks until the shard
%% server replies with either `ok` or `{error, Reason}`.
-spec flush(shard_id()) -> ok | {error, _}.
flush(ShardId) ->
    ShardServer = ?REF(ShardId),
    Request = #call_flush{},
    gen_server:call(ShardServer, Request, infinity).
-spec take_snapshot(shard_id()) -> {ok, emqx_ds_storage_snapshot:reader()} | {error, _Reason}.
take_snapshot(ShardId) ->
case gen_server:call(?REF(ShardId), #call_take_snapshot{}, infinity) of
@ -566,6 +572,7 @@ start_link(Shard = {_, _}, Options) ->
shard_id :: shard_id(),
db :: rocksdb:db_handle(),
cf_refs :: cf_refs(),
cf_need_flush :: gen_id(),
schema :: shard_schema(),
shard :: shard()
}).
@ -591,10 +598,12 @@ init({ShardId, Options}) ->
{Scm, CFRefs0}
end,
Shard = open_shard(ShardId, DB, CFRefs, Schema),
CurrentGenId = maps:get(current_generation, Schema),
S = #s{
shard_id = ShardId,
db = DB,
cf_refs = CFRefs,
cf_need_flush = CurrentGenId,
schema = Schema,
shard = Shard
},
@ -635,6 +644,9 @@ handle_call(#call_list_generations_with_lifetimes{}, _From, S) ->
handle_call(#call_drop_generation{gen_id = GenId}, _From, S0) ->
{Reply, S} = handle_drop_generation(S0, GenId),
{reply, Reply, S};
handle_call(#call_flush{}, _From, S0) ->
{Reply, S} = handle_flush(S0),
{reply, Reply, S};
handle_call(#call_take_snapshot{}, _From, S) ->
Snapshot = handle_take_snapshot(S),
{reply, Snapshot, S};
@ -866,6 +878,10 @@ rocksdb_open(Shard, Options) ->
DBOptions = [
{create_if_missing, true},
{create_missing_column_families, true},
%% NOTE
%% With WAL-less writes, it's important to have CFs flushed atomically.
%% For example, bitfield-lts backend needs data + trie CFs to be consistent.
{atomic_flush, true},
{enable_write_thread_adaptive_yield, false}
| maps:get(db_options, Options, [])
],
@ -921,6 +937,30 @@ update_last_until(Schema = #{current_generation := GenId}, Until) ->
{error, overlaps_existing_generations}
end.
%% Flush the column families of every generation that may still hold unflushed
%% data, i.e. all generations from `cf_need_flush` up to the current one.
%%
%% Returns `{ok, NewState}` once RocksDB reports the flush as complete (the call
%% waits for completion), or `{{error, Reason}, State}` with the state untouched,
%% so that the same range of generations is retried on the next flush.
handle_flush(S = #s{db = DB, cf_need_flush = NeedFlushGenId, schema = Schema}) ->
    %% NOTE
    %% There could have been a few generations added since the last time `flush/1`
    %% was called. Strictly speaking, we don't need to flush them all at once as
    %% part of a single atomic flush, but the error handling is a bit easier this way.
    CurrentGenId = maps:get(current_generation, Schema),
    GenIds = lists:seq(NeedFlushGenId, CurrentGenId),
    CFHandles = lists:flatmap(
        fun(GenId) ->
            case Schema of
                #{?GEN_KEY(GenId) := #{cf_refs := CFRefs}} ->
                    {_Names, Handles} = lists:unzip(CFRefs),
                    Handles;
                #{} ->
                    %% The generation is no longer part of the schema (presumably
                    %% dropped since the last flush): there is nothing left to
                    %% flush for it, and crashing the shard server here would
                    %% fail the snapshot for no good reason.
                    []
            end
        end,
        GenIds
    ),
    case rocksdb:flush(DB, CFHandles, [{wait, true}]) of
        ok ->
            %% Current generation will always need a flush.
            ?tp(ds_storage_flush_complete, #{gens => GenIds, cfs => CFHandles}),
            {ok, S#s{cf_need_flush = CurrentGenId}};
        {error, _} = Error ->
            {Error, S}
    end.
handle_take_snapshot(#s{db = DB, shard_id = ShardId}) ->
Name = integer_to_list(erlang:system_time(millisecond)),
Dir = checkpoint_dir(ShardId, Name),

View File

@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
end,
Messages
),
Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
Res = rocksdb:write_batch(DB, Batch, _WriteOptions = [{disable_wal, true}]),
rocksdb:release_batch(Batch),
Res.