From afeb2ab8aa6b04c58edd36570e6aa41241504de0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 1 Jul 2024 01:11:07 +0200 Subject: [PATCH] feat(ds): Add metrics for skipstream layout --- .../include/emqx_ds_metrics.hrl | 9 ++++ .../src/emqx_ds_builtin_metrics.erl | 17 +++++++- .../src/emqx_ds_storage_skipstream_lts.erl | 43 ++++++++++++++++++- apps/emqx_prometheus/src/emqx_prometheus.erl | 9 +++- 4 files changed, 73 insertions(+), 5 deletions(-) diff --git a/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl index 4e463b14f..6433b1d29 100644 --- a/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl @@ -46,4 +46,13 @@ %% the value is rejected by the subsequent post-processing: -define(DS_BITFIELD_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision). +%%% Skipstream LTS Storage counters: +-define(DS_SKIPSTREAM_LTS_SEEK, emqx_ds_storage_skipstream_lts_seek). +-define(DS_SKIPSTREAM_LTS_NEXT, emqx_ds_storage_skipstream_lts_next). +-define(DS_SKIPSTREAM_LTS_HASH_COLLISION, emqx_ds_storage_skipstream_lts_hash_collision). +-define(DS_SKIPSTREAM_LTS_HIT, emqx_ds_storage_skipstream_lts_hit). +-define(DS_SKIPSTREAM_LTS_MISS, emqx_ds_storage_skipstream_lts_miss). +-define(DS_SKIPSTREAM_LTS_FUTURE, emqx_ds_storage_skipstream_lts_future). +-define(DS_SKIPSTREAM_LTS_EOS, emqx_ds_storage_skipstream_lts_end_of_stream). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index 107e2e5e6..1ae37f321 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -36,7 +36,9 @@ inc_lts_seek_counter/2, inc_lts_next_counter/2, - inc_lts_collision_counter/2 + inc_lts_collision_counter/2, + + collect_shard_counter/3 ]). %% behavior callbacks: @@ -59,7 +61,14 @@ {slide, ?DS_STORE_BATCH_TIME}, {counter, ?DS_BITFIELD_LTS_SEEK_COUNTER}, {counter, ?DS_BITFIELD_LTS_NEXT_COUNTER}, - {counter, ?DS_BITFIELD_LTS_COLLISION_COUNTER} + {counter, ?DS_BITFIELD_LTS_COLLISION_COUNTER}, + {counter, ?DS_SKIPSTREAM_LTS_SEEK}, + {counter, ?DS_SKIPSTREAM_LTS_NEXT}, + {counter, ?DS_SKIPSTREAM_LTS_HASH_COLLISION}, + {counter, ?DS_SKIPSTREAM_LTS_HIT}, + {counter, ?DS_SKIPSTREAM_LTS_MISS}, + {counter, ?DS_SKIPSTREAM_LTS_FUTURE}, + {counter, ?DS_SKIPSTREAM_LTS_EOS} ]). -define(FETCH_METRICS, [ @@ -160,6 +169,10 @@ inc_lts_next_counter({DB, _}, Inc) -> inc_lts_collision_counter({DB, _}, Inc) -> catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_BITFIELD_LTS_COLLISION_COUNTER, Inc). +-spec collect_shard_counter(emqx_ds_storage_layer:shard_id(), atom(), non_neg_integer()) -> ok. +collect_shard_counter({DB, _}, Key, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, Key, Inc). + prometheus_meta() -> lists:map( fun diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl index 7b0c49c6f..164050932 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_skipstream_lts.erl @@ -43,6 +43,7 @@ -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). +-include("emqx_ds_metrics.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -260,7 +261,8 @@ update_iterator(_Shard, _Data, OldIter, DSKey) -> {ok, OldIter#it{ts = TS}} end. -next({_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) -> +next(ShardId = {_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) -> + init_counters(), Iterators = init_iterators(S, It), %% ?tp(notice, skipstream_init_iters, #{it => It, its => Iterators}), try @@ -271,7 +273,8 @@ next({_DB, Shard}, S, It, BatchSize, TMax, IsCurrent) -> Result end after - free_iterators(Iterators) + free_iterators(Iterators), + collect_counters(ShardId) end. delete_next(Shard, S, It0, Selector, BatchSize, Now, IsCurrent) -> @@ -447,12 +450,15 @@ next_loop(Shard, S, It0, Iterators, BatchSize, TMax, Op, Acc) -> case next_step(S, StaticIdx, CompressedTF, Iterators, undefined, Op) of none -> %% ?tp(notice, skipstream_loop_result, #{r => none}), + inc_counter(?DS_SKIPSTREAM_LTS_EOS), finalize_loop(It0, Op, Acc); {seek, TS} when TS > TMax -> %% ?tp(notice, skipstream_loop_result, #{r => seek_future, ts => TS}), + inc_counter(?DS_SKIPSTREAM_LTS_FUTURE), finalize_loop(It0, {seek, TS}, Acc); {ok, TS, _Key, _Msg0} when TS > TMax -> %% ?tp(notice, skipstream_loop_result, #{r => ok_future, ts => TS, key => _Key}), + inc_counter(?DS_SKIPSTREAM_LTS_FUTURE), finalize_loop(It0, {seek, TS}, Acc); {seek, TS} -> %% ?tp(notice, skipstream_loop_result, #{r => seek, ts => TS}), @@ -479,8 +485,10 @@ next_step( Result = case Op of next -> + inc_counter(?DS_SKIPSTREAM_LTS_NEXT), rocksdb:iterator_move(IH, next); {seek, TS} -> + inc_counter(?DS_SKIPSTREAM_LTS_SEEK), rocksdb:iterator_move(IH, {seek, mk_key(StaticIdx, N, Hash, TS)}) end, case Result of @@ -508,10 +516,12 @@ next_step( Msg0 = deserialize(S, Blob), case emqx_topic:match(Msg0#message.topic, CompressedTF) of true -> + inc_counter(?DS_SKIPSTREAM_LTS_HIT), {ok, NextTS, Key, Msg0}; false -> %% Hash collision. Advance to the %% next timestamp: + inc_counter(?DS_SKIPSTREAM_LTS_HASH_COLLISION), {seek, NextTS + 1} end; _ -> @@ -519,6 +529,7 @@ next_step( next_step(S, StaticIdx, CompressedTF, Iterators, NextTS, {seek, NextTS}) end; NextTS when NextTS > ExpectedTS, N > 0 -> + inc_counter(?DS_SKIPSTREAM_LTS_MISS), %% Next index level is not what we expect. {seek, NextTS} end @@ -672,3 +683,31 @@ words(<<>>) -> []; words(Bin) -> emqx_topic:words(Bin). + +%%%%%%%% Counters %%%%%%%%%% + +-define(COUNTERS, [ + ?DS_SKIPSTREAM_LTS_SEEK, + ?DS_SKIPSTREAM_LTS_NEXT, + ?DS_SKIPSTREAM_LTS_HASH_COLLISION, + ?DS_SKIPSTREAM_LTS_HIT, + ?DS_SKIPSTREAM_LTS_MISS, + ?DS_SKIPSTREAM_LTS_FUTURE, + ?DS_SKIPSTREAM_LTS_EOS +]). + +inc_counter(Counter) -> + N = get(Counter), + put(Counter, N + 1). + +init_counters() -> + _ = [put(I, 0) || I <- ?COUNTERS], + ok. + +collect_counters(Shard) -> + lists:foreach( + fun(Key) -> + emqx_ds_builtin_metrics:collect_shard_counter(Shard, Key, get(Key)) + end, + ?COUNTERS + ). diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index e205952d2..c1d30e604 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -514,7 +514,14 @@ emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = ?DS_BITFIELD_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, [])); emqx_collect(K = ?DS_BITFIELD_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = ?DS_BITFIELD_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])). +emqx_collect(K = ?DS_BITFIELD_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_SKIPSTREAM_LTS_SEEK, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_SKIPSTREAM_LTS_NEXT, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_SKIPSTREAM_LTS_HASH_COLLISION, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_SKIPSTREAM_LTS_HIT, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_SKIPSTREAM_LTS_MISS, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_SKIPSTREAM_LTS_FUTURE, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_SKIPSTREAM_LTS_EOS, D) -> counter_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators