fix(prom_mria): move mria callback into `emqx_prometheus.erl`
- they will return in same api endpoint.
This commit is contained in:
parent
bd13540e23
commit
58b60181d7
2
Makefile
2
Makefile
|
@ -21,7 +21,7 @@ endif
|
||||||
# Dashboard version
|
# Dashboard version
|
||||||
# from https://github.com/emqx/emqx-dashboard5
|
# from https://github.com/emqx/emqx-dashboard5
|
||||||
export EMQX_DASHBOARD_VERSION ?= v1.6.1
|
export EMQX_DASHBOARD_VERSION ?= v1.6.1
|
||||||
export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.8
|
export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.10
|
||||||
|
|
||||||
PROFILE ?= emqx
|
PROFILE ?= emqx
|
||||||
REL_PROFILES := emqx emqx-enterprise
|
REL_PROFILES := emqx emqx-enterprise
|
||||||
|
|
|
@ -1107,8 +1107,7 @@ tr_prometheus_collectors(Conf) ->
|
||||||
%% emqx collectors
|
%% emqx collectors
|
||||||
emqx_prometheus,
|
emqx_prometheus,
|
||||||
{'/prometheus/auth', emqx_prometheus_auth},
|
{'/prometheus/auth', emqx_prometheus_auth},
|
||||||
{'/prometheus/data_integration', emqx_prometheus_data_integration},
|
{'/prometheus/data_integration', emqx_prometheus_data_integration}
|
||||||
emqx_prometheus_mria
|
|
||||||
%% builtin vm collectors
|
%% builtin vm collectors
|
||||||
| prometheus_collectors(Conf)
|
| prometheus_collectors(Conf)
|
||||||
].
|
].
|
||||||
|
|
|
@ -82,6 +82,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(MG(K, MAP), maps:get(K, MAP)).
|
-define(MG(K, MAP), maps:get(K, MAP)).
|
||||||
|
-define(MG(K, MAP, DEFAULT), maps:get(K, MAP, DEFAULT)).
|
||||||
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
||||||
|
|
||||||
-define(C(K, L), proplists:get_value(K, L, 0)).
|
-define(C(K, L), proplists:get_value(K, L, 0)).
|
||||||
|
@ -210,6 +211,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
|
||||||
ok = add_collect_family(Callback, authn_metric_meta(), ?MG(emqx_authn_data, RawData)),
|
ok = add_collect_family(Callback, authn_metric_meta(), ?MG(emqx_authn_data, RawData)),
|
||||||
|
|
||||||
ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)),
|
ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)),
|
||||||
|
ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)),
|
||||||
ok = maybe_license_add_collect_family(Callback, RawData),
|
ok = maybe_license_add_collect_family(Callback, RawData),
|
||||||
ok;
|
ok;
|
||||||
collect_mf(_Registry, _Callback) ->
|
collect_mf(_Registry, _Callback) ->
|
||||||
|
@ -259,7 +261,8 @@ fetch_from_local_node(Mode) ->
|
||||||
emqx_session_data => emqx_metric_data(session_metric_meta(), Mode),
|
emqx_session_data => emqx_metric_data(session_metric_meta(), Mode),
|
||||||
emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
|
emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
|
||||||
emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
|
emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
|
||||||
emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode)
|
emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode),
|
||||||
|
mria_data => mria_data(Mode)
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
fetch_cluster_consistented_data() ->
|
fetch_cluster_consistented_data() ->
|
||||||
|
@ -460,7 +463,19 @@ emqx_collect(K = emqx_authentication_failure, D) -> counter_metrics(?MG(K, D));
|
||||||
emqx_collect(K = emqx_license_expiry_at, D) -> gauge_metric(?MG(K, D));
|
emqx_collect(K = emqx_license_expiry_at, D) -> gauge_metric(?MG(K, D));
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Certs
|
%% Certs
|
||||||
emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D)).
|
emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D));
|
||||||
|
%% Mria
|
||||||
|
%% ========== core
|
||||||
|
emqx_collect(K = emqx_mria_last_intercepted_trans, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
emqx_collect(K = emqx_mria_weight, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
emqx_collect(K = emqx_mria_replicants, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
emqx_collect(K = emqx_mria_server_mql, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
%% ========== replicant
|
||||||
|
emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, []));
|
||||||
|
emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Indicators
|
%% Indicators
|
||||||
|
@ -889,6 +904,80 @@ utc_time_to_datetime(Str) ->
|
||||||
date_to_expiry_epoch(DateTime) ->
|
date_to_expiry_epoch(DateTime) ->
|
||||||
calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START.
|
calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START.
|
||||||
|
|
||||||
|
%%========================================
|
||||||
|
%% Mria
|
||||||
|
%%========================================
|
||||||
|
|
||||||
|
mria_metric_meta() ->
|
||||||
|
mria_metric_meta(core) ++ mria_metric_meta(replicant).
|
||||||
|
|
||||||
|
mria_metric_meta(core) ->
|
||||||
|
[
|
||||||
|
{emqx_mria_last_intercepted_trans, gauge, last_intercepted_trans},
|
||||||
|
{emqx_mria_weight, gauge, weight},
|
||||||
|
{emqx_mria_replicants, gauge, replicants},
|
||||||
|
{emqx_mria_server_mql, gauge, server_mql}
|
||||||
|
];
|
||||||
|
mria_metric_meta(replicant) ->
|
||||||
|
[
|
||||||
|
{emqx_mria_lag, gauge, lag},
|
||||||
|
{emqx_mria_bootstrap_time, gauge, bootstrap_time},
|
||||||
|
{emqx_mria_bootstrap_num_keys, gauge, bootstrap_num_keys},
|
||||||
|
{emqx_mria_message_queue_len, gauge, message_queue_len},
|
||||||
|
{emqx_mria_replayq_len, gauge, replayq_len}
|
||||||
|
].
|
||||||
|
|
||||||
|
mria_data(Mode) ->
|
||||||
|
case mria_rlog:backend() of
|
||||||
|
rlog ->
|
||||||
|
mria_data(mria_rlog:role(), Mode);
|
||||||
|
mnesia ->
|
||||||
|
#{}
|
||||||
|
end.
|
||||||
|
|
||||||
|
mria_data(Role, Mode) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun({Name, _Type, MetricK}, AccIn) ->
|
||||||
|
%% TODO: only report shards that are up
|
||||||
|
DataFun = fun() -> get_shard_metrics(Mode, MetricK) end,
|
||||||
|
AccIn#{
|
||||||
|
Name => catch_all(DataFun)
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
mria_metric_meta(Role)
|
||||||
|
).
|
||||||
|
|
||||||
|
get_shard_metrics(Mode, MetricK) ->
|
||||||
|
Labels =
|
||||||
|
case Mode of
|
||||||
|
node ->
|
||||||
|
[];
|
||||||
|
_ ->
|
||||||
|
[{node, node(self())}]
|
||||||
|
end,
|
||||||
|
[
|
||||||
|
{[{shard, Shard} | Labels], get_shard_metric(MetricK, Shard)}
|
||||||
|
|| Shard <- mria_schema:shards(), Shard =/= undefined
|
||||||
|
].
|
||||||
|
|
||||||
|
get_shard_metric(replicants, Shard) ->
|
||||||
|
length(mria_status:agents(Shard));
|
||||||
|
get_shard_metric(Metric, Shard) ->
|
||||||
|
case mria_status:get_shard_stats(Shard) of
|
||||||
|
#{Metric := Value} when is_number(Value) ->
|
||||||
|
Value;
|
||||||
|
_ ->
|
||||||
|
undefined
|
||||||
|
end.
|
||||||
|
|
||||||
|
catch_all(DataFun) ->
|
||||||
|
try
|
||||||
|
DataFun()
|
||||||
|
catch
|
||||||
|
_:_ -> undefined
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Collect functions
|
%% Collect functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -1,102 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-module(emqx_prometheus_mria).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
deregister_cleanup/1,
|
|
||||||
collect_mf/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-include_lib("prometheus/include/prometheus.hrl").
|
|
||||||
|
|
||||||
%% Please don't remove this attribute, prometheus uses it to
|
|
||||||
%% automatically register collectors.
|
|
||||||
-behaviour(prometheus_collector).
|
|
||||||
|
|
||||||
%%====================================================================
|
|
||||||
%% Macros
|
|
||||||
%%====================================================================
|
|
||||||
|
|
||||||
-define(METRIC_NAME_PREFIX, "emqx_mria_").
|
|
||||||
|
|
||||||
%%====================================================================
|
|
||||||
%% Collector API
|
|
||||||
%%====================================================================
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
deregister_cleanup(_) -> ok.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
-spec collect_mf(_Registry, Callback) -> ok when
|
|
||||||
_Registry :: prometheus_registry:registry(),
|
|
||||||
Callback :: prometheus_collector:collect_mf_callback().
|
|
||||||
collect_mf(_Registry, Callback) ->
|
|
||||||
case mria_rlog:backend() of
|
|
||||||
rlog ->
|
|
||||||
Metrics = metrics(),
|
|
||||||
_ = [add_metric_family(Metric, Callback) || Metric <- Metrics],
|
|
||||||
ok;
|
|
||||||
mnesia ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
add_metric_family({Name, Metrics}, Callback) ->
|
|
||||||
Callback(
|
|
||||||
prometheus_model_helpers:create_mf(
|
|
||||||
?METRIC_NAME(Name),
|
|
||||||
<<"">>,
|
|
||||||
gauge,
|
|
||||||
catch_all(Metrics)
|
|
||||||
)
|
|
||||||
).
|
|
||||||
|
|
||||||
%%====================================================================
|
|
||||||
%% Internal functions
|
|
||||||
%%====================================================================
|
|
||||||
|
|
||||||
metrics() ->
|
|
||||||
Metrics =
|
|
||||||
case mria_rlog:role() of
|
|
||||||
replicant ->
|
|
||||||
[lag, bootstrap_time, bootstrap_num_keys, message_queue_len, replayq_len];
|
|
||||||
core ->
|
|
||||||
[last_intercepted_trans, weight, replicants, server_mql]
|
|
||||||
end,
|
|
||||||
[{MetricId, fun() -> get_shard_metric(MetricId) end} || MetricId <- Metrics].
|
|
||||||
|
|
||||||
get_shard_metric(Metric) ->
|
|
||||||
%% TODO: only report shards that are up
|
|
||||||
[
|
|
||||||
{[{shard, Shard}], get_shard_metric(Metric, Shard)}
|
|
||||||
|| Shard <- mria_schema:shards(), Shard =/= undefined
|
|
||||||
].
|
|
||||||
|
|
||||||
get_shard_metric(replicants, Shard) ->
|
|
||||||
length(mria_status:agents(Shard));
|
|
||||||
get_shard_metric(Metric, Shard) ->
|
|
||||||
case mria_status:get_shard_stats(Shard) of
|
|
||||||
#{Metric := Value} when is_number(Value) ->
|
|
||||||
Value;
|
|
||||||
_ ->
|
|
||||||
undefined
|
|
||||||
end.
|
|
||||||
|
|
||||||
catch_all(DataFun) ->
|
|
||||||
try
|
|
||||||
DataFun()
|
|
||||||
catch
|
|
||||||
_:_ -> undefined
|
|
||||||
end.
|
|
Loading…
Reference in New Issue