refactor(ds): Rename egress metrics to 'buffer'
This commit is contained in:
parent
09c3ae795d
commit
ecb172b07e
|
@ -19,17 +19,17 @@
|
||||||
%%%% Egress metrics:
|
%%%% Egress metrics:
|
||||||
|
|
||||||
%% Number of successfully flushed batches:
|
%% Number of successfully flushed batches:
|
||||||
-define(DS_EGRESS_BATCHES, emqx_ds_egress_batches).
|
-define(DS_BUFFER_BATCHES, emqx_ds_buffer_batches).
|
||||||
%% Number of batch flush retries:
|
%% Number of batch flush retries:
|
||||||
-define(DS_EGRESS_BATCHES_RETRY, emqx_ds_egress_batches_retry).
|
-define(DS_BUFFER_BATCHES_RETRY, emqx_ds_buffer_batches_retry).
|
||||||
%% Number of batches that weren't flushed due to unrecoverable errors:
|
%% Number of batches that weren't flushed due to unrecoverable errors:
|
||||||
-define(DS_EGRESS_BATCHES_FAILED, emqx_ds_egress_batches_failed).
|
-define(DS_BUFFER_BATCHES_FAILED, emqx_ds_buffer_batches_failed).
|
||||||
%% Total number of messages that were successfully committed to the storage:
|
%% Total number of messages that were successfully committed to the storage:
|
||||||
-define(DS_EGRESS_MESSAGES, emqx_ds_egress_messages).
|
-define(DS_BUFFER_MESSAGES, emqx_ds_buffer_messages).
|
||||||
%% Total size of payloads that were successfully committed to the storage:
|
%% Total size of payloads that were successfully committed to the storage:
|
||||||
-define(DS_EGRESS_BYTES, emqx_ds_egress_bytes).
|
-define(DS_BUFFER_BYTES, emqx_ds_buffer_bytes).
|
||||||
%% Sliding average of flush time (microseconds):
|
%% Sliding average of flush time (microseconds):
|
||||||
-define(DS_EGRESS_FLUSH_TIME, emqx_ds_egress_flush_time).
|
-define(DS_BUFFER_FLUSH_TIME, emqx_ds_buffer_flush_time).
|
||||||
|
|
||||||
%%%% Storage layer metrics:
|
%%%% Storage layer metrics:
|
||||||
-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time).
|
-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time).
|
||||||
|
|
|
@ -68,16 +68,16 @@
|
||||||
|
|
||||||
-define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS).
|
-define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS).
|
||||||
|
|
||||||
-define(EGRESS_METRICS, [
|
-define(BUFFER_METRICS, [
|
||||||
{counter, ?DS_EGRESS_BATCHES},
|
{counter, ?DS_BUFFER_BATCHES},
|
||||||
{counter, ?DS_EGRESS_BATCHES_RETRY},
|
{counter, ?DS_BUFFER_BATCHES_RETRY},
|
||||||
{counter, ?DS_EGRESS_BATCHES_FAILED},
|
{counter, ?DS_BUFFER_BATCHES_FAILED},
|
||||||
{counter, ?DS_EGRESS_MESSAGES},
|
{counter, ?DS_BUFFER_MESSAGES},
|
||||||
{counter, ?DS_EGRESS_BYTES},
|
{counter, ?DS_BUFFER_BYTES},
|
||||||
{slide, ?DS_EGRESS_FLUSH_TIME}
|
{slide, ?DS_BUFFER_FLUSH_TIME}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(SHARD_METRICS, ?EGRESS_METRICS).
|
-define(SHARD_METRICS, ?BUFFER_METRICS).
|
||||||
|
|
||||||
-type shard_metrics_id() :: binary().
|
-type shard_metrics_id() :: binary().
|
||||||
|
|
||||||
|
@ -108,35 +108,35 @@ init_for_shard(ShardId) ->
|
||||||
%% @doc Increase the number of successfully flushed batches
|
%% @doc Increase the number of successfully flushed batches
|
||||||
-spec inc_egress_batches(shard_metrics_id()) -> ok.
|
-spec inc_egress_batches(shard_metrics_id()) -> ok.
|
||||||
inc_egress_batches(Id) ->
|
inc_egress_batches(Id) ->
|
||||||
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES).
|
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES).
|
||||||
|
|
||||||
%% @doc Increase the number of time the egress worker had to retry
|
%% @doc Increase the number of time the egress worker had to retry
|
||||||
%% flushing the batch
|
%% flushing the batch
|
||||||
-spec inc_egress_batches_retry(shard_metrics_id()) -> ok.
|
-spec inc_egress_batches_retry(shard_metrics_id()) -> ok.
|
||||||
inc_egress_batches_retry(Id) ->
|
inc_egress_batches_retry(Id) ->
|
||||||
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_RETRY).
|
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_RETRY).
|
||||||
|
|
||||||
%% @doc Increase the number of time the egress worker encountered an
|
%% @doc Increase the number of time the egress worker encountered an
|
||||||
%% unrecoverable error while trying to flush the batch
|
%% unrecoverable error while trying to flush the batch
|
||||||
-spec inc_egress_batches_failed(shard_metrics_id()) -> ok.
|
-spec inc_egress_batches_failed(shard_metrics_id()) -> ok.
|
||||||
inc_egress_batches_failed(Id) ->
|
inc_egress_batches_failed(Id) ->
|
||||||
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_FAILED).
|
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_FAILED).
|
||||||
|
|
||||||
%% @doc Increase the number of messages successfully saved to the shard
|
%% @doc Increase the number of messages successfully saved to the shard
|
||||||
-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok.
|
-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok.
|
||||||
inc_egress_messages(Id, NMessages) ->
|
inc_egress_messages(Id, NMessages) ->
|
||||||
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_MESSAGES, NMessages).
|
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_MESSAGES, NMessages).
|
||||||
|
|
||||||
%% @doc Increase the number of messages successfully saved to the shard
|
%% @doc Increase the number of messages successfully saved to the shard
|
||||||
-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
|
-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
|
||||||
inc_egress_bytes(Id, NMessages) ->
|
inc_egress_bytes(Id, NMessages) ->
|
||||||
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BYTES, NMessages).
|
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BYTES, NMessages).
|
||||||
|
|
||||||
%% @doc Add a sample of elapsed time spent flushing the egress to the
|
%% @doc Add a sample of elapsed time spent flushing the egress to the
|
||||||
%% Raft log (in microseconds)
|
%% Raft log (in microseconds)
|
||||||
-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
|
-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
|
||||||
observe_egress_flush_time(Id, FlushTime) ->
|
observe_egress_flush_time(Id, FlushTime) ->
|
||||||
catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_EGRESS_FLUSH_TIME, FlushTime).
|
catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_BUFFER_FLUSH_TIME, FlushTime).
|
||||||
|
|
||||||
-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
|
-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
|
||||||
observe_store_batch_time({DB, _}, StoreTime) ->
|
observe_store_batch_time({DB, _}, StoreTime) ->
|
||||||
|
|
|
@ -504,12 +504,12 @@ emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, [])
|
||||||
emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, []));
|
emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, []));
|
emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
%% DS
|
%% DS
|
||||||
emqx_collect(K = ?DS_EGRESS_BATCHES, D) -> counter_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_BUFFER_BATCHES, D) -> counter_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_EGRESS_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_BUFFER_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_EGRESS_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_BUFFER_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_EGRESS_MESSAGES, D) -> counter_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_BUFFER_MESSAGES, D) -> counter_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_EGRESS_BYTES, D) -> counter_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_BUFFER_BYTES, D) -> counter_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_EGRESS_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_BUFFER_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, []));
|
emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, []));
|
||||||
|
|
|
@ -5,3 +5,12 @@
|
||||||
This backend is available in both open source and enterprise editions.
|
This backend is available in both open source and enterprise editions.
|
||||||
- `builtin_raft`: A durable storage backend that uses Raft algorithm for replication.
|
- `builtin_raft`: A durable storage backend that uses Raft algorithm for replication.
|
||||||
This backend is available enterprise edition.
|
This backend is available enterprise edition.
|
||||||
|
|
||||||
|
The following Prometheus metrics have been renamed:
|
||||||
|
|
||||||
|
- `emqx_ds_egress_batches` -> `emqx_ds_buffer_batches`
|
||||||
|
- `emqx_ds_egress_batches_retry` -> `emqx_ds_buffer_batches_retry`
|
||||||
|
- `emqx_ds_egress_batches_failed` -> `emqx_ds_buffer_batches_failed`
|
||||||
|
- `emqx_ds_egress_messages` -> `emqx_ds_buffer_messages`
|
||||||
|
- `emqx_ds_egress_bytes` -> `emqx_ds_buffer_bytes`
|
||||||
|
- `emqx_ds_egress_flush_time` -> `emqx_ds_buffer_flush_time`
|
||||||
|
|
Loading…
Reference in New Issue