From 99c9b56cf3895e921d6d65792d9fa013ec6c951f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 14 Jun 2024 22:00:00 +0200 Subject: [PATCH] feat(ds_buffer): Add `ls' function to list all local buffers --- .../src/emqx_ds_buffer.erl | 18 ++++--- .../src/emqx_ds_builtin_metrics.erl | 48 +++++++++---------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl index f0cf4fe83..3fcbec3b9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -22,6 +22,7 @@ %% API: -export([start_link/4, store_batch/3, shard_of_message/3]). +-export([ls/0]). %% behavior callbacks: -export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -63,6 +64,11 @@ %% API functions %%================================================================================ +-spec ls() -> [{emqx_ds:db(), _Shard}]. +ls() -> + MS = {{n, l, {?MODULE, '$1', '$2'}}, [], ['$1', '$2']}, + gproc:select({local, names}, [MS]). + -spec start_link(module(), _CallbackOptions, emqx_ds:db(), _ShardId) -> {ok, pid()}. start_link(CallbackModule, CallbackOptions, DB, Shard) -> @@ -267,12 +273,12 @@ do_flush( {CallbackS, Result} = CBM:flush_buffer(DB, Shard, Messages, CallbackS0), S = S0#s{callback_state = CallbackS}, T1 = erlang:monotonic_time(microsecond), - emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0), + emqx_ds_builtin_metrics:observe_buffer_flush_time(Metrics, T1 - T0), case Result of ok -> - emqx_ds_builtin_metrics:inc_egress_batches(Metrics), - emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n), - emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes), + emqx_ds_builtin_metrics:inc_buffer_batches(Metrics), + emqx_ds_builtin_metrics:inc_buffer_messages(Metrics, S#s.n), + emqx_ds_builtin_metrics:inc_buffer_bytes(Metrics, S#s.n_bytes), ?tp( emqx_ds_buffer_flush, #{db => DB, shard => Shard, batch => Messages} @@ -298,7 +304,7 @@ do_flush( #{db => DB, shard => Shard, reason => Err} ), %% Retry sending the batch: - emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), + emqx_ds_builtin_metrics:inc_buffer_batches_retry(Metrics), erlang:garbage_collect(), %% We block the gen_server until the next retry. BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), @@ -310,7 +316,7 @@ do_flush( emqx_ds_buffer_flush_failed, #{db => DB, shard => Shard, error => Err} ), - emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), + emqx_ds_builtin_metrics:inc_buffer_batches_failed(Metrics), Reply = case Err of {error, _, _} -> Err; diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index 2d9f9ea16..994368df0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -22,13 +22,13 @@ -export([prometheus_meta/0, prometheus_collect/1]). -export([ - inc_egress_batches/1, - inc_egress_batches_retry/1, - inc_egress_batches_failed/1, - inc_egress_messages/2, - inc_egress_bytes/2, + inc_buffer_batches/1, + inc_buffer_batches_retry/1, + inc_buffer_batches_failed/1, + inc_buffer_messages/2, + inc_buffer_bytes/2, - observe_egress_flush_time/2, + observe_buffer_flush_time/2, observe_store_batch_time/2, @@ -106,36 +106,36 @@ init_for_shard(ShardId) -> emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []). %% @doc Increase the number of successfully flushed batches --spec inc_egress_batches(shard_metrics_id()) -> ok. -inc_egress_batches(Id) -> +-spec inc_buffer_batches(shard_metrics_id()) -> ok. +inc_buffer_batches(Id) -> catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES). -%% @doc Increase the number of time the egress worker had to retry +%% @doc Increase the number of time the buffer worker had to retry %% flushing the batch --spec inc_egress_batches_retry(shard_metrics_id()) -> ok. -inc_egress_batches_retry(Id) -> +-spec inc_buffer_batches_retry(shard_metrics_id()) -> ok. +inc_buffer_batches_retry(Id) -> catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_RETRY). -%% @doc Increase the number of time the egress worker encountered an +%% @doc Increase the number of time the buffer worker encountered an %% unrecoverable error while trying to flush the batch --spec inc_egress_batches_failed(shard_metrics_id()) -> ok. -inc_egress_batches_failed(Id) -> +-spec inc_buffer_batches_failed(shard_metrics_id()) -> ok. +inc_buffer_batches_failed(Id) -> catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_FAILED). %% @doc Increase the number of messages successfully saved to the shard --spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. -inc_egress_messages(Id, NMessages) -> +-spec inc_buffer_messages(shard_metrics_id(), non_neg_integer()) -> ok. +inc_buffer_messages(Id, NMessages) -> catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_MESSAGES, NMessages). %% @doc Increase the number of messages successfully saved to the shard --spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. -inc_egress_bytes(Id, NMessages) -> +-spec inc_buffer_bytes(shard_metrics_id(), non_neg_integer()) -> ok. +inc_buffer_bytes(Id, NMessages) -> catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BYTES, NMessages). -%% @doc Add a sample of elapsed time spent flushing the egress to the +%% @doc Add a sample of elapsed time spent flushing the buffer to the %% Raft log (in microseconds) --spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. -observe_egress_flush_time(Id, FlushTime) -> +-spec observe_buffer_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. +observe_buffer_flush_time(Id, FlushTime) -> catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_BUFFER_FLUSH_TIME, FlushTime). -spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. @@ -221,13 +221,13 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) -> %% This function returns the data in the following format: %% ``` -%% #{emqx_ds_egress_batches => +%% #{emqx_ds_buffer_batches => %% [{[{db,messages},{shard,<<"1">>}],99408}, %% {[{db,messages},{shard,<<"0">>}],99409}], -%% emqx_ds_egress_batches_retry => +%% emqx_ds_buffer_batches_retry => %% [{[{db,messages},{shard,<<"1">>}],0}, %% {[{db,messages},{shard,<<"0">>}],0}], -%% emqx_ds_egress_messages => +%% emqx_ds_buffer_messages => %% ... %% } %% '''