From 733751fadd21fedf2940e9526a236c3ea8493a0b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 24 Jun 2024 13:04:13 +0200 Subject: [PATCH] refactor(dsstore): keep passing `Options` to both prepare + commit --- .../src/emqx_ds_storage_bitfield_lts.erl | 22 +++++++++++++------ .../src/emqx_ds_storage_layer.erl | 21 +++++------------- .../src/emqx_ds_storage_reference.erl | 15 +++++++++---- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 9c3a930eb..20c3bc087 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -28,7 +28,7 @@ create/5, open/5, drop/5, - prepare_batch/3, + prepare_batch/4, commit_batch/4, get_streams/4, get_delete_streams/4, @@ -269,10 +269,11 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> -spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}, ...] + [{emqx_ds:time(), emqx_types:message()}, ...], + emqx_ds_storage_layer:batch_store_opts() ) -> {ok, cooked_batch()}. -prepare_batch(_ShardId, S, Messages) -> +prepare_batch(_ShardId, S, Messages, _Options) -> _ = erase(?lts_persist_ops), {Payloads, MaxTs} = lists:mapfoldl( @@ -294,13 +295,13 @@ prepare_batch(_ShardId, S, Messages) -> emqx_ds_storage_layer:shard_id(), s(), cooked_batch(), - emqx_ds_storage_layer:db_write_opts() + emqx_ds_storage_layer:batch_store_opts() ) -> ok | emqx_ds:error(_). commit_batch( _ShardId, _Data, #{?cooked_payloads := [], ?cooked_lts_ops := LTS}, - _WriteOpts + _Options ) -> %% Assert: [] = LTS, @@ -309,7 +310,7 @@ commit_batch( _ShardId, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}, - WriteOpts + Options ) -> {ok, Batch} = rocksdb:batch(), %% Commit LTS trie to the storage: @@ -328,7 +329,7 @@ commit_batch( end, Payloads ), - Result = rocksdb:write_batch(DB, Batch, WriteOpts), + Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE @@ -966,6 +967,13 @@ pop_lts_persist_ops() -> L end. +-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) -> + _RocksDBOpts :: [{atom(), _}]. +write_batch_opts(#{durable := false}) -> + [{disable_wal, true}]; +write_batch_opts(#{}) -> + []. + -ifdef(TEST). serialize(Msg) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 1ff63aa51..fe1d36a35 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -71,8 +71,7 @@ options/0, prototype/0, cooked_batch/0, - batch_store_opts/0, - db_write_opts/0 + batch_store_opts/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -223,9 +222,6 @@ %% Generation callbacks %%================================================================================ -%% See: `rocksdb:write_options()'. --type db_write_opts() :: [_Option]. - %% Create the new schema given generation id and the options. %% Create rocksdb column families. -callback create( @@ -248,7 +244,8 @@ -callback prepare_batch( shard_id(), generation_data(), - [{emqx_ds:time(), emqx_types:message()}, ...] + [{emqx_ds:time(), emqx_types:message()}, ...], + batch_store_opts() ) -> {ok, term()} | emqx_ds:error(_). @@ -256,7 +253,7 @@ shard_id(), generation_data(), _CookedBatch, - db_write_opts() + batch_store_opts() ) -> ok | emqx_ds:error(_). -callback get_streams( @@ -342,7 +339,7 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> {GenId, #{module := Mod, data := GenData}} -> T0 = erlang:monotonic_time(microsecond), Result = - case Mod:prepare_batch(Shard, GenData, Messages) of + case Mod:prepare_batch(Shard, GenData, Messages, Options) of {ok, CookedBatch} -> {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; Error = {error, _, _} -> @@ -365,9 +362,8 @@ prepare_batch(_Shard, [], _Options) -> ) -> emqx_ds:store_batch_result(). commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) -> #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), - WriteOptions = mk_write_options(Options), T0 = erlang:monotonic_time(microsecond), - Result = Mod:commit_batch(Shard, GenData, CookedBatch, WriteOptions), + Result = Mod:commit_batch(Shard, GenData, CookedBatch, Options), T1 = erlang:monotonic_time(microsecond), emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result. @@ -1025,11 +1021,6 @@ handle_event(Shard, Time, Event) -> %%-------------------------------------------------------------------------------- -mk_write_options(#{durable := false}) -> - [{disable_wal, true}]; -mk_write_options(#{}) -> - []. - -spec cf_names(cf_refs()) -> [string()]. cf_names(CFRefs) -> {CFNames, _CFHandles} = lists:unzip(CFRefs), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 026ae92b6..ca29c11a8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -31,7 +31,7 @@ create/5, open/5, drop/5, - prepare_batch/3, + prepare_batch/4, commit_batch/4, get_streams/4, get_delete_streams/4, @@ -102,10 +102,10 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -prepare_batch(_ShardId, _Data, Messages) -> +prepare_batch(_ShardId, _Data, Messages, _Options) -> {ok, Messages}. -commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) -> +commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) -> {ok, Batch} = rocksdb:batch(), lists:foreach( fun({TS, Msg}) -> @@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) -> end, Messages ), - Res = rocksdb:write_batch(DB, Batch, WriteOpts), + Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)), rocksdb:release_batch(Batch), Res. @@ -284,3 +284,10 @@ do_delete_next( -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. data_cf(GenId) -> "emqx_ds_storage_reference" ++ integer_to_list(GenId). + +-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) -> + _RocksDBOpts :: [{atom(), _}]. +write_batch_opts(#{durable := false}) -> + [{disable_wal, true}]; +write_batch_opts(#{}) -> + [].