feat(ds): Support metrics for builtin_local backend
This commit is contained in:
parent
be6c5e172f
commit
bc915216a0
|
@ -33,7 +33,12 @@ all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
Config.
|
case is_standalone() of
|
||||||
|
true ->
|
||||||
|
{skip, standalone_not_supported};
|
||||||
|
false ->
|
||||||
|
Config
|
||||||
|
end.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -590,3 +595,11 @@ on_message_dropped(Msg, Context, Res, TestPid) ->
|
||||||
ErrCtx = #{msg => Msg, ctx => Context, res => Res},
|
ErrCtx = #{msg => Msg, ctx => Context, res => Res},
|
||||||
ct:pal("this hook should not be called.\n ~p", [ErrCtx]),
|
ct:pal("this hook should not be called.\n ~p", [ErrCtx]),
|
||||||
exit(TestPid, {hookpoint_called, ErrCtx}).
|
exit(TestPid, {hookpoint_called, ErrCtx}).
|
||||||
|
|
||||||
|
is_standalone() ->
|
||||||
|
try
|
||||||
|
emqx_conf:module_info(),
|
||||||
|
false
|
||||||
|
catch
|
||||||
|
error:undef -> true
|
||||||
|
end.
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
|
|
||||||
open_db/2,
|
open_db/2,
|
||||||
close_db/1,
|
close_db/1,
|
||||||
|
which_dbs/0,
|
||||||
update_db_config/2,
|
update_db_config/2,
|
||||||
add_generation/1,
|
add_generation/1,
|
||||||
list_generations_with_lifetimes/1,
|
list_generations_with_lifetimes/1,
|
||||||
|
@ -264,13 +265,19 @@ open_db(DB, Opts = #{backend := Backend}) ->
|
||||||
error({no_such_backend, Backend});
|
error({no_such_backend, Backend});
|
||||||
Module ->
|
Module ->
|
||||||
persistent_term:put(?persistent_term(DB), Module),
|
persistent_term:put(?persistent_term(DB), Module),
|
||||||
|
emqx_ds_sup:register_db(DB, Backend),
|
||||||
?module(DB):open_db(DB, Opts)
|
?module(DB):open_db(DB, Opts)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec close_db(db()) -> ok.
|
-spec close_db(db()) -> ok.
|
||||||
close_db(DB) ->
|
close_db(DB) ->
|
||||||
|
emqx_ds_sup:unregister_db(DB),
|
||||||
?module(DB):close_db(DB).
|
?module(DB):close_db(DB).
|
||||||
|
|
||||||
|
-spec which_dbs() -> [{db(), _Backend :: atom()}].
|
||||||
|
which_dbs() ->
|
||||||
|
emqx_ds_sup:which_dbs().
|
||||||
|
|
||||||
-spec add_generation(db()) -> ok.
|
-spec add_generation(db()) -> ok.
|
||||||
add_generation(DB) ->
|
add_generation(DB) ->
|
||||||
?module(DB):add_generation(DB).
|
?module(DB):add_generation(DB).
|
||||||
|
|
|
@ -39,7 +39,8 @@
|
||||||
%% Type declarations
|
%% Type declarations
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
|
-define(name(DB, SHARD), {n, l, {?MODULE, DB, SHARD}}).
|
||||||
|
-define(via(DB, SHARD), {via, gproc, ?name(DB, SHARD)}).
|
||||||
-define(flush, flush).
|
-define(flush, flush).
|
||||||
|
|
||||||
-define(cbm(DB), {?MODULE, DB}).
|
-define(cbm(DB), {?MODULE, DB}).
|
||||||
|
@ -66,7 +67,7 @@
|
||||||
|
|
||||||
-spec ls() -> [{emqx_ds:db(), _Shard}].
|
-spec ls() -> [{emqx_ds:db(), _Shard}].
|
||||||
ls() ->
|
ls() ->
|
||||||
MS = {{n, l, {?MODULE, '$1', '$2'}}, [], ['$1', '$2']},
|
MS = {{?name('$1', '$2'), '_', '_'}, [], [{{'$1', '$2'}}]},
|
||||||
gproc:select({local, names}, [MS]).
|
gproc:select({local, names}, [MS]).
|
||||||
|
|
||||||
-spec start_link(module(), _CallbackOptions, emqx_ds:db(), _ShardId) ->
|
-spec start_link(module(), _CallbackOptions, emqx_ds:db(), _ShardId) ->
|
||||||
|
|
|
@ -176,11 +176,14 @@ prometheus_collect(NodeOrAggr) ->
|
||||||
|
|
||||||
prometheus_per_db(NodeOrAggr) ->
|
prometheus_per_db(NodeOrAggr) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(DB, Acc) ->
|
fun
|
||||||
prometheus_per_db(NodeOrAggr, DB, Acc)
|
({DB, Backend}, Acc) when Backend =:= builtin_local; Backend =:= builtin_raft ->
|
||||||
|
prometheus_per_db(NodeOrAggr, DB, Acc);
|
||||||
|
({_, _}, Acc) ->
|
||||||
|
Acc
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
emqx_ds_builtin_raft_db_sup:which_dbs()
|
emqx_ds:which_dbs()
|
||||||
).
|
).
|
||||||
|
|
||||||
%% This function returns the data in the following format:
|
%% This function returns the data in the following format:
|
||||||
|
@ -235,18 +238,15 @@ prometheus_per_db(NodeOrAggr, DB, Acc0) ->
|
||||||
%% If `NodeOrAggr' = `node' then node name is appended to the list of
|
%% If `NodeOrAggr' = `node' then node name is appended to the list of
|
||||||
%% labels.
|
%% labels.
|
||||||
prometheus_per_shard(NodeOrAggr) ->
|
prometheus_per_shard(NodeOrAggr) ->
|
||||||
|
prometheus_buffer_metrics(NodeOrAggr).
|
||||||
|
|
||||||
|
prometheus_buffer_metrics(NodeOrAggr) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(DB, Acc0) ->
|
fun({DB, Shard}, Acc) ->
|
||||||
lists:foldl(
|
prometheus_per_shard(NodeOrAggr, DB, Shard, Acc)
|
||||||
fun(Shard, Acc) ->
|
|
||||||
prometheus_per_shard(NodeOrAggr, DB, Shard, Acc)
|
|
||||||
end,
|
|
||||||
Acc0,
|
|
||||||
emqx_ds_replication_layer_meta:shards(DB)
|
|
||||||
)
|
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
emqx_ds_builtin_raft_db_sup:which_dbs()
|
emqx_ds_buffer:ls()
|
||||||
).
|
).
|
||||||
|
|
||||||
prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) ->
|
prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) ->
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([start_link/0, attach_backend/2]).
|
-export([start_link/0, attach_backend/2]).
|
||||||
|
-export([register_db/2, unregister_db/1, which_dbs/0]).
|
||||||
|
|
||||||
%% behaviour callbacks:
|
%% behaviour callbacks:
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
@ -28,6 +29,7 @@
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
-define(SUP, ?MODULE).
|
-define(SUP, ?MODULE).
|
||||||
|
-define(TAB, ?MODULE).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% API functions
|
%% API functions
|
||||||
|
@ -58,11 +60,23 @@ attach_backend(Backend, Start) ->
|
||||||
{error, Err}
|
{error, Err}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
register_db(DB, Backend) ->
|
||||||
|
ets:insert(?TAB, {DB, Backend}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
unregister_db(DB) ->
|
||||||
|
ets:delete(?TAB, DB),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
which_dbs() ->
|
||||||
|
ets:tab2list(?TAB).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% behaviour callbacks
|
%% behaviour callbacks
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
init(top) ->
|
init(top) ->
|
||||||
|
_ = ets:new(?TAB, [public, set, named_table]),
|
||||||
Children = [],
|
Children = [],
|
||||||
SupFlags = #{
|
SupFlags = #{
|
||||||
strategy => one_for_one,
|
strategy => one_for_one,
|
||||||
|
|
Loading…
Reference in New Issue