From 7895e9cc45934ddeff1b50add6b11f62ff0d3c1c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 11 Jun 2024 14:58:34 +0200 Subject: [PATCH] feat(dsstore): make WAL-less mode optional And make the upper layer choose when to use it. --- .../src/emqx_ds_replication_layer.erl | 3 +- .../src/emqx_ds_storage_bitfield_lts.erl | 20 +++--- .../src/emqx_ds_storage_layer.erl | 62 ++++++++++++++----- .../src/emqx_ds_storage_reference.erl | 10 +-- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 16 ++--- 5 files changed, 74 insertions(+), 37 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index fe7da3f8f..af0aeec7d 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -57,6 +57,7 @@ ra_store_batch/3 ]). +-behaviour(ra_machine). -export([ init/1, apply/3, @@ -768,7 +769,7 @@ apply( ) -> ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), - Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), + Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}), State = State0#{latest := Latest}, set_ts(DBShard, Latest), Effects = try_release_log(RaftMeta, State), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d73f3f2ec..9c3a930eb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -28,8 +28,8 @@ create/5, open/5, drop/5, - prepare_batch/4, - commit_batch/3, + prepare_batch/3, + commit_batch/4, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -269,11 +269,10 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> -spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), - [{emqx_ds:time(), emqx_types:message()}, ...], - emqx_ds:message_store_opts() + [{emqx_ds:time(), emqx_types:message()}, ...] ) -> {ok, cooked_batch()}. -prepare_batch(_ShardId, S, Messages, _Options) -> +prepare_batch(_ShardId, S, Messages) -> _ = erase(?lts_persist_ops), {Payloads, MaxTs} = lists:mapfoldl( @@ -294,12 +293,14 @@ prepare_batch(_ShardId, S, Messages, _Options) -> -spec commit_batch( emqx_ds_storage_layer:shard_id(), s(), - cooked_batch() + cooked_batch(), + emqx_ds_storage_layer:db_write_opts() ) -> ok | emqx_ds:error(_). commit_batch( _ShardId, _Data, - #{?cooked_payloads := [], ?cooked_lts_ops := LTS} + #{?cooked_payloads := [], ?cooked_lts_ops := LTS}, + _WriteOpts ) -> %% Assert: [] = LTS, @@ -307,7 +308,8 @@ commit_batch( commit_batch( _ShardId, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, - #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs} + #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs}, + WriteOpts ) -> {ok, Batch} = rocksdb:batch(), %% Commit LTS trie to the storage: @@ -326,7 +328,7 @@ commit_batch( end, Payloads ), - Result = rocksdb:write_batch(DB, Batch, [{disable_wal, true}]), + Result = rocksdb:write_batch(DB, Batch, WriteOpts), rocksdb:release_batch(Batch), ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index df1c822c4..d8624f56b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -27,7 +27,7 @@ %% Data store_batch/3, prepare_batch/3, - commit_batch/2, + commit_batch/3, get_streams/3, get_delete_streams/3, @@ -70,7 +70,9 @@ shard_id/0, options/0, prototype/0, - cooked_batch/0 + cooked_batch/0, + batch_store_opts/0, + db_write_opts/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -113,6 +115,23 @@ -type gen_id() :: 0..16#ffff. +%% Options affecting how batches should be stored. +%% See also: `emqx_ds:message_store_opts()'. +-type batch_store_opts() :: + #{ + %% Whether the whole batch given to `store_batch' should be inserted atomically as + %% a unit. Default: `false'. + atomic => boolean(), + %% Should the storage make sure that the batch is written durably? Non-durable + %% writes are in general unsafe but require much less resources, i.e. with RocksDB + %% non-durable (WAL-less) writes do not usually involve _any_ disk I/O. + %% Default: `true'. + durable => boolean() + }. + +%% Options affecting how batches should be prepared. +-type batch_prepare_opts() :: #{}. + %% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6 -opaque stream_v1() :: #{ @@ -203,6 +222,9 @@ %% Generation callbacks %%================================================================================ +%% See: `rocksdb:write_options()'. +-type db_write_opts() :: [_Option]. + %% Create the new schema given generation id and the options. %% Create rocksdb column families. -callback create( @@ -225,15 +247,15 @@ -callback prepare_batch( shard_id(), generation_data(), - [{emqx_ds:time(), emqx_types:message()}, ...], - emqx_ds:message_store_opts() + [{emqx_ds:time(), emqx_types:message()}, ...] ) -> {ok, term()} | emqx_ds:error(_). -callback commit_batch( shard_id(), generation_data(), - _CookedBatch + _CookedBatch, + db_write_opts() ) -> ok | emqx_ds:error(_). -callback get_streams( @@ -290,16 +312,16 @@ drop_shard(Shard) -> -spec store_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], - emqx_ds:message_store_opts() + batch_store_opts() ) -> emqx_ds:store_batch_result(). store_batch(Shard, Messages, Options) -> ?tp(emqx_ds_storage_layer_store_batch, #{ shard => Shard, messages => Messages, options => Options }), - case prepare_batch(Shard, Messages, Options) of + case prepare_batch(Shard, Messages, #{}) of {ok, CookedBatch} -> - commit_batch(Shard, CookedBatch); + commit_batch(Shard, CookedBatch, Options); ignore -> ok; Error = {error, _, _} -> @@ -309,9 +331,9 @@ store_batch(Shard, Messages, Options) -> -spec prepare_batch( shard_id(), [{emqx_ds:time(), emqx_types:message()}], - emqx_ds:message_store_opts() + batch_prepare_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> +prepare_batch(Shard, Messages = [{Time, _} | _], _Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. %% FIXME: always store messages in the current generation @@ -319,7 +341,7 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> {GenId, #{module := Mod, data := GenData}} -> T0 = erlang:monotonic_time(microsecond), Result = - case Mod:prepare_batch(Shard, GenData, Messages, Options) of + case Mod:prepare_batch(Shard, GenData, Messages) of {ok, CookedBatch} -> ?tp(emqx_ds_storage_layer_batch_cooked, #{ shard => Shard, gen => GenId, batch => CookedBatch @@ -338,11 +360,16 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> prepare_batch(_Shard, [], _Options) -> ignore. --spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result(). -commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> +-spec commit_batch( + shard_id(), + cooked_batch(), + batch_store_opts() +) -> emqx_ds:store_batch_result(). +commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) -> #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), + WriteOptions = mk_write_options(Options), T0 = erlang:monotonic_time(microsecond), - Result = Mod:commit_batch(Shard, GenData, CookedBatch), + Result = Mod:commit_batch(Shard, GenData, CookedBatch, WriteOptions), T1 = erlang:monotonic_time(microsecond), emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result. @@ -994,6 +1021,13 @@ handle_event(Shard, Time, Event) -> GenId = generation_current(Shard), handle_event(Shard, Time, ?mk_storage_event(GenId, Event)). +%%-------------------------------------------------------------------------------- + +mk_write_options(#{durable := false}) -> + [{disable_wal, true}]; +mk_write_options(#{}) -> + []. + %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index c70b80e68..026ae92b6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -31,8 +31,8 @@ create/5, open/5, drop/5, - prepare_batch/4, - commit_batch/3, + prepare_batch/3, + commit_batch/4, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -102,10 +102,10 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -prepare_batch(_ShardId, _Data, Messages, _Options) -> +prepare_batch(_ShardId, _Data, Messages) -> {ok, Messages}. -commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> +commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) -> {ok, Batch} = rocksdb:batch(), lists:foreach( fun({TS, Msg}) -> @@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> end, Messages ), - Res = rocksdb:write_batch(DB, Batch, _WriteOptions = [{disable_wal, true}]), + Res = rocksdb:write_batch(DB, Batch, WriteOpts), rocksdb:release_batch(Batch), Res. diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index bd0f382b2..866b4d381 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -64,7 +64,7 @@ t_iterate(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), %% Iterate through individual topics: [ begin @@ -94,7 +94,7 @@ t_delete(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), %% Iterate through topics: StartTime = 0, @@ -125,7 +125,7 @@ t_get_streams(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}), GetStream = fun(Topic) -> StartTime = 0, emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime) @@ -152,7 +152,7 @@ t_get_streams(_Config) -> end || I <- lists:seq(1, 200) ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}), %% Check that "foo/bar/baz" topic now appears in two streams: %% "foo/bar/baz" and "foo/bar/+": NewStreams = lists:sort(GetStream("foo/bar/baz")), @@ -180,7 +180,7 @@ t_new_generation_inherit_trie(_Config) -> || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), %% Now we create a new generation with the same LTS module. It should inherit the %% learned trie. ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), @@ -194,7 +194,7 @@ t_new_generation_inherit_trie(_Config) -> || I <- lists:seq(1, 200), Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), %% We should get only two streams for wildcard query, for "foo" and for "bar". ?assertMatch( [_Foo, _Bar], @@ -217,13 +217,13 @@ t_replay(_Config) -> {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} || Topic <- Topics, PublishedAt <- Timestamps ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}), %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': Batch2 = [ {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], - ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), + ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}), %% Check various topic filters: Messages = [M || {_TS, M} <- Batch1 ++ Batch2], %% Missing topics (no ghost messages):