feat(dsstore): make WAL-less mode optional

And make the upper layer choose when to use it.
This commit is contained in:
Andrew Mayorov 2024-06-11 14:58:34 +02:00
parent 0c0757b8c2
commit 7895e9cc45
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 74 additions and 37 deletions

View File

@ -57,6 +57,7 @@
ra_store_batch/3 ra_store_batch/3
]). ]).
-behaviour(ra_machine).
-export([ -export([
init/1, init/1,
apply/3, apply/3,
@ -768,7 +769,7 @@ apply(
) -> ) ->
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}),
{Latest, Messages} = assign_timestamps(Latest0, MessagesIn), {Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{durable => false}),
State = State0#{latest := Latest}, State = State0#{latest := Latest},
set_ts(DBShard, Latest), set_ts(DBShard, Latest),
Effects = try_release_log(RaftMeta, State), Effects = try_release_log(RaftMeta, State),

View File

@ -28,8 +28,8 @@
create/5, create/5,
open/5, open/5,
drop/5, drop/5,
prepare_batch/4, prepare_batch/3,
commit_batch/3, commit_batch/4,
get_streams/4, get_streams/4,
get_delete_streams/4, get_delete_streams/4,
make_iterator/5, make_iterator/5,
@ -269,11 +269,10 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
-spec prepare_batch( -spec prepare_batch(
emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:shard_id(),
s(), s(),
[{emqx_ds:time(), emqx_types:message()}, ...], [{emqx_ds:time(), emqx_types:message()}, ...]
emqx_ds:message_store_opts()
) -> ) ->
{ok, cooked_batch()}. {ok, cooked_batch()}.
prepare_batch(_ShardId, S, Messages, _Options) -> prepare_batch(_ShardId, S, Messages) ->
_ = erase(?lts_persist_ops), _ = erase(?lts_persist_ops),
{Payloads, MaxTs} = {Payloads, MaxTs} =
lists:mapfoldl( lists:mapfoldl(
@ -294,12 +293,14 @@ prepare_batch(_ShardId, S, Messages, _Options) ->
-spec commit_batch( -spec commit_batch(
emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:shard_id(),
s(), s(),
cooked_batch() cooked_batch(),
emqx_ds_storage_layer:db_write_opts()
) -> ok | emqx_ds:error(_). ) -> ok | emqx_ds:error(_).
commit_batch( commit_batch(
_ShardId, _ShardId,
_Data, _Data,
#{?cooked_payloads := [], ?cooked_lts_ops := LTS} #{?cooked_payloads := [], ?cooked_lts_ops := LTS},
_WriteOpts
) -> ) ->
%% Assert: %% Assert:
[] = LTS, [] = LTS,
@ -307,7 +308,8 @@ commit_batch(
commit_batch( commit_batch(
_ShardId, _ShardId,
#s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs} #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs},
WriteOpts
) -> ) ->
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rocksdb:batch(),
%% Commit LTS trie to the storage: %% Commit LTS trie to the storage:
@ -326,7 +328,7 @@ commit_batch(
end, end,
Payloads Payloads
), ),
Result = rocksdb:write_batch(DB, Batch, [{disable_wal, true}]), Result = rocksdb:write_batch(DB, Batch, WriteOpts),
rocksdb:release_batch(Batch), rocksdb:release_batch(Batch),
ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}),
%% NOTE %% NOTE

View File

@ -27,7 +27,7 @@
%% Data %% Data
store_batch/3, store_batch/3,
prepare_batch/3, prepare_batch/3,
commit_batch/2, commit_batch/3,
get_streams/3, get_streams/3,
get_delete_streams/3, get_delete_streams/3,
@ -70,7 +70,9 @@
shard_id/0, shard_id/0,
options/0, options/0,
prototype/0, prototype/0,
cooked_batch/0 cooked_batch/0,
batch_store_opts/0,
db_write_opts/0
]). ]).
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -113,6 +115,23 @@
-type gen_id() :: 0..16#ffff. -type gen_id() :: 0..16#ffff.
%% Options affecting how batches should be stored.
%% See also: `emqx_ds:message_store_opts()'.
-type batch_store_opts() ::
#{
%% Whether the whole batch given to `store_batch' should be inserted atomically as
%% a unit. Default: `false'.
atomic => boolean(),
%% Should the storage make sure that the batch is written durably? Non-durable
%% writes are in general unsafe but require much less resources, i.e. with RocksDB
%% non-durable (WAL-less) writes do not usually involve _any_ disk I/O.
%% Default: `true'.
durable => boolean()
}.
%% Options affecting how batches should be prepared.
-type batch_prepare_opts() :: #{}.
%% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6 %% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6
-opaque stream_v1() :: -opaque stream_v1() ::
#{ #{
@ -203,6 +222,9 @@
%% Generation callbacks %% Generation callbacks
%%================================================================================ %%================================================================================
%% See: `rocksdb:write_options()'.
-type db_write_opts() :: [_Option].
%% Create the new schema given generation id and the options. %% Create the new schema given generation id and the options.
%% Create rocksdb column families. %% Create rocksdb column families.
-callback create( -callback create(
@ -225,15 +247,15 @@
-callback prepare_batch( -callback prepare_batch(
shard_id(), shard_id(),
generation_data(), generation_data(),
[{emqx_ds:time(), emqx_types:message()}, ...], [{emqx_ds:time(), emqx_types:message()}, ...]
emqx_ds:message_store_opts()
) -> ) ->
{ok, term()} | emqx_ds:error(_). {ok, term()} | emqx_ds:error(_).
-callback commit_batch( -callback commit_batch(
shard_id(), shard_id(),
generation_data(), generation_data(),
_CookedBatch _CookedBatch,
db_write_opts()
) -> ok | emqx_ds:error(_). ) -> ok | emqx_ds:error(_).
-callback get_streams( -callback get_streams(
@ -290,16 +312,16 @@ drop_shard(Shard) ->
-spec store_batch( -spec store_batch(
shard_id(), shard_id(),
[{emqx_ds:time(), emqx_types:message()}], [{emqx_ds:time(), emqx_types:message()}],
emqx_ds:message_store_opts() batch_store_opts()
) -> ) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
store_batch(Shard, Messages, Options) -> store_batch(Shard, Messages, Options) ->
?tp(emqx_ds_storage_layer_store_batch, #{ ?tp(emqx_ds_storage_layer_store_batch, #{
shard => Shard, messages => Messages, options => Options shard => Shard, messages => Messages, options => Options
}), }),
case prepare_batch(Shard, Messages, Options) of case prepare_batch(Shard, Messages, #{}) of
{ok, CookedBatch} -> {ok, CookedBatch} ->
commit_batch(Shard, CookedBatch); commit_batch(Shard, CookedBatch, Options);
ignore -> ignore ->
ok; ok;
Error = {error, _, _} -> Error = {error, _, _} ->
@ -309,9 +331,9 @@ store_batch(Shard, Messages, Options) ->
-spec prepare_batch( -spec prepare_batch(
shard_id(), shard_id(),
[{emqx_ds:time(), emqx_types:message()}], [{emqx_ds:time(), emqx_types:message()}],
emqx_ds:message_store_opts() batch_prepare_opts()
) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> prepare_batch(Shard, Messages = [{Time, _} | _], _Options) ->
%% NOTE %% NOTE
%% We assume that batches do not span generations. Callers should enforce this. %% We assume that batches do not span generations. Callers should enforce this.
%% FIXME: always store messages in the current generation %% FIXME: always store messages in the current generation
@ -319,7 +341,7 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
{GenId, #{module := Mod, data := GenData}} -> {GenId, #{module := Mod, data := GenData}} ->
T0 = erlang:monotonic_time(microsecond), T0 = erlang:monotonic_time(microsecond),
Result = Result =
case Mod:prepare_batch(Shard, GenData, Messages, Options) of case Mod:prepare_batch(Shard, GenData, Messages) of
{ok, CookedBatch} -> {ok, CookedBatch} ->
?tp(emqx_ds_storage_layer_batch_cooked, #{ ?tp(emqx_ds_storage_layer_batch_cooked, #{
shard => Shard, gen => GenId, batch => CookedBatch shard => Shard, gen => GenId, batch => CookedBatch
@ -338,11 +360,16 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
prepare_batch(_Shard, [], _Options) -> prepare_batch(_Shard, [], _Options) ->
ignore. ignore.
-spec commit_batch(shard_id(), cooked_batch()) -> emqx_ds:store_batch_result(). -spec commit_batch(
commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> shard_id(),
cooked_batch(),
batch_store_opts()
) -> emqx_ds:store_batch_result().
commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) ->
#{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard),
WriteOptions = mk_write_options(Options),
T0 = erlang:monotonic_time(microsecond), T0 = erlang:monotonic_time(microsecond),
Result = Mod:commit_batch(Shard, GenData, CookedBatch), Result = Mod:commit_batch(Shard, GenData, CookedBatch, WriteOptions),
T1 = erlang:monotonic_time(microsecond), T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result. Result.
@ -994,6 +1021,13 @@ handle_event(Shard, Time, Event) ->
GenId = generation_current(Shard), GenId = generation_current(Shard),
handle_event(Shard, Time, ?mk_storage_event(GenId, Event)). handle_event(Shard, Time, ?mk_storage_event(GenId, Event)).
%%--------------------------------------------------------------------------------
mk_write_options(#{durable := false}) ->
[{disable_wal, true}];
mk_write_options(#{}) ->
[].
%%-------------------------------------------------------------------------------- %%--------------------------------------------------------------------------------
%% Schema access %% Schema access
%%-------------------------------------------------------------------------------- %%--------------------------------------------------------------------------------

View File

@ -31,8 +31,8 @@
create/5, create/5,
open/5, open/5,
drop/5, drop/5,
prepare_batch/4, prepare_batch/3,
commit_batch/3, commit_batch/4,
get_streams/4, get_streams/4,
get_delete_streams/4, get_delete_streams/4,
make_iterator/5, make_iterator/5,
@ -102,10 +102,10 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok = rocksdb:drop_column_family(DBHandle, CFHandle),
ok. ok.
prepare_batch(_ShardId, _Data, Messages, _Options) -> prepare_batch(_ShardId, _Data, Messages) ->
{ok, Messages}. {ok, Messages}.
commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) ->
{ok, Batch} = rocksdb:batch(), {ok, Batch} = rocksdb:batch(),
lists:foreach( lists:foreach(
fun({TS, Msg}) -> fun({TS, Msg}) ->
@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) ->
end, end,
Messages Messages
), ),
Res = rocksdb:write_batch(DB, Batch, _WriteOptions = [{disable_wal, true}]), Res = rocksdb:write_batch(DB, Batch, WriteOpts),
rocksdb:release_batch(Batch), rocksdb:release_batch(Batch),
Res. Res.

View File

@ -64,7 +64,7 @@ t_iterate(_Config) ->
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
|| Topic <- Topics, PublishedAt <- Timestamps || Topic <- Topics, PublishedAt <- Timestamps
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
%% Iterate through individual topics: %% Iterate through individual topics:
[ [
begin begin
@ -94,7 +94,7 @@ t_delete(_Config) ->
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
|| Topic <- Topics, PublishedAt <- Timestamps || Topic <- Topics, PublishedAt <- Timestamps
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
%% Iterate through topics: %% Iterate through topics:
StartTime = 0, StartTime = 0,
@ -125,7 +125,7 @@ t_get_streams(_Config) ->
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
|| Topic <- Topics, PublishedAt <- Timestamps || Topic <- Topics, PublishedAt <- Timestamps
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch, #{}),
GetStream = fun(Topic) -> GetStream = fun(Topic) ->
StartTime = 0, StartTime = 0,
emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime) emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), StartTime)
@ -152,7 +152,7 @@ t_get_streams(_Config) ->
end end
|| I <- lists:seq(1, 200) || I <- lists:seq(1, 200)
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, NewBatch, #{}),
%% Check that "foo/bar/baz" topic now appears in two streams: %% Check that "foo/bar/baz" topic now appears in two streams:
%% "foo/bar/baz" and "foo/bar/+": %% "foo/bar/baz" and "foo/bar/+":
NewStreams = lists:sort(GetStream("foo/bar/baz")), NewStreams = lists:sort(GetStream("foo/bar/baz")),
@ -180,7 +180,7 @@ t_new_generation_inherit_trie(_Config) ->
|| I <- lists:seq(1, 200), || I <- lists:seq(1, 200),
Suffix <- [<<"foo">>, <<"bar">>] Suffix <- [<<"foo">>, <<"bar">>]
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
%% Now we create a new generation with the same LTS module. It should inherit the %% Now we create a new generation with the same LTS module. It should inherit the
%% learned trie. %% learned trie.
ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000), ok = emqx_ds_storage_layer:add_generation(?SHARD, _Since = 1_000),
@ -194,7 +194,7 @@ t_new_generation_inherit_trie(_Config) ->
|| I <- lists:seq(1, 200), || I <- lists:seq(1, 200),
Suffix <- [<<"foo">>, <<"bar">>] Suffix <- [<<"foo">>, <<"bar">>]
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
%% We should get only two streams for wildcard query, for "foo" and for "bar". %% We should get only two streams for wildcard query, for "foo" and for "bar".
?assertMatch( ?assertMatch(
[_Foo, _Bar], [_Foo, _Bar],
@ -217,13 +217,13 @@ t_replay(_Config) ->
{PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))} {PublishedAt, make_message(PublishedAt, Topic, integer_to_binary(PublishedAt))}
|| Topic <- Topics, PublishedAt <- Timestamps || Topic <- Topics, PublishedAt <- Timestamps
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch1, #{}),
%% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar': %% Create wildcard topics `wildcard/+/suffix/foo' and `wildcard/+/suffix/bar':
Batch2 = [ Batch2 = [
{TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))} {TS, make_message(TS, make_topic([wildcard, I, suffix, Suffix]), bin(TS))}
|| I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>]
], ],
ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, #{}),
%% Check various topic filters: %% Check various topic filters:
Messages = [M || {_TS, M} <- Batch1 ++ Batch2], Messages = [M || {_TS, M} <- Batch1 ++ Batch2],
%% Missing topics (no ghost messages): %% Missing topics (no ghost messages):