diff --git a/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl index 0a82a6682..a76289eb9 100644 --- a/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl @@ -19,17 +19,17 @@ %%%% Egress metrics: %% Number of successfully flushed batches: --define(DS_EGRESS_BATCHES, emqx_ds_egress_batches). +-define(DS_BUFFER_BATCHES, emqx_ds_buffer_batches). %% Number of batch flush retries: --define(DS_EGRESS_BATCHES_RETRY, emqx_ds_egress_batches_retry). +-define(DS_BUFFER_BATCHES_RETRY, emqx_ds_buffer_batches_retry). %% Number of batches that weren't flushed due to unrecoverable errors: --define(DS_EGRESS_BATCHES_FAILED, emqx_ds_egress_batches_failed). +-define(DS_BUFFER_BATCHES_FAILED, emqx_ds_buffer_batches_failed). %% Total number of messages that were successfully committed to the storage: --define(DS_EGRESS_MESSAGES, emqx_ds_egress_messages). +-define(DS_BUFFER_MESSAGES, emqx_ds_buffer_messages). %% Total size of payloads that were successfully committed to the storage: --define(DS_EGRESS_BYTES, emqx_ds_egress_bytes). +-define(DS_BUFFER_BYTES, emqx_ds_buffer_bytes). %% Sliding average of flush time (microseconds): --define(DS_EGRESS_FLUSH_TIME, emqx_ds_egress_flush_time). +-define(DS_BUFFER_FLUSH_TIME, emqx_ds_buffer_flush_time). %%%% Storage layer metrics: -define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time). diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index a69ae22c2..2d9f9ea16 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -68,16 +68,16 @@ -define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS). 
--define(EGRESS_METRICS, [ - {counter, ?DS_EGRESS_BATCHES}, - {counter, ?DS_EGRESS_BATCHES_RETRY}, - {counter, ?DS_EGRESS_BATCHES_FAILED}, - {counter, ?DS_EGRESS_MESSAGES}, - {counter, ?DS_EGRESS_BYTES}, - {slide, ?DS_EGRESS_FLUSH_TIME} +-define(BUFFER_METRICS, [ + {counter, ?DS_BUFFER_BATCHES}, + {counter, ?DS_BUFFER_BATCHES_RETRY}, + {counter, ?DS_BUFFER_BATCHES_FAILED}, + {counter, ?DS_BUFFER_MESSAGES}, + {counter, ?DS_BUFFER_BYTES}, + {slide, ?DS_BUFFER_FLUSH_TIME} ]). --define(SHARD_METRICS, ?EGRESS_METRICS). +-define(SHARD_METRICS, ?BUFFER_METRICS). -type shard_metrics_id() :: binary(). @@ -108,35 +108,35 @@ init_for_shard(ShardId) -> %% @doc Increase the number of successfully flushed batches -spec inc_egress_batches(shard_metrics_id()) -> ok. inc_egress_batches(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES). %% @doc Increase the number of times the egress worker had to retry %% flushing the batch -spec inc_egress_batches_retry(shard_metrics_id()) -> ok. inc_egress_batches_retry(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_RETRY). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_RETRY). %% @doc Increase the number of times the egress worker encountered an %% unrecoverable error while trying to flush the batch -spec inc_egress_batches_failed(shard_metrics_id()) -> ok. inc_egress_batches_failed(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_FAILED). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_FAILED). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_messages(Id, NMessages) -> - catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_MESSAGES, NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_MESSAGES, NMessages). 
%% @doc Increase the total size (in bytes) of payloads successfully saved to the shard -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_bytes(Id, NMessages) -> - catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BYTES, NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BYTES, NMessages). %% @doc Add a sample of elapsed time spent flushing the egress to the %% Raft log (in microseconds) -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. observe_egress_flush_time(Id, FlushTime) -> - catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_EGRESS_FLUSH_TIME, FlushTime). + catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_BUFFER_FLUSH_TIME, FlushTime). -spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. observe_store_batch_time({DB, _}, StoreTime) -> diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index f4d0ff2c0..5d88ebd17 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -504,12 +504,12 @@ emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, []) emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])); %% DS -emqx_collect(K = ?DS_EGRESS_BATCHES, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = ?DS_EGRESS_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = ?DS_EGRESS_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = ?DS_EGRESS_MESSAGES, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = ?DS_EGRESS_BYTES, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = ?DS_EGRESS_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUFFER_BATCHES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUFFER_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, [])); 
+emqx_collect(K = ?DS_BUFFER_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUFFER_MESSAGES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUFFER_BYTES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUFFER_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, [])); diff --git a/changes/ce/breaking-13248.en.md b/changes/ce/breaking-13248.en.md index 9f2ad2bd8..731196e0b 100644 --- a/changes/ce/breaking-13248.en.md +++ b/changes/ce/breaking-13248.en.md @@ -5,3 +5,12 @@ This backend is available in both open source and enterprise editions. - `builtin_raft`: A durable storage backend that uses the Raft algorithm for replication. This backend is available in the enterprise edition. + +The following Prometheus metrics have been renamed: + +- `emqx_ds_egress_batches` -> `emqx_ds_buffer_batches` +- `emqx_ds_egress_batches_retry` -> `emqx_ds_buffer_batches_retry` +- `emqx_ds_egress_batches_failed` -> `emqx_ds_buffer_batches_failed` +- `emqx_ds_egress_messages` -> `emqx_ds_buffer_messages` +- `emqx_ds_egress_bytes` -> `emqx_ds_buffer_bytes` +- `emqx_ds_egress_flush_time` -> `emqx_ds_buffer_flush_time`