refactor(dsstore): keep passing `Options` to both prepare + commit

This commit is contained in:
Andrew Mayorov 2024-06-24 13:04:13 +02:00
parent 5b5f33c421
commit 733751fadd
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 32 additions and 26 deletions

View File

@ -28,7 +28,7 @@
create/5,
open/5,
drop/5,
prepare_batch/3,
prepare_batch/4,
commit_batch/4,
get_streams/4,
get_delete_streams/4,
@ -269,10 +269,11 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) ->
-spec prepare_batch(
emqx_ds_storage_layer:shard_id(),
s(),
[{emqx_ds:time(), emqx_types:message()}, ...]
[{emqx_ds:time(), emqx_types:message()}, ...],
emqx_ds_storage_layer:batch_store_opts()
) ->
{ok, cooked_batch()}.
prepare_batch(_ShardId, S, Messages) ->
prepare_batch(_ShardId, S, Messages, _Options) ->
_ = erase(?lts_persist_ops),
{Payloads, MaxTs} =
lists:mapfoldl(
@ -294,13 +295,13 @@ prepare_batch(_ShardId, S, Messages) ->
emqx_ds_storage_layer:shard_id(),
s(),
cooked_batch(),
emqx_ds_storage_layer:db_write_opts()
emqx_ds_storage_layer:batch_store_opts()
) -> ok | emqx_ds:error(_).
commit_batch(
_ShardId,
_Data,
#{?cooked_payloads := [], ?cooked_lts_ops := LTS},
_WriteOpts
_Options
) ->
%% Assert:
[] = LTS,
@ -309,7 +310,7 @@ commit_batch(
_ShardId,
#s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars},
#{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs},
WriteOpts
Options
) ->
{ok, Batch} = rocksdb:batch(),
%% Commit LTS trie to the storage:
@ -328,7 +329,7 @@ commit_batch(
end,
Payloads
),
Result = rocksdb:write_batch(DB, Batch, WriteOpts),
Result = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
rocksdb:release_batch(Batch),
ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}),
%% NOTE
@ -966,6 +967,13 @@ pop_lts_persist_ops() ->
L
end.
-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) ->
_RocksDBOpts :: [{atom(), _}].
write_batch_opts(#{durable := false}) ->
[{disable_wal, true}];
write_batch_opts(#{}) ->
[].
-ifdef(TEST).
serialize(Msg) ->

View File

@ -71,8 +71,7 @@
options/0,
prototype/0,
cooked_batch/0,
batch_store_opts/0,
db_write_opts/0
batch_store_opts/0
]).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -223,9 +222,6 @@
%% Generation callbacks
%%================================================================================
%% See: `rocksdb:write_options()'.
-type db_write_opts() :: [_Option].
%% Create the new schema given generation id and the options.
%% Create rocksdb column families.
-callback create(
@ -248,7 +244,8 @@
-callback prepare_batch(
shard_id(),
generation_data(),
[{emqx_ds:time(), emqx_types:message()}, ...]
[{emqx_ds:time(), emqx_types:message()}, ...],
batch_store_opts()
) ->
{ok, term()} | emqx_ds:error(_).
@ -256,7 +253,7 @@
shard_id(),
generation_data(),
_CookedBatch,
db_write_opts()
batch_store_opts()
) -> ok | emqx_ds:error(_).
-callback get_streams(
@ -342,7 +339,7 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
{GenId, #{module := Mod, data := GenData}} ->
T0 = erlang:monotonic_time(microsecond),
Result =
case Mod:prepare_batch(Shard, GenData, Messages) of
case Mod:prepare_batch(Shard, GenData, Messages, Options) of
{ok, CookedBatch} ->
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _, _} ->
@ -365,9 +362,8 @@ prepare_batch(_Shard, [], _Options) ->
) -> emqx_ds:store_batch_result().
commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}, Options) ->
#{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard),
WriteOptions = mk_write_options(Options),
T0 = erlang:monotonic_time(microsecond),
Result = Mod:commit_batch(Shard, GenData, CookedBatch, WriteOptions),
Result = Mod:commit_batch(Shard, GenData, CookedBatch, Options),
T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result.
@ -1025,11 +1021,6 @@ handle_event(Shard, Time, Event) ->
%%--------------------------------------------------------------------------------
mk_write_options(#{durable := false}) ->
[{disable_wal, true}];
mk_write_options(#{}) ->
[].
-spec cf_names(cf_refs()) -> [string()].
cf_names(CFRefs) ->
{CFNames, _CFHandles} = lists:unzip(CFRefs),

View File

@ -31,7 +31,7 @@
create/5,
open/5,
drop/5,
prepare_batch/3,
prepare_batch/4,
commit_batch/4,
get_streams/4,
get_delete_streams/4,
@ -102,10 +102,10 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
ok = rocksdb:drop_column_family(DBHandle, CFHandle),
ok.
prepare_batch(_ShardId, _Data, Messages) ->
prepare_batch(_ShardId, _Data, Messages, _Options) ->
{ok, Messages}.
commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) ->
commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, Options) ->
{ok, Batch} = rocksdb:batch(),
lists:foreach(
fun({TS, Msg}) ->
@ -115,7 +115,7 @@ commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages, WriteOpts) ->
end,
Messages
),
Res = rocksdb:write_batch(DB, Batch, WriteOpts),
Res = rocksdb:write_batch(DB, Batch, write_batch_opts(Options)),
rocksdb:release_batch(Batch),
Res.
@ -284,3 +284,10 @@ do_delete_next(
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) ->
"emqx_ds_storage_reference" ++ integer_to_list(GenId).
-spec write_batch_opts(emqx_ds_storage_layer:batch_store_opts()) ->
_RocksDBOpts :: [{atom(), _}].
write_batch_opts(#{durable := false}) ->
[{disable_wal, true}];
write_batch_opts(#{}) ->
[].