feat(ds_buffer): Add `ls' function to list all local buffers

This commit is contained in:
ieQu1 2024-06-14 22:00:00 +02:00
parent 5a8818edf3
commit 99c9b56cf3
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 36 additions and 30 deletions

View File

@ -22,6 +22,7 @@
%% API:
-export([start_link/4, store_batch/3, shard_of_message/3]).
-export([ls/0]).
%% behavior callbacks:
-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -63,6 +64,11 @@
%% API functions
%%================================================================================
-spec ls() -> [{emqx_ds:db(), _Shard}].
ls() ->
MS = {{n, l, {?MODULE, '$1', '$2'}}, [], ['$1', '$2']},
gproc:select({local, names}, [MS]).
-spec start_link(module(), _CallbackOptions, emqx_ds:db(), _ShardId) ->
{ok, pid()}.
start_link(CallbackModule, CallbackOptions, DB, Shard) ->
@ -267,12 +273,12 @@ do_flush(
{CallbackS, Result} = CBM:flush_buffer(DB, Shard, Messages, CallbackS0),
S = S0#s{callback_state = CallbackS},
T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0),
emqx_ds_builtin_metrics:observe_buffer_flush_time(Metrics, T1 - T0),
case Result of
ok ->
emqx_ds_builtin_metrics:inc_egress_batches(Metrics),
emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n),
emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes),
emqx_ds_builtin_metrics:inc_buffer_batches(Metrics),
emqx_ds_builtin_metrics:inc_buffer_messages(Metrics, S#s.n),
emqx_ds_builtin_metrics:inc_buffer_bytes(Metrics, S#s.n_bytes),
?tp(
emqx_ds_buffer_flush,
#{db => DB, shard => Shard, batch => Messages}
@ -298,7 +304,7 @@ do_flush(
#{db => DB, shard => Shard, reason => Err}
),
%% Retry sending the batch:
emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
emqx_ds_builtin_metrics:inc_buffer_batches_retry(Metrics),
erlang:garbage_collect(),
%% We block the gen_server until the next retry.
BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
@ -310,7 +316,7 @@ do_flush(
emqx_ds_buffer_flush_failed,
#{db => DB, shard => Shard, error => Err}
),
emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
emqx_ds_builtin_metrics:inc_buffer_batches_failed(Metrics),
Reply =
case Err of
{error, _, _} -> Err;

View File

@ -22,13 +22,13 @@
-export([prometheus_meta/0, prometheus_collect/1]).
-export([
inc_egress_batches/1,
inc_egress_batches_retry/1,
inc_egress_batches_failed/1,
inc_egress_messages/2,
inc_egress_bytes/2,
inc_buffer_batches/1,
inc_buffer_batches_retry/1,
inc_buffer_batches_failed/1,
inc_buffer_messages/2,
inc_buffer_bytes/2,
observe_egress_flush_time/2,
observe_buffer_flush_time/2,
observe_store_batch_time/2,
@ -106,36 +106,36 @@ init_for_shard(ShardId) ->
emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []).
%% @doc Increase the number of successfully flushed batches
-spec inc_egress_batches(shard_metrics_id()) -> ok.
inc_egress_batches(Id) ->
-spec inc_buffer_batches(shard_metrics_id()) -> ok.
inc_buffer_batches(Id) ->
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES).
%% @doc Increase the number of time the egress worker had to retry
%% @doc Increase the number of time the buffer worker had to retry
%% flushing the batch
-spec inc_egress_batches_retry(shard_metrics_id()) -> ok.
inc_egress_batches_retry(Id) ->
-spec inc_buffer_batches_retry(shard_metrics_id()) -> ok.
inc_buffer_batches_retry(Id) ->
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_RETRY).
%% @doc Increase the number of time the egress worker encountered an
%% @doc Increase the number of time the buffer worker encountered an
%% unrecoverable error while trying to flush the batch
-spec inc_egress_batches_failed(shard_metrics_id()) -> ok.
inc_egress_batches_failed(Id) ->
-spec inc_buffer_batches_failed(shard_metrics_id()) -> ok.
inc_buffer_batches_failed(Id) ->
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BATCHES_FAILED).
%% @doc Increase the number of messages successfully saved to the shard
-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok.
inc_egress_messages(Id, NMessages) ->
-spec inc_buffer_messages(shard_metrics_id(), non_neg_integer()) -> ok.
inc_buffer_messages(Id, NMessages) ->
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_MESSAGES, NMessages).
%% @doc Increase the number of messages successfully saved to the shard
-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
inc_egress_bytes(Id, NMessages) ->
-spec inc_buffer_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
inc_buffer_bytes(Id, NMessages) ->
catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_BUFFER_BYTES, NMessages).
%% @doc Add a sample of elapsed time spent flushing the egress to the
%% @doc Add a sample of elapsed time spent flushing the buffer to the
%% Raft log (in microseconds)
-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
observe_egress_flush_time(Id, FlushTime) ->
-spec observe_buffer_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
observe_buffer_flush_time(Id, FlushTime) ->
catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_BUFFER_FLUSH_TIME, FlushTime).
-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
@ -221,13 +221,13 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) ->
%% This function returns the data in the following format:
%% ```
%% #{emqx_ds_egress_batches =>
%% #{emqx_ds_buffer_batches =>
%% [{[{db,messages},{shard,<<"1">>}],99408},
%% {[{db,messages},{shard,<<"0">>}],99409}],
%% emqx_ds_egress_batches_retry =>
%% emqx_ds_buffer_batches_retry =>
%% [{[{db,messages},{shard,<<"1">>}],0},
%% {[{db,messages},{shard,<<"0">>}],0}],
%% emqx_ds_egress_messages =>
%% emqx_ds_buffer_messages =>
%% ...
%% }
%% '''