chore(dsstore): resurrect `prepare_batch` entry tracepoint

This commit is contained in:
Andrew Mayorov 2024-06-24 12:49:59 +02:00
parent 8ff48ac5ea
commit 5b5f33c421
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 4 additions and 7 deletions

View File

@ -317,9 +317,6 @@ drop_shard(Shard) ->
) ->
emqx_ds:store_batch_result().
store_batch(Shard, Messages, Options) ->
?tp(emqx_ds_storage_layer_store_batch, #{
shard => Shard, messages => Messages, options => Options
}),
case prepare_batch(Shard, Messages, #{}) of
{ok, CookedBatch} ->
commit_batch(Shard, CookedBatch, Options);
@ -334,9 +331,12 @@ store_batch(Shard, Messages, Options) ->
[{emqx_ds:time(), emqx_types:message()}],
batch_prepare_opts()
) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Messages = [{Time, _} | _], _Options) ->
prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
%% NOTE
%% We assume that batches do not span generations. Callers should enforce this.
?tp(emqx_ds_storage_layer_prepare_batch, #{
shard => Shard, messages => Messages, options => Options
}),
%% FIXME: always store messages in the current generation
case generation_at(Shard, Time) of
{GenId, #{module := Mod, data := GenData}} ->
@ -344,9 +344,6 @@ prepare_batch(Shard, Messages = [{Time, _} | _], _Options) ->
Result =
case Mod:prepare_batch(Shard, GenData, Messages) of
{ok, CookedBatch} ->
?tp(emqx_ds_storage_layer_batch_cooked, #{
shard => Shard, gen => GenId, batch => CookedBatch
}),
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _, _} ->
Error