From bc915216a0ff802e4f37f4a07d511f5b083746a7 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 14 Jun 2024 23:50:59 +0200 Subject: [PATCH] feat(ds): Support metrics for builtin_local backend --- .../test/emqx_persistent_messages_SUITE.erl | 15 +++++++++++- apps/emqx_durable_storage/src/emqx_ds.erl | 7 ++++++ .../src/emqx_ds_buffer.erl | 5 ++-- .../src/emqx_ds_builtin_metrics.erl | 24 +++++++++---------- apps/emqx_durable_storage/src/emqx_ds_sup.erl | 14 +++++++++++ 5 files changed, 50 insertions(+), 15 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 951d72c8d..f58b21fb7 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -33,7 +33,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - Config. + case is_standalone() of + true -> + {skip, standalone_not_supported}; + false -> + Config + end. end_per_suite(_Config) -> ok. @@ -590,3 +595,11 @@ on_message_dropped(Msg, Context, Res, TestPid) -> ErrCtx = #{msg => Msg, ctx => Context, res => Res}, ct:pal("this hook should not be called.\n ~p", [ErrCtx]), exit(TestPid, {hookpoint_called, ErrCtx}). + +is_standalone() -> + try + emqx_conf:module_info(), + false + catch + error:undef -> true + end. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index c2f1e7eb3..594c4f30e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -27,6 +27,7 @@ open_db/2, close_db/1, + which_dbs/0, update_db_config/2, add_generation/1, list_generations_with_lifetimes/1, @@ -264,13 +265,19 @@ open_db(DB, Opts = #{backend := Backend}) -> error({no_such_backend, Backend}); Module -> persistent_term:put(?persistent_term(DB), Module), + emqx_ds_sup:register_db(DB, Backend), ?module(DB):open_db(DB, Opts) end. -spec close_db(db()) -> ok. close_db(DB) -> + emqx_ds_sup:unregister_db(DB), ?module(DB):close_db(DB). +-spec which_dbs() -> [{db(), _Backend :: atom()}]. +which_dbs() -> + emqx_ds_sup:which_dbs(). + -spec add_generation(db()) -> ok. add_generation(DB) -> ?module(DB):add_generation(DB). diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl index 3fcbec3b9..56e98eee3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -39,7 +39,8 @@ %% Type declarations %%================================================================================ --define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). +-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}). +-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}). -define(flush, flush). -define(cbm(DB), {?MODULE, DB}). @@ -66,7 +67,7 @@ -spec ls() -> [{emqx_ds:db(), _Shard}]. ls() -> - MS = {{n, l, {?MODULE, '$1', '$2'}}, [], ['$1', '$2']}, + MS = {{?name('$1', '$2'), '_', '_'}, [], [{{'$1', '$2'}}]}, gproc:select({local, names}, [MS]). -spec start_link(module(), _CallbackOptions, emqx_ds:db(), _ShardId) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index 994368df0..83c7c2d53 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -176,11 +176,14 @@ prometheus_collect(NodeOrAggr) -> prometheus_per_db(NodeOrAggr) -> lists:foldl( - fun(DB, Acc) -> - prometheus_per_db(NodeOrAggr, DB, Acc) + fun + ({DB, Backend}, Acc) when Backend =:= builtin_local; Backend =:= builtin_raft -> + prometheus_per_db(NodeOrAggr, DB, Acc); + ({_, _}, Acc) -> + Acc end, #{}, - emqx_ds_builtin_raft_db_sup:which_dbs() + emqx_ds:which_dbs() ). %% This function returns the data in the following format: @@ -235,18 +238,15 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) -> %% If `NodeOrAggr' = `node' then node name is appended to the list of %% labels. prometheus_per_shard(NodeOrAggr) -> + prometheus_buffer_metrics(NodeOrAggr). + +prometheus_buffer_metrics(NodeOrAggr) -> lists:foldl( - fun(DB, Acc0) -> - lists:foldl( - fun(Shard, Acc) -> - prometheus_per_shard(NodeOrAggr, DB, Shard, Acc) - end, - Acc0, - emqx_ds_replication_layer_meta:shards(DB) - ) + fun({DB, Shard}, Acc) -> + prometheus_per_shard(NodeOrAggr, DB, Shard, Acc) end, #{}, - emqx_ds_builtin_raft_db_sup:which_dbs() + emqx_ds_buffer:ls() ). prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index c4bd0e873..41631d6d6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -19,6 +19,7 @@ %% API: -export([start_link/0, attach_backend/2]). +-export([register_db/2, unregister_db/1, which_dbs/0]). %% behaviour callbacks: -export([init/1]). @@ -28,6 +29,7 @@ %%================================================================================ -define(SUP, ?MODULE). +-define(TAB, ?MODULE). %%================================================================================ %% API functions @@ -58,11 +60,23 @@ attach_backend(Backend, Start) -> {error, Err} end. +register_db(DB, Backend) -> + ets:insert(?TAB, {DB, Backend}), + ok. + +unregister_db(DB) -> + ets:delete(?TAB, DB), + ok. + +which_dbs() -> + ets:tab2list(?TAB). + %%================================================================================ %% behaviour callbacks %%================================================================================ init(top) -> + _ = ets:new(?TAB, [public, set, named_table]), Children = [], SupFlags = #{ strategy => one_for_one,