chore(dsrepl): improve tracepoints usability a bit

This commit is contained in:
Andrew Mayorov 2024-06-10 18:21:47 +02:00
parent 10e9fed22b
commit b6894c18fa
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 14 additions and 11 deletions

View File

@ -143,7 +143,12 @@
%% Core state of the replication, i.e. the state of ra machine.
-type ra_state() :: #{
%% Shard ID.
db_shard := {emqx_ds:db(), shard_id()},
%% Unique timestamp tracking real time closely.
%% With microsecond granularity it should be nearly impossible for it to run
%% too far ahead of the real time clock.
latest := timestamp_us()
}.
@ -755,11 +760,7 @@ apply(
},
#{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0
) ->
%% NOTE
%% Unique timestamp tracking real time closely.
%% With microsecond granularity it should be nearly impossible for it to run
%% too far ahead than the real time clock.
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}),
?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}),
{Latest, Messages} = assign_timestamps(Latest0, MessagesIn),
Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}),
State = State0#{latest := Latest},
@ -839,7 +840,7 @@ apply(
tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) ->
%% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard),
{Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest),
?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}),
?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}),
handle_custom_event(DBShard, Timestamp, tick).
assign_timestamps(Latest, Messages) ->

View File

@ -69,6 +69,7 @@ prepare(Index, State) ->
-spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) ->
ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}.
write(Dir, Meta, MachineState) ->
?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}),
ra_log_snapshot:write(Dir, Meta, MachineState).
%% Reading a snapshot.
@ -165,6 +166,7 @@ complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) ->
-spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) ->
{ok, ws()}.
begin_accept(Dir, Meta) ->
?tp(dsrepl_snapshot_accept_started, #{meta => Meta}),
WS = #ws{
phase = machine_state,
started_at = erlang:monotonic_time(millisecond),
@ -207,7 +209,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0})
?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}),
_ = emqx_ds_storage_snapshot:release_writer(SnapWriter),
Result = complete_accept(WS#ws{writer = SnapWriter}),
?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}),
?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS), state => WS#ws.state}),
Result;
{error, Reason} ->
?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}),
@ -218,7 +220,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0})
complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) ->
ShardId = shard_id(WS),
logger:info(#{
msg => "dsrepl_snapshot_read_complete",
msg => "dsrepl_snapshot_write_complete",
shard => ShardId,
duration_ms => erlang:monotonic_time(millisecond) - StartedAt,
bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter)

View File

@ -312,9 +312,6 @@ store_batch(Shard, Messages, Options) ->
prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
%% NOTE
%% We assume that batches do not span generations. Callers should enforce this.
?tp(emqx_ds_storage_layer_prepare_batch, #{
shard => Shard, messages => Messages, options => Options
}),
%% FIXME: always store messages in the current generation
case generation_at(Shard, Time) of
{GenId, #{module := Mod, data := GenData}} ->
@ -322,6 +319,9 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
Result =
case Mod:prepare_batch(Shard, GenData, Messages, Options) of
{ok, CookedBatch} ->
?tp(emqx_ds_storage_layer_batch_cooked, #{
shard => Shard, gen => GenId, batch => CookedBatch
}),
{ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
Error = {error, _, _} ->
Error