feat(sessds): Observe next time
This commit is contained in:
parent
b9ad241658
commit
f41e538526
|
@ -27,7 +27,9 @@
|
||||||
inc_egress_batches_failed/1,
|
inc_egress_batches_failed/1,
|
||||||
inc_egress_messages/2,
|
inc_egress_messages/2,
|
||||||
inc_egress_bytes/2,
|
inc_egress_bytes/2,
|
||||||
observe_egress_flush_time/2
|
|
||||||
|
observe_egress_flush_time/2,
|
||||||
|
observe_next_time/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
|
@ -46,7 +48,7 @@
|
||||||
|
|
||||||
-define(DB_METRICS, []).
|
-define(DB_METRICS, []).
|
||||||
|
|
||||||
-define(SHARD_METRICS, [
|
-define(EGRESS_METRICS, [
|
||||||
{counter, 'emqx_ds_egress_batches'},
|
{counter, 'emqx_ds_egress_batches'},
|
||||||
{counter, 'emqx_ds_egress_batches_retry'},
|
{counter, 'emqx_ds_egress_batches_retry'},
|
||||||
{counter, 'emqx_ds_egress_batches_failed'},
|
{counter, 'emqx_ds_egress_batches_failed'},
|
||||||
|
@ -55,6 +57,12 @@
|
||||||
{slide, 'emqx_ds_egress_flush_time'}
|
{slide, 'emqx_ds_egress_flush_time'}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(INGRESS_METRICS, [
|
||||||
|
{slide, 'emqx_ds_builtin_next_time'}
|
||||||
|
]).
|
||||||
|
|
||||||
|
-define(SHARD_METRICS, ?EGRESS_METRICS ++ ?INGRESS_METRICS).
|
||||||
|
|
||||||
-type shard_metrics_id() :: binary().
|
-type shard_metrics_id() :: binary().
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
@ -112,6 +120,14 @@ inc_egress_bytes(Id, NMessages) ->
|
||||||
observe_egress_flush_time(Id, FlushTime) ->
|
observe_egress_flush_time(Id, FlushTime) ->
|
||||||
emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime).
|
emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime).
|
||||||
|
|
||||||
|
%% @doc Add a sample of elapsed time spent waiting for a
|
||||||
|
%% `emqx_ds_replication_layer:next'
|
||||||
|
-spec observe_next_time(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), non_neg_integer()) ->
|
||||||
|
ok.
|
||||||
|
observe_next_time(DB, Shard, NextTime) ->
|
||||||
|
Id = shard_metric_id(DB, Shard),
|
||||||
|
emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_builtin_next_time', NextTime).
|
||||||
|
|
||||||
prometheus_meta() ->
|
prometheus_meta() ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun
|
fun
|
||||||
|
|
|
@ -329,7 +329,11 @@ next(DB, Iter0, BatchSize) ->
|
||||||
%%
|
%%
|
||||||
%% This kind of trickery should be probably done here in the
|
%% This kind of trickery should be probably done here in the
|
||||||
%% replication layer. Or, perhaps, in the logic layer.
|
%% replication layer. Or, perhaps, in the logic layer.
|
||||||
case ra_next(DB, Shard, StorageIter0, BatchSize) of
|
T0 = erlang:monotonic_time(microsecond),
|
||||||
|
Result = ra_next(DB, Shard, StorageIter0, BatchSize),
|
||||||
|
T1 = erlang:monotonic_time(microsecond),
|
||||||
|
emqx_ds_builtin_metrics:observe_next_time(DB, Shard, T1 - T0),
|
||||||
|
case Result of
|
||||||
{ok, StorageIter, Batch} ->
|
{ok, StorageIter, Batch} ->
|
||||||
Iter = Iter0#{?enc := StorageIter},
|
Iter = Iter0#{?enc := StorageIter},
|
||||||
{ok, Iter, Batch};
|
{ok, Iter, Batch};
|
||||||
|
|
Loading…
Reference in New Issue