From 94ca7ad0f86c6f924351f3852cf35bfb66b8bb02 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 2 Apr 2024 01:52:34 +0200 Subject: [PATCH] feat(ds): Report counters for LTS storage layout --- apps/emqx_durable_storage/include/emqx_ds.hrl | 4 +- .../include/emqx_ds_metrics.hrl | 49 ++++++++++++++++ .../src/emqx_ds_builtin_metrics.erl | 55 ++++++++++++------ .../src/emqx_ds_storage_bitfield_lts.erl | 57 ++++++++++++------- .../test/emqx_ds_SUITE.erl | 2 +- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 22 ++++--- apps/emqx_prometheus/src/emqx_prometheus.erl | 20 ++++--- 7 files changed, 150 insertions(+), 59 deletions(-) create mode 100644 apps/emqx_durable_storage/include/emqx_ds_metrics.hrl diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl index f24605175..cc7a7431f 100644 --- a/apps/emqx_durable_storage/include/emqx_ds.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --ifndef(EMQX_DS_HRL_HRL). --define(EMQX_DS_HRL_HRL, true). +-ifndef(EMQX_DS_HRL). +-define(EMQX_DS_HRL, true). -endif. diff --git a/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl new file mode 100644 index 000000000..0a82a6682 --- /dev/null +++ b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_METRICS_HRL). +-define(EMQX_DS_METRICS_HRL, true). + +%%%% Egress metrics: + +%% Number of successfully flushed batches: +-define(DS_EGRESS_BATCHES, emqx_ds_egress_batches). +%% Number of batch flush retries: +-define(DS_EGRESS_BATCHES_RETRY, emqx_ds_egress_batches_retry). +%% Number of batches that weren't flushed due to unrecoverable errors: +-define(DS_EGRESS_BATCHES_FAILED, emqx_ds_egress_batches_failed). +%% Total number of messages that were successfully committed to the storage: +-define(DS_EGRESS_MESSAGES, emqx_ds_egress_messages). +%% Total size of payloads that were successfully committed to the storage: +-define(DS_EGRESS_BYTES, emqx_ds_egress_bytes). +%% Sliding average of flush time (microseconds): +-define(DS_EGRESS_FLUSH_TIME, emqx_ds_egress_flush_time). + +%%%% Storage layer metrics: +-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time). +-define(DS_BUILTIN_NEXT_TIME, emqx_ds_builtin_next_time). + +%%% LTS Storage counters: + +%% This counter is incremented when the iterator seeks to the next interval: +-define(DS_LTS_SEEK_COUNTER, emqx_ds_storage_bitfield_lts_counter_seek). +%% This counter is incremented when the iterator proceeds to the next +%% key within the interval (this is is best case scenario): +-define(DS_LTS_NEXT_COUNTER, emqx_ds_storage_bitfield_lts_counter_next). +%% This counter is incremented when the key passes bitmask check, but +%% the value is rejected by the subsequent post-processing: +-define(DS_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision). + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index 833e39211..ce984db57 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -32,7 +32,11 @@ observe_store_batch_time/2, - observe_next_time/2 + observe_next_time/2, + + inc_lts_seek_counter/2, + inc_lts_next_counter/2, + inc_lts_collision_counter/2 ]). %% behavior callbacks: @@ -43,6 +47,8 @@ -export_type([shard_metrics_id/0]). +-include("emqx_ds_metrics.hrl"). + %%================================================================================ %% Type declarations %%================================================================================ @@ -50,22 +56,25 @@ -define(WORKER, ?MODULE). -define(STORAGE_LAYER_METRICS, [ - {slide, 'emqx_ds_store_batch_time'} + {slide, ?DS_STORE_BATCH_TIME}, + {counter, ?DS_LTS_SEEK_COUNTER}, + {counter, ?DS_LTS_NEXT_COUNTER}, + {counter, ?DS_LTS_COLLISION_COUNTER} ]). -define(FETCH_METRICS, [ - {slide, 'emqx_ds_builtin_next_time'} + {slide, ?DS_BUILTIN_NEXT_TIME} ]). -define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS). -define(EGRESS_METRICS, [ - {counter, 'emqx_ds_egress_batches'}, - {counter, 'emqx_ds_egress_batches_retry'}, - {counter, 'emqx_ds_egress_batches_failed'}, - {counter, 'emqx_ds_egress_messages'}, - {counter, 'emqx_ds_egress_bytes'}, - {slide, 'emqx_ds_egress_flush_time'} + {counter, ?DS_EGRESS_BATCHES}, + {counter, ?DS_EGRESS_BATCHES_RETRY}, + {counter, ?DS_EGRESS_BATCHES_FAILED}, + {counter, ?DS_EGRESS_MESSAGES}, + {counter, ?DS_EGRESS_BYTES}, + {slide, ?DS_EGRESS_FLUSH_TIME} ]). -define(SHARD_METRICS, ?EGRESS_METRICS). @@ -99,45 +108,57 @@ init_for_shard(ShardId) -> %% @doc Increase the number of successfully flushed batches -spec inc_egress_batches(shard_metrics_id()) -> ok. inc_egress_batches(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES). %% @doc Increase the number of time the egress worker had to retry %% flushing the batch -spec inc_egress_batches_retry(shard_metrics_id()) -> ok. inc_egress_batches_retry(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_RETRY). %% @doc Increase the number of time the egress worker encountered an %% unrecoverable error while trying to flush the batch -spec inc_egress_batches_failed(shard_metrics_id()) -> ok. inc_egress_batches_failed(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_FAILED). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_messages(Id, NMessages) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_MESSAGES, NMessages). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_bytes(Id, NMessages) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BYTES, NMessages). %% @doc Add a sample of elapsed time spent flushing the egress to the %% Raft log (in microseconds) -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. observe_egress_flush_time(Id, FlushTime) -> - catch emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). + catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_EGRESS_FLUSH_TIME, FlushTime). -spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. observe_store_batch_time({DB, _}, StoreTime) -> - catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_store_batch_time', StoreTime). + catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_STORE_BATCH_TIME, StoreTime). %% @doc Add a sample of elapsed time spent waiting for a batch %% `emqx_ds_replication_layer:next' -spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok. observe_next_time(DB, NextTime) -> - catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_builtin_next_time', NextTime). + catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_BUILTIN_NEXT_TIME, NextTime). + +-spec inc_lts_seek_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_seek_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_SEEK_COUNTER, Inc). + +-spec inc_lts_next_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_next_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_NEXT_COUNTER, Inc). + +-spec inc_lts_collision_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_collision_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_COLLISION_COUNTER, Inc). prometheus_meta() -> lists:map( diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 594854d21..2ec6674b6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -44,6 +44,7 @@ -export_type([options/0]). +-include("emqx_ds_metrics.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). @@ -115,8 +116,6 @@ ?last_seen_key := binary() }. --define(COUNTER, emqx_ds_storage_bitfield_lts_counter). - %% Limit on the number of wildcard levels in the learned topic trie: -define(WILDCARD_LIMIT, 10). @@ -140,6 +139,8 @@ -define(DIM_TOPIC, 1). -define(DIM_TS, 2). +-define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -347,13 +348,18 @@ update_iterator( ) -> {ok, OldIter#{?last_seen_key => DSKey}}. -next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> +next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. + init_counters(), Now = emqx_ds:timestamp_us(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, - next_until(Schema, It, SafeCutoffTime, BatchSize). + try + next_until(Schema, It, SafeCutoffTime, BatchSize) + after + report_counters(Shard) + end. next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when StartTime >= SafeCutoffTime @@ -375,20 +381,23 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, filter := Filter } = prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers), try - put(?COUNTER, 0), next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize) after - rocksdb:iterator_close(ITHandle), - erase(?COUNTER) + rocksdb:iterator_close(ITHandle) end. -delete_next(_Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. + init_counters(), Now = emqx_message:timestamp_now(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, - delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize). + try + delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) + after + report_counters(Shard) + end. delete_next_until( _Schema, @@ -417,7 +426,6 @@ delete_next_until( DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers ), try - put(?COUNTER, 0), LoopContext = LoopContext0#{ db => DB, cf => CF, @@ -430,8 +438,7 @@ delete_next_until( }, delete_next_loop(LoopContext) after - rocksdb:iterator_close(ITHandle), - erase(?COUNTER) + rocksdb:iterator_close(ITHandle) end. %%================================================================================ @@ -477,7 +484,6 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> - inc_counter(), #{?tag := ?IT, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> @@ -485,6 +491,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> Key1 -> %% assert true = Key1 > Key0, + inc_counter(?DS_LTS_SEEK_COUNTER), case rocksdb:iterator_move(ITHandle, {seek, Key1}) of {ok, Key, Val} -> {N, It, Acc} = traverse_interval( @@ -510,6 +517,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) - Acc = [{Key, Msg} | Acc0], traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1); false -> + inc_counter(?DS_LTS_COLLISION_COUNTER), traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N) end; overflow -> @@ -521,7 +529,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) - traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {0, It, Acc}; traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) -> - inc_counter(), + inc_counter(?DS_LTS_NEXT_COUNTER), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N); @@ -541,7 +549,7 @@ delete_next_loop(LoopContext0) -> iterated_over := AccIter0, it_handle := ITHandle } = LoopContext0, - inc_counter(), + inc_counter(?DS_LTS_SEEK_COUNTER), #{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> @@ -623,7 +631,7 @@ delete_traverse_interval1(LoopContext0) -> iterated_over := AccIter, storage_iter := It } = LoopContext0, - inc_counter(), + inc_counter(?DS_LTS_NEXT_COUNTER), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> delete_traverse_interval(LoopContext0#{ @@ -767,9 +775,20 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) -> read_persisted_trie(_IT, {error, invalid_iterator}) -> []. -inc_counter() -> - N = get(?COUNTER), - put(?COUNTER, N + 1). +inc_counter(Counter) -> + N = get(Counter), + put(Counter, N + 1). + +init_counters() -> + _ = [put(I, 0) || I <- ?DS_LTS_COUNTERS], + ok. + +report_counters(Shard) -> + emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)), + emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)), + emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)), + _ = [erase(I) || I <- ?DS_LTS_COUNTERS], + ok. %% @doc Generate a column family ID for the MQTT messages -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 33988a974..727f424b8 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -677,7 +677,7 @@ t_error_mapping_replication_layer(_Config) -> ), meck:unload(). -%% This test suite verifies the behavior of `store_batch' operation +%% This testcase verifies the behavior of `store_batch' operation %% when the underlying code experiences recoverable or unrecoverable %% problems. t_store_batch_fail(_Config) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 636b57b89..78838e675 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -261,8 +261,7 @@ t_atomic_store_batch(_Config) -> sync => true }) ), - - ok + timer:sleep(1000) end, fun(Trace) -> %% Must contain exactly one flush with all messages. @@ -293,19 +292,18 @@ t_non_atomic_store_batch(_Config) -> sync => true }) ), - - ok + Msgs end, - fun(Trace) -> - %% Should contain one flush per message. - ?assertMatch( - [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + fun(ExpectedMsgs, Trace) -> + ProcessedMsgs = lists:append( + ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)) ), - ok + ?assertEqual( + ExpectedMsgs, + ProcessedMsgs + ) end - ), - ok. + ). check(Shard, TopicFilter, StartTime, ExpectedMessages) -> ExpectedFiltered = lists:filter( diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 2327a7263..450033f18 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -37,6 +37,7 @@ -include_lib("public_key/include/public_key.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_durable_storage/include/emqx_ds_metrics.hrl"). -import( prometheus_model_helpers, @@ -501,14 +502,17 @@ emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, []) emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])); %% DS -emqx_collect(K = emqx_ds_egress_batches, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_store_batch_time, D) -> gauge_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_builtin_next_time, D) -> gauge_metrics(?MG(K, D, [])). +emqx_collect(K = ?DS_EGRESS_BATCHES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_MESSAGES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BYTES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators