From 19cc7c34a5d0020b7abb0dd73b063ebb01c518ca Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 29 Feb 2024 17:17:44 +0100 Subject: [PATCH] wip: disable rocksdb wal --- .../src/emqx_ds_storage_bitfield_lts.erl | 4 ++-- apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 332fd0c9b..37c186dec 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -235,7 +235,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> fun(Msg) -> {Key, _} = make_key(S, Msg), Val = serialize(Msg), - rocksdb:put(DB, Data, Key, Val, []) + rocksdb:put(DB, Data, Key, Val, [{disable_wal, true}]) end, Messages ). @@ -511,7 +511,7 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). restore_trie(TopicIndexBytes, DB, CF) -> PersistCallback = fun(Key, Val) -> - rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), []) + rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), [{disable_wal, true}]) end, {ok, IT} = rocksdb:iterator(DB, CF, []), try diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e0bf1fa1b..d78255a21 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -582,7 +582,9 @@ rocksdb_open(Shard, Options) -> DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true}, - {enable_write_thread_adaptive_yield, false} + % {enable_write_thread_adaptive_yield, false}, + {manual_wal_flush, true}, + {atomic_flush, true} | maps:get(db_options, Options, []) ], DBDir = db_dir(Shard), @@ -707,6 +709,6 @@ get_schema_persistent(DB) -> -spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok. put_schema_persistent(DB, Schema) -> Blob = term_to_binary(Schema), - rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []). + rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, [{disable_wal, true}]). -undef(ROCKSDB_SCHEMA_KEY).