From b6894c18fa034a636d683a84f6b9eee9cb495043 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 10 Jun 2024 18:21:47 +0200 Subject: [PATCH] chore(dsrepl): improve tracepoints usability a bit --- .../src/emqx_ds_replication_layer.erl | 13 +++++++------ .../src/emqx_ds_replication_snapshot.erl | 6 ++++-- .../src/emqx_ds_storage_layer.erl | 6 +++--- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 0a1173e70..34cb5fc4d 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -143,7 +143,12 @@ %% Core state of the replication, i.e. the state of ra machine. -type ra_state() :: #{ + %% Shard ID. db_shard := {emqx_ds:db(), shard_id()}, + + %% Unique timestamp tracking real time closely. + %% With microsecond granularity it should be nearly impossible for it to run + %% too far ahead of the real time clock. latest := timestamp_us() }. @@ -755,11 +760,7 @@ apply( }, #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State0 ) -> - %% NOTE - %% Unique timestamp tracking real time closely. - %% With microsecond granularity it should be nearly impossible for it to run - %% too far ahead than the real time clock. - ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, ts => Latest0}), + ?tp(ds_ra_apply_batch, #{db => DB, shard => Shard, batch => MessagesIn, latest => Latest0}), {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), State = State0#{latest := Latest}, @@ -839,7 +840,7 @@ apply( tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), {Timestamp, _} = ensure_monotonic_timestamp(timestamp_to_timeus(TimeMs), Latest), - ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), + ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, timestamp => Timestamp}), handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl index 9267aee77..58b8e4f0a 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_snapshot.erl @@ -69,6 +69,7 @@ prepare(Index, State) -> -spec write(_SnapshotDir :: file:filename(), ra_snapshot:meta(), _State :: ra_state()) -> ok | {ok, _BytesWritten :: non_neg_integer()} | {error, ra_snapshot:file_err()}. write(Dir, Meta, MachineState) -> + ?tp(dsrepl_snapshot_write, #{meta => Meta, state => MachineState}), ra_log_snapshot:write(Dir, Meta, MachineState). %% Reading a snapshot. @@ -165,6 +166,7 @@ complete_read(RS = #rs{reader = SnapReader, started_at = StartedAt}) -> -spec begin_accept(_SnapshotDir :: file:filename(), ra_snapshot:meta()) -> {ok, ws()}. begin_accept(Dir, Meta) -> + ?tp(dsrepl_snapshot_accept_started, #{meta => Meta}), WS = #ws{ phase = machine_state, started_at = erlang:monotonic_time(millisecond), @@ -207,7 +209,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) ?tp(dsrepl_snapshot_write_complete, #{writer => SnapWriter}), _ = emqx_ds_storage_snapshot:release_writer(SnapWriter), Result = complete_accept(WS#ws{writer = SnapWriter}), - ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS)}), + ?tp(dsrepl_snapshot_accepted, #{shard => shard_id(WS), state => WS#ws.state}), Result; {error, Reason} -> ?tp(dsrepl_snapshot_write_error, #{reason => Reason, writer => SnapWriter0}), @@ -218,7 +220,7 @@ complete_accept(Chunk, WS = #ws{phase = storage_snapshot, writer = SnapWriter0}) complete_accept(WS = #ws{started_at = StartedAt, writer = SnapWriter}) -> ShardId = shard_id(WS), logger:info(#{ - msg => "dsrepl_snapshot_read_complete", + msg => "dsrepl_snapshot_write_complete", shard => ShardId, duration_ms => erlang:monotonic_time(millisecond) - StartedAt, bytes_written => emqx_ds_storage_snapshot:writer_info(bytes_written, SnapWriter) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 818d0bcb7..c31a6bc52 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -312,9 +312,6 @@ store_batch(Shard, Messages, Options) -> prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. - ?tp(emqx_ds_storage_layer_prepare_batch, #{ - shard => Shard, messages => Messages, options => Options - }), %% FIXME: always store messages in the current generation case generation_at(Shard, Time) of {GenId, #{module := Mod, data := GenData}} -> @@ -322,6 +319,9 @@ prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> Result = case Mod:prepare_batch(Shard, GenData, Messages, Options) of {ok, CookedBatch} -> + ?tp(emqx_ds_storage_layer_batch_cooked, #{ + shard => Shard, gen => GenId, batch => CookedBatch + }), {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; Error = {error, _, _} -> Error