diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index f0eac9652..fc6de2861 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -27,7 +27,9 @@ inc_egress_batches_failed/1, inc_egress_messages/2, inc_egress_bytes/2, - observe_egress_flush_time/2 + + observe_egress_flush_time/2, + observe_next_time/3 ]). %% behavior callbacks: @@ -46,7 +48,7 @@ -define(DB_METRICS, []). --define(SHARD_METRICS, [ +-define(EGRESS_METRICS, [ {counter, 'emqx_ds_egress_batches'}, {counter, 'emqx_ds_egress_batches_retry'}, {counter, 'emqx_ds_egress_batches_failed'}, @@ -55,6 +57,12 @@ {slide, 'emqx_ds_egress_flush_time'} ]). +-define(INGRESS_METRICS, [ + {slide, 'emqx_ds_builtin_next_time'} +]). + +-define(SHARD_METRICS, ?EGRESS_METRICS ++ ?INGRESS_METRICS). + -type shard_metrics_id() :: binary(). %%================================================================================ @@ -112,6 +120,14 @@ inc_egress_bytes(Id, NMessages) -> observe_egress_flush_time(Id, FlushTime) -> emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). +%% @doc Add a sample of elapsed time spent waiting for a +%% `emqx_ds_replication_layer:next' +-spec observe_next_time(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), non_neg_integer()) -> + ok. +observe_next_time(DB, Shard, NextTime) -> + Id = shard_metric_id(DB, Shard), + emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_builtin_next_time', NextTime). + prometheus_meta() -> lists:map( fun diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 14c2268b8..2d4982af3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -329,7 +329,11 @@ next(DB, Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case ra_next(DB, Shard, StorageIter0, BatchSize) of + T0 = erlang:monotonic_time(microsecond), + Result = ra_next(DB, Shard, StorageIter0, BatchSize), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_next_time(DB, Shard, T1 - T0), + case Result of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch};