Merge pull request #12299 from JimMoen/EMQX-9700-improve-observability
EMQX-9700 improve observability
This commit is contained in:
commit
59c5e3c16f
|
@ -58,6 +58,7 @@
|
|||
{emqx_persistent_session_ds,1}.
|
||||
{emqx_plugins,1}.
|
||||
{emqx_prometheus,1}.
|
||||
{emqx_prometheus,2}.
|
||||
{emqx_resource,1}.
|
||||
{emqx_retainer,1}.
|
||||
{emqx_retainer,2}.
|
||||
|
|
|
@ -166,6 +166,8 @@ names() ->
|
|||
emqx_live_connections_max,
|
||||
emqx_sessions_count,
|
||||
emqx_sessions_max,
|
||||
emqx_channels_count,
|
||||
emqx_channels_max,
|
||||
emqx_topics_count,
|
||||
emqx_topics_max,
|
||||
emqx_suboptions_count,
|
||||
|
|
|
@ -1106,6 +1106,8 @@ tr_prometheus_collectors(Conf) ->
|
|||
prometheus_summary,
|
||||
%% emqx collectors
|
||||
emqx_prometheus,
|
||||
{'/prometheus/auth', emqx_prometheus_auth},
|
||||
{'/prometheus/data_integration', emqx_prometheus_data_integration},
|
||||
emqx_prometheus_mria
|
||||
%% builtin vm collectors
|
||||
| prometheus_collectors(Conf)
|
||||
|
|
|
@ -40,11 +40,14 @@
|
|||
-export([
|
||||
samplers/0,
|
||||
samplers/2,
|
||||
current_rate/0,
|
||||
current_rate/1,
|
||||
granularity_adapter/1
|
||||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([current_rate_cluster/0]).
|
||||
-endif.
|
||||
|
||||
%% for rpc
|
||||
-export([do_sample/2]).
|
||||
|
||||
|
@ -112,8 +115,33 @@ granularity_adapter(List) when length(List) > 1000 ->
|
|||
granularity_adapter(List) ->
|
||||
List.
|
||||
|
||||
current_rate(all) ->
|
||||
current_rate_cluster();
|
||||
current_rate(Node) when Node == node() ->
|
||||
try
|
||||
{ok, Rate} = do_call(current_rate),
|
||||
{ok, Rate}
|
||||
catch
|
||||
_E:R ->
|
||||
?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
|
||||
%% Rate map 0, ensure api will not crash.
|
||||
%% When joining cluster, dashboard monitor restart.
|
||||
Rate0 = [
|
||||
{Key, 0}
|
||||
|| Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)
|
||||
],
|
||||
{ok, maps:merge(maps:from_list(Rate0), non_rate_value())}
|
||||
end;
|
||||
current_rate(Node) ->
|
||||
case emqx_dashboard_proto_v1:current_rate(Node) of
|
||||
{badrpc, Reason} ->
|
||||
{badrpc, {Node, Reason}};
|
||||
{ok, Rate} ->
|
||||
{ok, Rate}
|
||||
end.
|
||||
|
||||
%% Get the current rate. Not the current sampler data.
|
||||
current_rate() ->
|
||||
current_rate_cluster() ->
|
||||
Fun =
|
||||
fun
|
||||
(Node, Cluster) when is_map(Cluster) ->
|
||||
|
@ -133,31 +161,6 @@ current_rate() ->
|
|||
{ok, Rate}
|
||||
end.
|
||||
|
||||
current_rate(all) ->
|
||||
current_rate();
|
||||
current_rate(Node) when Node == node() ->
|
||||
try
|
||||
{ok, Rate} = do_call(current_rate),
|
||||
{ok, Rate}
|
||||
catch
|
||||
_E:R ->
|
||||
?SLOG(warning, #{msg => "dashboard_monitor_error", reason => R}),
|
||||
%% Rate map 0, ensure api will not crash.
|
||||
%% When joining cluster, dashboard monitor restart.
|
||||
Rate0 = [
|
||||
{Key, 0}
|
||||
|| Key <- ?GAUGE_SAMPLER_LIST ++ maps:values(?DELTA_SAMPLER_RATE_MAP)
|
||||
],
|
||||
{ok, maps:from_list(Rate0)}
|
||||
end;
|
||||
current_rate(Node) ->
|
||||
case emqx_dashboard_proto_v1:current_rate(Node) of
|
||||
{badrpc, Reason} ->
|
||||
{badrpc, {Node, Reason}};
|
||||
{ok, Rate} ->
|
||||
{ok, Rate}
|
||||
end.
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% gen_server functions
|
||||
|
||||
|
@ -173,7 +176,9 @@ handle_call(current_rate, _From, State = #state{last = Last}) ->
|
|||
NowTime = erlang:system_time(millisecond),
|
||||
NowSamplers = sample(NowTime),
|
||||
Rate = cal_rate(NowSamplers, Last),
|
||||
{reply, {ok, Rate}, State};
|
||||
NonRateValue = non_rate_value(),
|
||||
Samples = maps:merge(Rate, NonRateValue),
|
||||
{reply, {ok, Samples}, State};
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
|
@ -256,8 +261,16 @@ merge_cluster_sampler_map(M1, M2) ->
|
|||
merge_cluster_rate(Node, Cluster) ->
|
||||
Fun =
|
||||
fun
|
||||
(topics, Value, NCluster) ->
|
||||
NCluster#{topics => Value};
|
||||
%% cluster-synced values
|
||||
(topics, V, NCluster) ->
|
||||
NCluster#{topics => V};
|
||||
(retained_msg_count, V, NCluster) ->
|
||||
NCluster#{retained_msg_count => V};
|
||||
(license_quota, V, NCluster) ->
|
||||
NCluster#{license_quota => V};
|
||||
%% for cluster sample, ignore node_uptime
|
||||
(node_uptime, _V, NCluster) ->
|
||||
NCluster;
|
||||
(Key, Value, NCluster) ->
|
||||
ClusterValue = maps:get(Key, NCluster, 0),
|
||||
NCluster#{Key => Value + ClusterValue}
|
||||
|
@ -409,3 +422,26 @@ stats(received_bytes) -> emqx_metrics:val('bytes.received');
|
|||
stats(sent) -> emqx_metrics:val('messages.sent');
|
||||
stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
|
||||
stats(dropped) -> emqx_metrics:val('messages.dropped').
|
||||
|
||||
%% -------------------------------------------------------------------------------------------------
|
||||
%% Retained && License Quota
|
||||
|
||||
%% the non rate values should be same on all nodes
|
||||
non_rate_value() ->
|
||||
(license_quota())#{
|
||||
retained_msg_count => emqx_retainer:retained_count(),
|
||||
node_uptime => emqx_sys:uptime()
|
||||
}.
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
license_quota() ->
|
||||
case emqx_license_checker:limits() of
|
||||
{ok, #{max_connections := Quota}} ->
|
||||
#{license_quota => Quota};
|
||||
{error, no_license} ->
|
||||
#{license_quota => 0}
|
||||
end.
|
||||
-else.
|
||||
license_quota() ->
|
||||
#{}.
|
||||
-endif.
|
||||
|
|
|
@ -1,5 +1,17 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_dashboard_monitor_api).
|
||||
|
@ -133,13 +145,15 @@ dashboard_samplers_fun(Latest) ->
|
|||
end
|
||||
end.
|
||||
|
||||
monitor_current(get, #{bindings := []}) ->
|
||||
emqx_utils_api:with_node_or_cluster(erlang:node(), fun emqx_dashboard_monitor:current_rate/1);
|
||||
monitor_current(get, #{bindings := Bindings}) ->
|
||||
RawNode = maps:get(node, Bindings, <<"all">>),
|
||||
emqx_utils_api:with_node_or_cluster(RawNode, fun current_rate/1).
|
||||
|
||||
-spec current_rate(atom()) ->
|
||||
{error, term()}
|
||||
| {ok, Result :: map()}.
|
||||
current_rate(Node) ->
|
||||
%% Node :: 'all' or `NodeName`
|
||||
case emqx_dashboard_monitor:current_rate(Node) of
|
||||
{badrpc, _} = BadRpc ->
|
||||
{error, BadRpc};
|
||||
|
|
|
@ -31,10 +31,13 @@ all() ->
|
|||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
meck:new(emqx_retainer, [non_strict, passthrough, no_history, no_link]),
|
||||
meck:expect(emqx_retainer, retained_count, fun() -> 0 end),
|
||||
emqx_mgmt_api_test_util:init_suite([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
meck:unload([emqx_retainer]),
|
||||
emqx_mgmt_api_test_util:end_suite([]).
|
||||
|
||||
t_monitor_samplers_all(_Config) ->
|
||||
|
@ -198,5 +201,5 @@ waiting_emqx_stats_and_monitor_update(WaitKey) ->
|
|||
end,
|
||||
meck:unload([emqx_stats]),
|
||||
%% manually call monitor update
|
||||
_ = emqx_dashboard_monitor:current_rate(),
|
||||
_ = emqx_dashboard_monitor:current_rate_cluster(),
|
||||
ok.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_license, [
|
||||
{description, "EMQX License"},
|
||||
{vsn, "5.0.14"},
|
||||
{vsn, "5.0.15"},
|
||||
{modules, []},
|
||||
{registered, [emqx_license_sup]},
|
||||
{applications, [kernel, stdlib, emqx_ctl]},
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
start_link/2,
|
||||
update/1,
|
||||
dump/0,
|
||||
expiry_epoch/0,
|
||||
purge/0,
|
||||
limits/0,
|
||||
print_warnings/1
|
||||
|
@ -67,6 +68,10 @@ update(License) ->
|
|||
dump() ->
|
||||
gen_server:call(?MODULE, dump, infinity).
|
||||
|
||||
-spec expiry_epoch() -> integer().
|
||||
expiry_epoch() ->
|
||||
gen_server:call(?MODULE, expiry_epoch, infinity).
|
||||
|
||||
-spec limits() -> {ok, limits()} | {error, any()}.
|
||||
limits() ->
|
||||
try ets:lookup(?LICENSE_TAB, limits) of
|
||||
|
@ -111,6 +116,9 @@ handle_call({update, License}, _From, #{license := Old} = State) ->
|
|||
{reply, check_license(License), State1#{license => License}};
|
||||
handle_call(dump, _From, #{license := License} = State) ->
|
||||
{reply, emqx_license_parser:dump(License), State};
|
||||
handle_call(expiry_epoch, _From, #{license := License} = State) ->
|
||||
ExpiryEpoch = date_to_expiry_epoch(emqx_license_parser:expiry_date(License)),
|
||||
{reply, ExpiryEpoch, State};
|
||||
handle_call(purge, _From, State) ->
|
||||
_ = ets:delete_all_objects(?LICENSE_TAB),
|
||||
{reply, ok, State};
|
||||
|
@ -234,6 +242,12 @@ small_customer_overdue(_CType, _DaysLeft) -> false.
|
|||
non_official_license_overdue(?OFFICIAL, _) -> false;
|
||||
non_official_license_overdue(_, DaysLeft) -> DaysLeft < 0.
|
||||
|
||||
%% 62167219200 =:= calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}).
|
||||
-define(EPOCH_START, 62167219200).
|
||||
-spec date_to_expiry_epoch(calendar:date()) -> Seconds :: non_neg_integer().
|
||||
date_to_expiry_epoch({Y, M, D}) ->
|
||||
calendar:datetime_to_gregorian_seconds({{Y, M, D}, {0, 0, 0}}) - ?EPOCH_START.
|
||||
|
||||
apply_limits(Limits) ->
|
||||
ets:insert(?LICENSE_TAB, {limits, Limits}).
|
||||
|
||||
|
|
|
@ -16,3 +16,30 @@
|
|||
|
||||
-define(APP, emqx_prometheus).
|
||||
-define(PROMETHEUS, [prometheus]).
|
||||
|
||||
-define(PROMETHEUS_DEFAULT_REGISTRY, default).
|
||||
-define(PROMETHEUS_AUTH_REGISTRY, '/prometheus/auth').
|
||||
-define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth).
|
||||
-define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration').
|
||||
-define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration).
|
||||
|
||||
-define(PROMETHEUS_ALL_REGISTRYS, [
|
||||
?PROMETHEUS_DEFAULT_REGISTRY,
|
||||
?PROMETHEUS_AUTH_REGISTRY,
|
||||
?PROMETHEUS_DATA_INTEGRATION_REGISTRY
|
||||
]).
|
||||
|
||||
-define(PROM_DATA_MODE__NODE, node).
|
||||
-define(PROM_DATA_MODE__ALL_NODES_AGGREGATED, all_nodes_aggregated).
|
||||
-define(PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, all_nodes_unaggregated).
|
||||
|
||||
-define(PROM_DATA_MODES, [
|
||||
?PROM_DATA_MODE__NODE,
|
||||
?PROM_DATA_MODE__ALL_NODES_AGGREGATED,
|
||||
?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED
|
||||
]).
|
||||
|
||||
-define(PROM_DATA_MODE_KEY__, prom_data_mode).
|
||||
|
||||
-define(PUT_PROM_DATA_MODE(MODE__), erlang:put(?PROM_DATA_MODE_KEY__, MODE__)).
|
||||
-define(GET_PROM_DATA_MODE(), erlang:get(?PROM_DATA_MODE_KEY__)).
|
||||
|
|
|
@ -3,7 +3,8 @@
|
|||
{deps, [
|
||||
{emqx, {path, "../emqx"}},
|
||||
{emqx_utils, {path, "../emqx_utils"}},
|
||||
{prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.1"}}}
|
||||
{emqx_auth, {path, "../emqx_auth"}},
|
||||
{prometheus, {git, "https://github.com/emqx/prometheus.erl", {tag, "v4.10.0.2"}}}
|
||||
]}.
|
||||
|
||||
{edoc_opts, [{preprocess, true}]}.
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
{vsn, "5.0.19"},
|
||||
{modules, []},
|
||||
{registered, [emqx_prometheus_sup]},
|
||||
{applications, [kernel, stdlib, prometheus, emqx, emqx_management]},
|
||||
{applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_management]},
|
||||
{mod, {emqx_prometheus_app, []}},
|
||||
{env, []},
|
||||
{licenses, ["Apache-2.0"]},
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -18,20 +18,37 @@
|
|||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
-include("emqx_prometheus.hrl").
|
||||
-include_lib("hocon/include/hoconsc.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-import(
|
||||
hoconsc,
|
||||
[
|
||||
mk/2,
|
||||
ref/1
|
||||
]
|
||||
).
|
||||
|
||||
-export([
|
||||
api_spec/0,
|
||||
paths/0,
|
||||
schema/1
|
||||
schema/1,
|
||||
fields/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
setting/2,
|
||||
stats/2
|
||||
stats/2,
|
||||
auth/2,
|
||||
data_integration/2
|
||||
]).
|
||||
|
||||
-export([lookup_from_local_nodes/3]).
|
||||
|
||||
-define(TAGS, [<<"Monitor">>]).
|
||||
-define(IS_TRUE(Val), ((Val =:= true) orelse (Val =:= <<"true">>))).
|
||||
-define(IS_FALSE(Val), ((Val =:= false) orelse (Val =:= <<"false">>))).
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
@ -39,7 +56,9 @@ api_spec() ->
|
|||
paths() ->
|
||||
[
|
||||
"/prometheus",
|
||||
"/prometheus/stats"
|
||||
"/prometheus/auth",
|
||||
"/prometheus/stats",
|
||||
"/prometheus/data_integration"
|
||||
].
|
||||
|
||||
schema("/prometheus") ->
|
||||
|
@ -61,6 +80,19 @@ schema("/prometheus") ->
|
|||
#{200 => prometheus_setting_response()}
|
||||
}
|
||||
};
|
||||
schema("/prometheus/auth") ->
|
||||
#{
|
||||
'operationId' => auth,
|
||||
get =>
|
||||
#{
|
||||
description => ?DESC(get_prom_auth_data),
|
||||
tags => ?TAGS,
|
||||
parameters => [ref(mode)],
|
||||
security => security(),
|
||||
responses =>
|
||||
#{200 => prometheus_data_schema()}
|
||||
}
|
||||
};
|
||||
schema("/prometheus/stats") ->
|
||||
#{
|
||||
'operationId' => stats,
|
||||
|
@ -68,6 +100,20 @@ schema("/prometheus/stats") ->
|
|||
#{
|
||||
description => ?DESC(get_prom_data),
|
||||
tags => ?TAGS,
|
||||
parameters => [ref(mode)],
|
||||
security => security(),
|
||||
responses =>
|
||||
#{200 => prometheus_data_schema()}
|
||||
}
|
||||
};
|
||||
schema("/prometheus/data_integration") ->
|
||||
#{
|
||||
'operationId' => data_integration,
|
||||
get =>
|
||||
#{
|
||||
description => ?DESC(get_prom_data_integration_data),
|
||||
tags => ?TAGS,
|
||||
parameters => [ref(mode)],
|
||||
security => security(),
|
||||
responses =>
|
||||
#{200 => prometheus_data_schema()}
|
||||
|
@ -79,6 +125,41 @@ security() ->
|
|||
true -> [#{'basicAuth' => []}, #{'bearerAuth' => []}];
|
||||
false -> []
|
||||
end.
|
||||
|
||||
%% erlfmt-ignore
|
||||
fields(mode) ->
|
||||
[
|
||||
{mode,
|
||||
mk(
|
||||
hoconsc:enum(?PROM_DATA_MODES),
|
||||
#{
|
||||
default => node,
|
||||
desc => <<"
|
||||
Metrics format mode.
|
||||
|
||||
`node`:
|
||||
Return metrics from local node. And it is the default behaviour if `mode` not specified.
|
||||
|
||||
`all_nodes_aggregated`:
|
||||
Return metrics for all nodes.
|
||||
And if possible, calculate the arithmetic sum or logical sum of the indicators of all nodes.
|
||||
|
||||
`all_nodes_unaggregated`:
|
||||
Return metrics from all nodes, and the metrics are not aggregated.
|
||||
The node name will be included in the returned results to
|
||||
indicate that certain metrics were returned on a certain node.
|
||||
">>,
|
||||
in => query,
|
||||
required => false,
|
||||
example => node
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
||||
%% bpapi
|
||||
lookup_from_local_nodes(M, F, A) ->
|
||||
erlang:apply(M, F, A).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API Handler funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -100,24 +181,60 @@ setting(put, #{body := Body}) ->
|
|||
{500, 'INTERNAL_ERROR', Message}
|
||||
end.
|
||||
|
||||
stats(get, #{headers := Headers}) ->
|
||||
Type =
|
||||
case maps:get(<<"accept">>, Headers, <<"text/plain">>) of
|
||||
<<"application/json">> -> <<"json">>;
|
||||
_ -> <<"prometheus">>
|
||||
end,
|
||||
Data = emqx_prometheus:collect(Type),
|
||||
case Type of
|
||||
<<"json">> ->
|
||||
{200, Data};
|
||||
<<"prometheus">> ->
|
||||
{200, #{<<"content-type">> => <<"text/plain">>}, Data}
|
||||
end.
|
||||
stats(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus, collect_opts(Headers, Qs)).
|
||||
|
||||
auth(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus_auth, collect_opts(Headers, Qs)).
|
||||
|
||||
data_integration(get, #{headers := Headers, query_string := Qs}) ->
|
||||
collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
collect(Module, #{type := Type, mode := Mode}) ->
|
||||
%% `Mode` is used to control the format of the returned data
|
||||
%% It will used in callback `Module:collect_mf/1` to fetch data from node or cluster
|
||||
%% And use this mode parameter to determine the formatting method of the returned information.
|
||||
%% Since the arity of the callback function has been fixed.
|
||||
%% so it is placed in the process dictionary of the current process.
|
||||
?PUT_PROM_DATA_MODE(Mode),
|
||||
Data =
|
||||
case erlang:function_exported(Module, collect, 1) of
|
||||
true ->
|
||||
erlang:apply(Module, collect, [Type]);
|
||||
false ->
|
||||
?SLOG(error, #{
|
||||
msg => "prometheus callback module not found, empty data responded",
|
||||
module_name => Module
|
||||
}),
|
||||
<<>>
|
||||
end,
|
||||
gen_response(Type, Data).
|
||||
|
||||
collect_opts(Headers, Qs) ->
|
||||
#{type => response_type(Headers), mode => mode(Qs)}.
|
||||
|
||||
response_type(#{<<"accept">> := <<"application/json">>}) ->
|
||||
<<"json">>;
|
||||
response_type(_) ->
|
||||
<<"prometheus">>.
|
||||
|
||||
mode(#{<<"mode">> := Mode}) ->
|
||||
case lists:member(Mode, ?PROM_DATA_MODES) of
|
||||
true -> Mode;
|
||||
false -> ?PROM_DATA_MODE__NODE
|
||||
end;
|
||||
mode(_) ->
|
||||
?PROM_DATA_MODE__NODE.
|
||||
|
||||
gen_response(<<"json">>, Data) ->
|
||||
{200, Data};
|
||||
gen_response(<<"prometheus">>, Data) ->
|
||||
{200, #{<<"content-type">> => <<"text/plain">>}, Data}.
|
||||
|
||||
prometheus_setting_request() ->
|
||||
[{prometheus, #{type := Setting}}] = emqx_prometheus_schema:roots(),
|
||||
emqx_dashboard_swagger:schema_with_examples(
|
||||
|
@ -181,7 +298,7 @@ recommend_setting_example() ->
|
|||
prometheus_data_schema() ->
|
||||
#{
|
||||
description =>
|
||||
<<"Get Prometheus Data. Note that support for JSON output is deprecated and will be removed in v5.2.">>,
|
||||
<<"Get Prometheus Data.">>,
|
||||
content =>
|
||||
[
|
||||
{'text/plain', #{schema => #{type => string}}},
|
||||
|
|
|
@ -0,0 +1,483 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_prometheus_auth).
|
||||
|
||||
-export([
|
||||
deregister_cleanup/1,
|
||||
collect_mf/2,
|
||||
collect_metrics/2
|
||||
]).
|
||||
|
||||
-export([collect/1]).
|
||||
|
||||
%% for bpapi
|
||||
-behaviour(emqx_prometheus_cluster).
|
||||
-export([
|
||||
fetch_data_from_local_node/0,
|
||||
fetch_cluster_consistented_data/0,
|
||||
aggre_or_zip_init_acc/0,
|
||||
logic_sum_metrics/0
|
||||
]).
|
||||
|
||||
%% %% @private
|
||||
-export([
|
||||
zip_json_auth_metrics/3
|
||||
]).
|
||||
|
||||
-include("emqx_prometheus.hrl").
|
||||
-include_lib("emqx_auth/include/emqx_authn_chains.hrl").
|
||||
-include_lib("prometheus/include/prometheus.hrl").
|
||||
|
||||
-import(
|
||||
prometheus_model_helpers,
|
||||
[
|
||||
create_mf/5,
|
||||
gauge_metric/1,
|
||||
gauge_metrics/1,
|
||||
counter_metrics/1
|
||||
]
|
||||
).
|
||||
|
||||
-type authn_metric_name() ::
|
||||
emqx_authn_enable
|
||||
| emqx_authn_status
|
||||
| emqx_authn_nomatch
|
||||
| emqx_authn_total
|
||||
| emqx_authn_success
|
||||
| emqx_authn_failed.
|
||||
|
||||
-type authz_metric_name() ::
|
||||
emqx_authz_enable
|
||||
| emqx_authz_status
|
||||
| emqx_authz_nomatch
|
||||
| emqx_authz_total
|
||||
| emqx_authz_success
|
||||
| emqx_authz_failed.
|
||||
|
||||
%% Please don't remove this attribute, prometheus uses it to
|
||||
%% automatically register collectors.
|
||||
-behaviour(prometheus_collector).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Macros
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(METRIC_NAME_PREFIX, "emqx_auth_").
|
||||
|
||||
-define(MG(K, MAP), maps:get(K, MAP)).
|
||||
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
||||
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collector API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @private
|
||||
deregister_cleanup(_) -> ok.
|
||||
|
||||
%% @private
|
||||
-spec collect_mf(_Registry, Callback) -> ok when
|
||||
_Registry :: prometheus_registry:registry(),
|
||||
Callback :: prometheus_collector:collect_mf_callback().
|
||||
%% erlfmt-ignore
|
||||
collect_mf(?PROMETHEUS_AUTH_REGISTRY, Callback) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
ok = add_collect_family(Callback, authn_metric_meta(), ?MG(authn_data, RawData)),
|
||||
ok = add_collect_family(Callback, authn_users_count_metric_meta(), ?MG(authn_users_count_data, RawData)),
|
||||
ok = add_collect_family(Callback, authz_metric_meta(), ?MG(authz_data, RawData)),
|
||||
ok = add_collect_family(Callback, authz_rules_count_metric_meta(), ?MG(authz_rules_count_data, RawData)),
|
||||
ok = add_collect_family(Callback, banned_count_metric_meta(), ?MG(banned_count_data, RawData)),
|
||||
ok;
|
||||
collect_mf(_, _) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
collect(<<"json">>) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
#{
|
||||
emqx_authn => collect_json_data(?MG(authn_data, RawData)),
|
||||
emqx_authz => collect_json_data(?MG(authz_data, RawData)),
|
||||
emqx_banned => collect_banned_data()
|
||||
};
|
||||
collect(<<"prometheus">>) ->
|
||||
prometheus_text_format:format(?PROMETHEUS_AUTH_REGISTRY).
|
||||
|
||||
add_collect_family(Callback, MetricWithType, Data) ->
|
||||
_ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType],
|
||||
ok.
|
||||
|
||||
add_collect_family(Name, Data, Callback, Type) ->
|
||||
Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
|
||||
|
||||
collect_metrics(Name, Metrics) ->
|
||||
collect_auth(Name, Metrics).
|
||||
|
||||
%% behaviour
|
||||
fetch_data_from_local_node() ->
|
||||
{node(self()), #{
|
||||
authn_data => authn_data(),
|
||||
authz_data => authz_data()
|
||||
}}.
|
||||
|
||||
fetch_cluster_consistented_data() ->
|
||||
#{
|
||||
authn_users_count_data => authn_users_count_data(),
|
||||
authz_rules_count_data => authz_rules_count_data(),
|
||||
banned_count_data => banned_count_data()
|
||||
}.
|
||||
|
||||
aggre_or_zip_init_acc() ->
|
||||
#{
|
||||
authn_data => maps:from_keys(authn_metric(names), []),
|
||||
authz_data => maps:from_keys(authz_metric(names), [])
|
||||
}.
|
||||
|
||||
logic_sum_metrics() ->
|
||||
[
|
||||
emqx_authn_enable,
|
||||
emqx_authn_status,
|
||||
emqx_authz_enable,
|
||||
emqx_authz_status
|
||||
].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collector
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%====================
|
||||
%% Authn overview
|
||||
collect_auth(K = emqx_authn_enable, Data) ->
|
||||
gauge_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authn_status, Data) ->
|
||||
gauge_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authn_nomatch, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authn_total, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authn_success, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authn_failed, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
%%====================
|
||||
%% Authn users count
|
||||
%% Only provided for `password_based:built_in_database` and `scram:built_in_database`
|
||||
collect_auth(K = emqx_authn_users_count, Data) ->
|
||||
gauge_metrics(?MG(K, Data));
|
||||
%%====================
|
||||
%% Authz overview
|
||||
collect_auth(K = emqx_authz_enable, Data) ->
|
||||
gauge_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authz_status, Data) ->
|
||||
gauge_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authz_nomatch, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authz_total, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authz_success, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
collect_auth(K = emqx_authz_failed, Data) ->
|
||||
counter_metrics(?MG(K, Data));
|
||||
%%====================
|
||||
%% Authz rules count
|
||||
%% Only provided for `file` and `built_in_database`
|
||||
collect_auth(K = emqx_authz_rules_count, Data) ->
|
||||
gauge_metrics(?MG(K, Data));
|
||||
%%====================
|
||||
%% Banned
|
||||
collect_auth(emqx_banned_count, Data) ->
|
||||
gauge_metric(Data).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%========================================
|
||||
%% AuthN (Authentication)
|
||||
%%========================================
|
||||
|
||||
%%====================
|
||||
%% Authn overview
|
||||
|
||||
authn_metric_meta() ->
|
||||
[
|
||||
{emqx_authn_enable, gauge},
|
||||
{emqx_authn_status, gauge},
|
||||
{emqx_authn_nomatch, counter},
|
||||
{emqx_authn_total, counter},
|
||||
{emqx_authn_success, counter},
|
||||
{emqx_authn_failed, counter}
|
||||
].
|
||||
|
||||
authn_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(authn_metric_meta()).
|
||||
|
||||
-spec authn_data() -> #{Key => [Point]} when
|
||||
Key :: authn_metric_name(),
|
||||
Point :: {[Label], Metric},
|
||||
Label :: IdLabel,
|
||||
IdLabel :: {id, AuthnName :: binary()},
|
||||
Metric :: number().
|
||||
authn_data() ->
|
||||
Authns = emqx_config:get([authentication]),
|
||||
lists:foldl(
|
||||
fun(Key, AccIn) ->
|
||||
AccIn#{Key => authn_backend_to_points(Key, Authns)}
|
||||
end,
|
||||
#{},
|
||||
authn_metric(names)
|
||||
).
|
||||
|
||||
-spec authn_backend_to_points(Key, list(Authn)) -> list(Point) when
|
||||
Key :: authn_metric_name(),
|
||||
Authn :: map(),
|
||||
Point :: {[Label], Metric},
|
||||
Label :: IdLabel,
|
||||
IdLabel :: {id, AuthnName :: binary()},
|
||||
Metric :: number().
|
||||
authn_backend_to_points(Key, Authns) ->
|
||||
do_authn_backend_to_points(Key, Authns, []).
|
||||
|
||||
do_authn_backend_to_points(_K, [], AccIn) ->
|
||||
lists:reverse(AccIn);
|
||||
do_authn_backend_to_points(K, [Authn | Rest], AccIn) ->
|
||||
Id = authenticator_id(Authn),
|
||||
Point = {[{id, Id}], do_metric(K, Authn, lookup_authn_metrics_local(Id))},
|
||||
do_authn_backend_to_points(K, Rest, [Point | AccIn]).
|
||||
|
||||
lookup_authn_metrics_local(Id) ->
|
||||
case emqx_authn_api:lookup_from_local_node(?GLOBAL, Id) of
|
||||
{ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} ->
|
||||
#{
|
||||
emqx_authn_status => emqx_prometheus_cluster:status_to_number(Status),
|
||||
emqx_authn_nomatch => ?MG0(nomatch, Counters),
|
||||
emqx_authn_total => ?MG0(total, Counters),
|
||||
emqx_authn_success => ?MG0(success, Counters),
|
||||
emqx_authn_failed => ?MG0(failed, Counters)
|
||||
};
|
||||
{error, _Reason} ->
|
||||
maps:from_keys(authn_metric(names) -- [emqx_authn_enable], 0)
|
||||
end.
|
||||
|
||||
%%====================
|
||||
%% Authn users count
|
||||
|
||||
authn_users_count_metric_meta() ->
|
||||
[
|
||||
{emqx_authn_users_count, gauge}
|
||||
].
|
||||
|
||||
-define(AUTHN_MNESIA, emqx_authn_mnesia).
|
||||
-define(AUTHN_SCRAM_MNESIA, emqx_authn_scram_mnesia).
|
||||
|
||||
authn_users_count_data() ->
|
||||
Samples = lists:foldl(
|
||||
fun
|
||||
(#{backend := built_in_database, mechanism := password_based} = Authn, AccIn) ->
|
||||
[auth_data_sample_point(authn, Authn, ?AUTHN_MNESIA) | AccIn];
|
||||
(#{backend := built_in_database, mechanism := scram} = Authn, AccIn) ->
|
||||
[auth_data_sample_point(authn, Authn, ?AUTHN_SCRAM_MNESIA) | AccIn];
|
||||
(_, AccIn) ->
|
||||
AccIn
|
||||
end,
|
||||
[],
|
||||
emqx_config:get([authentication])
|
||||
),
|
||||
#{emqx_authn_users_count => Samples}.
|
||||
|
||||
%%========================================
|
||||
%% AuthZ (Authorization)
|
||||
%%========================================
|
||||
|
||||
%%====================
|
||||
%% Authz overview
|
||||
|
||||
authz_metric_meta() ->
|
||||
[
|
||||
{emqx_authz_enable, gauge},
|
||||
{emqx_authz_status, gauge},
|
||||
{emqx_authz_nomatch, counter},
|
||||
{emqx_authz_total, counter},
|
||||
{emqx_authz_success, counter},
|
||||
{emqx_authz_failed, counter}
|
||||
].
|
||||
|
||||
authz_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(authz_metric_meta()).
|
||||
|
||||
-spec authz_data() -> #{Key => [Point]} when
|
||||
Key :: authz_metric_name(),
|
||||
Point :: {[Label], Metric},
|
||||
Label :: TypeLabel,
|
||||
TypeLabel :: {type, AuthZType :: binary()},
|
||||
Metric :: number().
|
||||
authz_data() ->
|
||||
Authzs = emqx_config:get([authorization, sources]),
|
||||
lists:foldl(
|
||||
fun(Key, AccIn) ->
|
||||
AccIn#{Key => authz_backend_to_points(Key, Authzs)}
|
||||
end,
|
||||
#{},
|
||||
authz_metric(names)
|
||||
).
|
||||
|
||||
-spec authz_backend_to_points(Key, list(Authz)) -> list(Point) when
|
||||
Key :: authz_metric_name(),
|
||||
Authz :: map(),
|
||||
Point :: {[Label], Metric},
|
||||
Label :: TypeLabel,
|
||||
TypeLabel :: {type, AuthZType :: binary()},
|
||||
Metric :: number().
|
||||
authz_backend_to_points(Key, Authzs) ->
|
||||
do_authz_backend_to_points(Key, Authzs, []).
|
||||
|
||||
do_authz_backend_to_points(_K, [], AccIn) ->
|
||||
lists:reverse(AccIn);
|
||||
do_authz_backend_to_points(K, [Authz | Rest], AccIn) ->
|
||||
Type = maps:get(type, Authz),
|
||||
Point = {[{type, Type}], do_metric(K, Authz, lookup_authz_metrics_local(Type))},
|
||||
do_authz_backend_to_points(K, Rest, [Point | AccIn]).
|
||||
|
||||
lookup_authz_metrics_local(Type) ->
|
||||
case emqx_authz_api_sources:lookup_from_local_node(Type) of
|
||||
{ok, {_Node, Status, #{counters := Counters}, _ResourceMetrics}} ->
|
||||
#{
|
||||
emqx_authz_status => emqx_prometheus_cluster:status_to_number(Status),
|
||||
emqx_authz_nomatch => ?MG0(nomatch, Counters),
|
||||
emqx_authz_total => ?MG0(total, Counters),
|
||||
emqx_authz_success => ?MG0(success, Counters),
|
||||
emqx_authz_failed => ?MG0(failed, Counters)
|
||||
};
|
||||
{error, _Reason} ->
|
||||
maps:from_keys(authz_metric(names) -- [emqx_authz_enable], 0)
|
||||
end.
|
||||
|
||||
%%====================
|
||||
%% Authz rules count
|
||||
|
||||
authz_rules_count_metric_meta() ->
|
||||
[
|
||||
{emqx_authz_rules_count, gauge}
|
||||
].
|
||||
|
||||
-define(ACL_TABLE, emqx_acl).
|
||||
|
||||
authz_rules_count_data() ->
|
||||
Samples = lists:foldl(
|
||||
fun
|
||||
(#{type := built_in_database} = Authz, AccIn) ->
|
||||
[auth_data_sample_point(authz, Authz, ?ACL_TABLE) | AccIn];
|
||||
(#{type := file}, AccIn) ->
|
||||
#{annotations := #{rules := Rules}} = emqx_authz:lookup(file),
|
||||
Size = erlang:length(Rules),
|
||||
[{[{type, file}], Size} | AccIn];
|
||||
(_, AccIn) ->
|
||||
AccIn
|
||||
end,
|
||||
[],
|
||||
emqx_config:get([authorization, sources])
|
||||
),
|
||||
#{emqx_authz_rules_count => Samples}.
|
||||
|
||||
%%========================================
|
||||
%% Banned
|
||||
%%========================================
|
||||
|
||||
%%====================
|
||||
%% Banned count
|
||||
|
||||
banned_count_metric_meta() ->
|
||||
[
|
||||
{emqx_banned_count, gauge}
|
||||
].
|
||||
-define(BANNED_TABLE,
|
||||
emqx_banned
|
||||
).
|
||||
banned_count_data() ->
|
||||
mnesia_size(?BANNED_TABLE).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collect functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% merge / zip formatting funcs for type `application/json`
|
||||
|
||||
collect_json_data(Data) ->
|
||||
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_auth_metrics/3).
|
||||
|
||||
collect_banned_data() ->
|
||||
#{emqx_banned_count => banned_count_data()}.
|
||||
|
||||
%% for initialized empty AccIn
|
||||
%% The following fields will be put into Result
|
||||
%% For Authn:
|
||||
%% `id`, `emqx_authn_users_count`
|
||||
%% For Authz:
|
||||
%% `type`, `emqx_authz_rules_count`n
|
||||
zip_json_auth_metrics(Key, Points, [] = _AccIn) ->
|
||||
lists:foldl(
|
||||
fun({Lables, Metric}, AccIn2) ->
|
||||
LablesKVMap = maps:from_list(Lables),
|
||||
Point = (maps:merge(LablesKVMap, users_or_rule_count(LablesKVMap)))#{Key => Metric},
|
||||
[Point | AccIn2]
|
||||
end,
|
||||
[],
|
||||
Points
|
||||
);
|
||||
zip_json_auth_metrics(Key, Points, AllResultedAcc) ->
|
||||
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
|
||||
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
|
||||
|
||||
users_or_rule_count(#{id := Id}) ->
|
||||
#{emqx_authn_users_count := Points} = authn_users_count_data(),
|
||||
case lists:keyfind([{id, Id}], 1, Points) of
|
||||
{_, Metric} ->
|
||||
#{emqx_authn_users_count => Metric};
|
||||
false ->
|
||||
#{}
|
||||
end;
|
||||
users_or_rule_count(#{type := Type}) ->
|
||||
#{emqx_authz_rules_count := Points} = authz_rules_count_data(),
|
||||
case lists:keyfind([{type, Type}], 1, Points) of
|
||||
{_, Metric} ->
|
||||
#{emqx_authz_rules_count => Metric};
|
||||
false ->
|
||||
#{}
|
||||
end;
|
||||
users_or_rule_count(_) ->
|
||||
#{}.
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% Helper funcs
|
||||
|
||||
authenticator_id(Authn) ->
|
||||
emqx_authn_chains:authenticator_id(Authn).
|
||||
|
||||
auth_data_sample_point(authn, Authn, Tab) ->
|
||||
Size = mnesia_size(Tab),
|
||||
Id = authenticator_id(Authn),
|
||||
{[{id, Id}], Size};
|
||||
auth_data_sample_point(authz, #{type := Type} = _Authz, Tab) ->
|
||||
Size = mnesia_size(Tab),
|
||||
{[{type, Type}], Size}.
|
||||
|
||||
mnesia_size(Tab) ->
|
||||
mnesia:table_info(Tab, size).
|
||||
|
||||
do_metric(emqx_authn_enable, #{enable := B}, _) ->
|
||||
emqx_prometheus_cluster:boolean_to_number(B);
|
||||
do_metric(K, _, Metrics) ->
|
||||
?MG0(K, Metrics).
|
|
@ -0,0 +1,202 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_prometheus_cluster).
|
||||
|
||||
-include("emqx_prometheus.hrl").
|
||||
|
||||
-export([
|
||||
raw_data/2,
|
||||
|
||||
collect_json_data/2,
|
||||
|
||||
aggre_cluster/3,
|
||||
with_node_name_label/2,
|
||||
|
||||
point_to_map_fun/1,
|
||||
|
||||
boolean_to_number/1,
|
||||
status_to_number/1,
|
||||
metric_names/1
|
||||
]).
|
||||
|
||||
-callback fetch_cluster_consistented_data() -> map().
|
||||
|
||||
-callback fetch_data_from_local_node() -> {node(), map()}.
|
||||
|
||||
-callback aggre_or_zip_init_acc() -> map().
|
||||
|
||||
-define(MG(K, MAP), maps:get(K, MAP)).
|
||||
-define(PG0(K, PROPLISTS), proplists:get_value(K, PROPLISTS, 0)).
|
||||
|
||||
raw_data(Module, undefined) ->
|
||||
%% TODO: for push gateway, the format mode should be configurable
|
||||
raw_data(Module, ?PROM_DATA_MODE__NODE);
|
||||
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_AGGREGATED) ->
|
||||
AllNodesMetrics = aggre_cluster(Module),
|
||||
Cluster = Module:fetch_cluster_consistented_data(),
|
||||
maps:merge(AllNodesMetrics, Cluster);
|
||||
raw_data(Module, ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED) ->
|
||||
AllNodesMetrics = with_node_name_label(Module),
|
||||
Cluster = Module:fetch_cluster_consistented_data(),
|
||||
maps:merge(AllNodesMetrics, Cluster);
|
||||
raw_data(Module, ?PROM_DATA_MODE__NODE) ->
|
||||
{_Node, LocalNodeMetrics} = Module:fetch_data_from_local_node(),
|
||||
Cluster = Module:fetch_cluster_consistented_data(),
|
||||
maps:merge(LocalNodeMetrics, Cluster).
|
||||
|
||||
metrics_data_from_all_nodes(Module) ->
|
||||
Nodes = mria:running_nodes(),
|
||||
_ResL = emqx_prometheus_proto_v2:raw_prom_data(
|
||||
Nodes, Module, fetch_data_from_local_node, []
|
||||
).
|
||||
|
||||
collect_json_data(Data, Func) when is_function(Func, 3) ->
|
||||
maps:fold(
|
||||
fun(K, V, Acc) ->
|
||||
Func(K, V, Acc)
|
||||
end,
|
||||
[],
|
||||
Data
|
||||
);
|
||||
collect_json_data(_, _) ->
|
||||
error(badarg).
|
||||
|
||||
aggre_cluster(Module) ->
|
||||
do_aggre_cluster(
|
||||
Module:logic_sum_metrics(),
|
||||
metrics_data_from_all_nodes(Module),
|
||||
Module:aggre_or_zip_init_acc()
|
||||
).
|
||||
|
||||
with_node_name_label(Module) ->
|
||||
ResL = metrics_data_from_all_nodes(Module),
|
||||
do_with_node_name_label(ResL, Module:aggre_or_zip_init_acc()).
|
||||
|
||||
aggre_cluster(LogicSumKs, ResL, Init) ->
|
||||
do_aggre_cluster(LogicSumKs, ResL, Init).
|
||||
|
||||
do_aggre_cluster(_LogicSumKs, [], AccIn) ->
|
||||
AccIn;
|
||||
do_aggre_cluster(LogicSumKs, [{ok, {_NodeName, NodeMetric}} | Rest], AccIn) ->
|
||||
do_aggre_cluster(
|
||||
LogicSumKs,
|
||||
Rest,
|
||||
maps:fold(
|
||||
fun(K, V, AccIn0) ->
|
||||
AccIn0#{K => aggre_metric(LogicSumKs, V, ?MG(K, AccIn0))}
|
||||
end,
|
||||
AccIn,
|
||||
NodeMetric
|
||||
)
|
||||
);
|
||||
do_aggre_cluster(LogicSumKs, [{_, _} | Rest], AccIn) ->
|
||||
do_aggre_cluster(LogicSumKs, Rest, AccIn).
|
||||
|
||||
aggre_metric(LogicSumKs, NodeMetrics, AccIn0) ->
|
||||
lists:foldl(
|
||||
fun(K, AccIn) ->
|
||||
NAccL = do_aggre_metric(
|
||||
K, LogicSumKs, ?MG(K, NodeMetrics), ?MG(K, AccIn)
|
||||
),
|
||||
AccIn#{K => NAccL}
|
||||
end,
|
||||
AccIn0,
|
||||
maps:keys(NodeMetrics)
|
||||
).
|
||||
|
||||
do_aggre_metric(K, LogicSumKs, NodeMetrics, AccL) ->
|
||||
lists:foldl(
|
||||
fun({Labels, Metric}, AccIn) ->
|
||||
NMetric =
|
||||
case lists:member(K, LogicSumKs) of
|
||||
true ->
|
||||
logic_sum(Metric, ?PG0(Labels, AccIn));
|
||||
false ->
|
||||
Metric + ?PG0(Labels, AccIn)
|
||||
end,
|
||||
[{Labels, NMetric} | AccIn]
|
||||
end,
|
||||
AccL,
|
||||
NodeMetrics
|
||||
).
|
||||
|
||||
with_node_name_label(ResL, Init) ->
|
||||
do_with_node_name_label(ResL, Init).
|
||||
|
||||
do_with_node_name_label([], AccIn) ->
|
||||
AccIn;
|
||||
do_with_node_name_label([{ok, {NodeName, NodeMetric}} | Rest], AccIn) ->
|
||||
do_with_node_name_label(
|
||||
Rest,
|
||||
maps:fold(
|
||||
fun(K, V, AccIn0) ->
|
||||
AccIn0#{
|
||||
K => zip_with_node_name(NodeName, V, ?MG(K, AccIn0))
|
||||
}
|
||||
end,
|
||||
AccIn,
|
||||
NodeMetric
|
||||
)
|
||||
);
|
||||
do_with_node_name_label([{_, _} | Rest], AccIn) ->
|
||||
do_with_node_name_label(Rest, AccIn).
|
||||
|
||||
zip_with_node_name(NodeName, NodeMetrics, AccIn0) ->
|
||||
lists:foldl(
|
||||
fun(K, AccIn) ->
|
||||
NAccL = do_zip_with_node_name(NodeName, ?MG(K, NodeMetrics), ?MG(K, AccIn)),
|
||||
AccIn#{K => NAccL}
|
||||
end,
|
||||
AccIn0,
|
||||
maps:keys(NodeMetrics)
|
||||
).
|
||||
|
||||
do_zip_with_node_name(NodeName, NodeMetrics, AccL) ->
|
||||
lists:foldl(
|
||||
fun({Labels, Metric}, AccIn) ->
|
||||
NLabels = [{node, NodeName} | Labels],
|
||||
[{NLabels, Metric} | AccIn]
|
||||
end,
|
||||
AccL,
|
||||
NodeMetrics
|
||||
).
|
||||
|
||||
point_to_map_fun(Key) ->
|
||||
fun({Lables, Metric}, AccIn2) ->
|
||||
LablesKVMap = maps:from_list(Lables),
|
||||
[maps:merge(LablesKVMap, #{Key => Metric}) | AccIn2]
|
||||
end.
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
logic_sum(N1, N2) when
|
||||
(N1 > 0 andalso N2 > 0)
|
||||
->
|
||||
1;
|
||||
logic_sum(_, _) ->
|
||||
0.
|
||||
|
||||
boolean_to_number(true) -> 1;
|
||||
boolean_to_number(false) -> 0.
|
||||
|
||||
status_to_number(connected) -> 1;
|
||||
%% for auth
|
||||
status_to_number(stopped) -> 0;
|
||||
%% for data_integration
|
||||
status_to_number(disconnected) -> 0.
|
||||
|
||||
metric_names(MetricWithType) when is_list(MetricWithType) ->
|
||||
[Name || {Name, _Type} <- MetricWithType].
|
|
@ -25,6 +25,10 @@
|
|||
-export([conf/0, is_push_gateway_server_enabled/1]).
|
||||
-export([to_recommend_type/1]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([all_collectors/0]).
|
||||
-endif.
|
||||
|
||||
update(Config) ->
|
||||
case
|
||||
emqx_conf:update(
|
||||
|
@ -101,7 +105,7 @@ post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
|
|||
ok.
|
||||
|
||||
update_prometheus(AppEnvs) ->
|
||||
PrevCollectors = prometheus_registry:collectors(default),
|
||||
PrevCollectors = all_collectors(),
|
||||
CurCollectors = proplists:get_value(collectors, proplists:get_value(prometheus, AppEnvs)),
|
||||
lists:foreach(
|
||||
fun prometheus_registry:deregister_collector/1,
|
||||
|
@ -113,6 +117,15 @@ update_prometheus(AppEnvs) ->
|
|||
),
|
||||
application:set_env(AppEnvs).
|
||||
|
||||
all_collectors() ->
|
||||
lists:foldl(
|
||||
fun(Registry, AccIn) ->
|
||||
prometheus_registry:collectors(Registry) ++ AccIn
|
||||
end,
|
||||
_InitAcc = [],
|
||||
?PROMETHEUS_ALL_REGISTRYS
|
||||
).
|
||||
|
||||
update_push_gateway(Prometheus) ->
|
||||
case is_push_gateway_server_enabled(Prometheus) of
|
||||
true ->
|
||||
|
|
|
@ -0,0 +1,534 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_prometheus_data_integration).
|
||||
|
||||
-export([
|
||||
deregister_cleanup/1,
|
||||
collect_mf/2,
|
||||
collect_metrics/2
|
||||
]).
|
||||
|
||||
-export([collect/1]).
|
||||
|
||||
-export([
|
||||
zip_json_data_integration_metrics/3
|
||||
]).
|
||||
|
||||
%% for bpapi
|
||||
-behaviour(emqx_prometheus_cluster).
|
||||
-export([
|
||||
fetch_data_from_local_node/0,
|
||||
fetch_cluster_consistented_data/0,
|
||||
aggre_or_zip_init_acc/0,
|
||||
logic_sum_metrics/0
|
||||
]).
|
||||
|
||||
-export([add_collect_family/4]).
|
||||
|
||||
-include("emqx_prometheus.hrl").
|
||||
-include_lib("prometheus/include/prometheus.hrl").
|
||||
|
||||
-import(
|
||||
prometheus_model_helpers,
|
||||
[
|
||||
create_mf/5,
|
||||
gauge_metric/1,
|
||||
gauge_metrics/1,
|
||||
counter_metrics/1
|
||||
]
|
||||
).
|
||||
|
||||
%% Please don't remove this attribute, prometheus uses it to
|
||||
%% automatically register collectors.
|
||||
-behaviour(prometheus_collector).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Macros
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(METRIC_NAME_PREFIX, "emqx_data_integration_").
|
||||
|
||||
-define(MG(K, MAP), maps:get(K, MAP)).
|
||||
-define(MG0(K, MAP), maps:get(K, MAP, 0)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Callback for emqx_prometheus_cluster
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
fetch_data_from_local_node() ->
|
||||
Rules = emqx_rule_engine:get_rules(),
|
||||
Bridges = emqx_bridge:list(),
|
||||
{node(self()), #{
|
||||
rule_metric_data => rule_metric_data(Rules),
|
||||
action_metric_data => action_metric_data(Bridges),
|
||||
connector_metric_data => connector_metric_data(Bridges)
|
||||
}}.
|
||||
|
||||
fetch_cluster_consistented_data() ->
|
||||
Rules = emqx_rule_engine:get_rules(),
|
||||
Bridges = emqx_bridge:list(),
|
||||
(maybe_collect_schema_registry())#{
|
||||
rules_ov_data => rules_ov_data(Rules),
|
||||
connectors_ov_data => connectors_ov_data(Bridges)
|
||||
}.
|
||||
|
||||
aggre_or_zip_init_acc() ->
|
||||
#{
|
||||
rule_metric_data => maps:from_keys(rule_metric(names), []),
|
||||
action_metric_data => maps:from_keys(action_metric(names), []),
|
||||
connector_metric_data => maps:from_keys(connectr_metric(names), [])
|
||||
}.
|
||||
|
||||
logic_sum_metrics() ->
|
||||
[
|
||||
emqx_rule_enable,
|
||||
emqx_connector_enable,
|
||||
emqx_connector_status
|
||||
].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collector API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @private
|
||||
deregister_cleanup(_) -> ok.
|
||||
|
||||
%% @private
|
||||
-spec collect_mf(_Registry, Callback) -> ok when
|
||||
_Registry :: prometheus_registry:registry(),
|
||||
Callback :: prometheus_collector:collect_mf_callback().
|
||||
collect_mf(?PROMETHEUS_DATA_INTEGRATION_REGISTRY, Callback) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
|
||||
%% Data Integration Overview
|
||||
ok = add_collect_family(Callback, rules_ov_metric_meta(), ?MG(rules_ov_data, RawData)),
|
||||
ok = add_collect_family(
|
||||
Callback, connectors_ov_metric_meta(), ?MG(connectors_ov_data, RawData)
|
||||
),
|
||||
ok = maybe_collect_family_schema_registry(Callback),
|
||||
|
||||
%% Rule Metric
|
||||
RuleMetricDs = ?MG(rule_metric_data, RawData),
|
||||
ok = add_collect_family(Callback, rule_metric_meta(), RuleMetricDs),
|
||||
|
||||
%% Action Metric
|
||||
ActionMetricDs = ?MG(action_metric_data, RawData),
|
||||
ok = add_collect_family(Callback, action_metric_meta(), ActionMetricDs),
|
||||
|
||||
%% Connector Metric
|
||||
ConnectorMetricDs = ?MG(connector_metric_data, RawData),
|
||||
ok = add_collect_family(Callback, connector_metric_meta(), ConnectorMetricDs),
|
||||
|
||||
ok;
|
||||
collect_mf(_, _) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
collect(<<"json">>) ->
|
||||
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
|
||||
Rules = emqx_rule_engine:get_rules(),
|
||||
Bridges = emqx_bridge:list(),
|
||||
#{
|
||||
data_integration_overview => collect_data_integration_overview(Rules, Bridges),
|
||||
rules => collect_json_data(?MG(rule_metric_data, RawData)),
|
||||
actions => collect_json_data(?MG(action_metric_data, RawData)),
|
||||
connectors => collect_json_data(?MG(connector_metric_data, RawData))
|
||||
};
|
||||
collect(<<"prometheus">>) ->
|
||||
prometheus_text_format:format(?PROMETHEUS_DATA_INTEGRATION_REGISTRY).
|
||||
|
||||
%%====================
|
||||
%% API Helpers
|
||||
|
||||
add_collect_family(Callback, MetricWithType, Data) ->
|
||||
_ = [add_collect_family(Name, Data, Callback, Type) || {Name, Type} <- MetricWithType],
|
||||
ok.
|
||||
|
||||
add_collect_family(Name, Data, Callback, Type) ->
|
||||
%% TODO: help document from Name
|
||||
Callback(create_mf(Name, _Help = <<"">>, Type, ?MODULE, Data)).
|
||||
|
||||
collect_metrics(Name, Metrics) ->
|
||||
collect_di(Name, Metrics).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collector
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%========================================
|
||||
%% Data Integration Overview
|
||||
%%========================================
|
||||
|
||||
%%====================
|
||||
%% All Rules
|
||||
%% Rules
|
||||
collect_di(K = emqx_rules_count, Data) -> gauge_metric(?MG(K, Data));
|
||||
%%====================
|
||||
%% Schema Registry
|
||||
collect_di(K = emqx_schema_registrys_count, Data) -> gauge_metric(?MG(K, Data));
|
||||
%%====================
|
||||
%% Connectors
|
||||
collect_di(K = emqx_connectors_count, Data) -> gauge_metric(?MG(K, Data));
|
||||
%%========================================
|
||||
%% Data Integration Metric for: Rule && Action && Connector
|
||||
%%========================================
|
||||
|
||||
%%====================
|
||||
%% Rule Metric
|
||||
collect_di(K = emqx_rule_enable, Data) -> gauge_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_matched, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_passed, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_failed_exception, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_failed_no_result, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_actions_total, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_actions_success, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_actions_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data));
|
||||
%%====================
|
||||
%% Action Metric
|
||||
collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||
%% inflight type: gauge
|
||||
collect_di(K = emqx_action_inflight, Data) -> gauge_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_received, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_late_reply, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_retried, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_retried_success, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_retried_failed, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_dropped_resource_stopped, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_dropped_resource_not_found, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_dropped_queue_full, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_dropped_other, Data) -> counter_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_action_dropped_expired, Data) -> counter_metrics(?MG(K, Data));
|
||||
%% queuing type: gauge
|
||||
collect_di(K = emqx_action_queuing, Data) -> gauge_metrics(?MG(K, Data));
|
||||
%%====================
|
||||
%% Connector Metric
|
||||
collect_di(K = emqx_connector_enable, Data) -> gauge_metrics(?MG(K, Data));
|
||||
collect_di(K = emqx_connector_status, Data) -> gauge_metrics(?MG(K, Data)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%========================================
|
||||
%% Data Integration Overview
|
||||
%%========================================
|
||||
|
||||
%%====================
|
||||
%% All Rules
|
||||
|
||||
rules_ov_metric_meta() ->
|
||||
[
|
||||
{emqx_rules_count, gauge}
|
||||
].
|
||||
|
||||
rules_ov_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(rules_ov_metric_meta()).
|
||||
|
||||
-define(RULE_TAB, emqx_rule_engine).
|
||||
rules_ov_data(_Rules) ->
|
||||
#{
|
||||
emqx_rules_count => ets:info(?RULE_TAB, size)
|
||||
}.
|
||||
|
||||
%%====================
|
||||
%% Schema Registry
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
|
||||
maybe_collect_family_schema_registry(Callback) ->
|
||||
ok = add_collect_family(Callback, schema_registry_metric_meta(), schema_registry_data()),
|
||||
ok.
|
||||
|
||||
schema_registry_metric_meta() ->
|
||||
[
|
||||
{emqx_schema_registrys_count, gauge}
|
||||
].
|
||||
|
||||
schema_registry_data() ->
|
||||
#{
|
||||
emqx_schema_registrys_count => erlang:map_size(emqx_schema_registry:list_schemas())
|
||||
}.
|
||||
|
||||
maybe_collect_schema_registry() ->
|
||||
schema_registry_data().
|
||||
|
||||
-else.
|
||||
|
||||
maybe_collect_family_schema_registry(_) ->
|
||||
ok.
|
||||
|
||||
maybe_collect_schema_registry() ->
|
||||
#{}.
|
||||
|
||||
-endif.
|
||||
|
||||
%%====================
|
||||
%% Connectors
|
||||
|
||||
connectors_ov_metric_meta() ->
|
||||
[
|
||||
{emqx_connectors_count, gauge}
|
||||
].
|
||||
|
||||
connectors_ov_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
|
||||
|
||||
connectors_ov_data(Brdiges) ->
|
||||
#{
|
||||
%% Both Bridge V1 and V2
|
||||
emqx_connectors_count => erlang:length(Brdiges)
|
||||
}.
|
||||
|
||||
%%========================================
|
||||
%% Data Integration Metric for: Rule && Action && Connector
|
||||
%%========================================
|
||||
|
||||
%%====================
|
||||
%% Rule Metric
|
||||
%% With rule_id as label key: `rule_id`
|
||||
|
||||
rule_metric_meta() ->
|
||||
[
|
||||
{emqx_rule_enable, gauge},
|
||||
{emqx_rule_matched, counter},
|
||||
{emqx_rule_failed, counter},
|
||||
{emqx_rule_passed, counter},
|
||||
{emqx_rule_failed_exception, counter},
|
||||
{emqx_rule_failed_no_result, counter},
|
||||
{emqx_rule_actions_total, counter},
|
||||
{emqx_rule_actions_success, counter},
|
||||
{emqx_rule_actions_failed, counter},
|
||||
{emqx_rule_actions_failed_out_of_service, counter},
|
||||
{emqx_rule_actions_failed_unknown, counter}
|
||||
].
|
||||
|
||||
rule_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(rule_metric_meta()).
|
||||
|
||||
rule_metric_data(Rules) ->
|
||||
lists:foldl(
|
||||
fun(#{id := Id} = Rule, AccIn) ->
|
||||
merge_acc_with_rules(Id, get_metric(Rule), AccIn)
|
||||
end,
|
||||
maps:from_keys(rule_metric(names), []),
|
||||
Rules
|
||||
).
|
||||
|
||||
merge_acc_with_rules(Id, RuleMetrics, PointsAcc) ->
|
||||
maps:fold(
|
||||
fun(K, V, AccIn) ->
|
||||
AccIn#{K => [rule_point(Id, V) | ?MG(K, AccIn)]}
|
||||
end,
|
||||
PointsAcc,
|
||||
RuleMetrics
|
||||
).
|
||||
|
||||
rule_point(Id, V) ->
|
||||
{[{id, Id}], V}.
|
||||
|
||||
get_metric(#{id := Id, enable := Bool} = _Rule) ->
|
||||
case emqx_metrics_worker:get_metrics(rule_metrics, Id) of
|
||||
#{counters := Counters} ->
|
||||
#{
|
||||
emqx_rule_enable => emqx_prometheus_cluster:boolean_to_number(Bool),
|
||||
emqx_rule_matched => ?MG(matched, Counters),
|
||||
emqx_rule_failed => ?MG(failed, Counters),
|
||||
emqx_rule_passed => ?MG(passed, Counters),
|
||||
emqx_rule_failed_exception => ?MG('failed.exception', Counters),
|
||||
emqx_rule_failed_no_result => ?MG('failed.no_result', Counters),
|
||||
emqx_rule_actions_total => ?MG('actions.total', Counters),
|
||||
emqx_rule_actions_success => ?MG('actions.success', Counters),
|
||||
emqx_rule_actions_failed => ?MG('actions.failed', Counters),
|
||||
emqx_rule_actions_failed_out_of_service => ?MG(
|
||||
'actions.failed.out_of_service', Counters
|
||||
),
|
||||
emqx_rule_actions_failed_unknown => ?MG('actions.failed.unknown', Counters)
|
||||
}
|
||||
end.
|
||||
|
||||
%%====================
|
||||
%% Action Metric
|
||||
%% With action_id: `{type}:{name}` as label key: `action_id`
|
||||
|
||||
action_metric_meta() ->
|
||||
[
|
||||
{emqx_action_matched, counter},
|
||||
{emqx_action_dropped, counter},
|
||||
{emqx_action_success, counter},
|
||||
{emqx_action_failed, counter},
|
||||
{emqx_action_inflight, gauge},
|
||||
{emqx_action_received, counter},
|
||||
{emqx_action_late_reply, counter},
|
||||
{emqx_action_retried, counter},
|
||||
{emqx_action_retried_success, counter},
|
||||
{emqx_action_retried_failed, counter},
|
||||
{emqx_action_dropped_resource_stopped, counter},
|
||||
{emqx_action_dropped_resource_not_found, counter},
|
||||
{emqx_action_dropped_queue_full, counter},
|
||||
{emqx_action_dropped_other, counter},
|
||||
{emqx_action_dropped_expired, counter},
|
||||
{emqx_action_queuing, gauge}
|
||||
].
|
||||
|
||||
action_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(action_metric_meta()).
|
||||
|
||||
action_metric_data(Bridges) ->
|
||||
lists:foldl(
|
||||
fun(#{type := Type, name := Name} = _Bridge, AccIn) ->
|
||||
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
||||
merge_acc_with_bridges(Id, get_bridge_metric(Type, Name), AccIn)
|
||||
end,
|
||||
maps:from_keys(action_metric(names), []),
|
||||
Bridges
|
||||
).
|
||||
|
||||
merge_acc_with_bridges(Id, BridgeMetrics, PointsAcc) ->
|
||||
maps:fold(
|
||||
fun(K, V, AccIn) ->
|
||||
AccIn#{K => [action_point(Id, V) | ?MG(K, AccIn)]}
|
||||
end,
|
||||
PointsAcc,
|
||||
BridgeMetrics
|
||||
).
|
||||
|
||||
action_point(Id, V) ->
|
||||
{[{id, Id}], V}.
|
||||
|
||||
get_bridge_metric(Type, Name) ->
|
||||
case emqx_bridge:get_metrics(Type, Name) of
|
||||
#{counters := Counters, gauges := Gauges} ->
|
||||
#{
|
||||
emqx_action_matched => ?MG0(matched, Counters),
|
||||
emqx_action_dropped => ?MG0(dropped, Counters),
|
||||
emqx_action_success => ?MG0(success, Counters),
|
||||
emqx_action_failed => ?MG0(failed, Counters),
|
||||
emqx_action_inflight => ?MG0(inflight, Gauges),
|
||||
emqx_action_received => ?MG0(received, Counters),
|
||||
emqx_action_late_reply => ?MG0(late_reply, Counters),
|
||||
emqx_action_retried => ?MG0(retried, Counters),
|
||||
emqx_action_retried_success => ?MG0('retried.success', Counters),
|
||||
emqx_action_retried_failed => ?MG0('retried.failed', Counters),
|
||||
emqx_action_dropped_resource_stopped => ?MG0('dropped.resource_stopped', Counters),
|
||||
emqx_action_dropped_resource_not_found => ?MG0(
|
||||
'dropped.resource_not_found', Counters
|
||||
),
|
||||
emqx_action_dropped_queue_full => ?MG0('dropped.queue_full', Counters),
|
||||
emqx_action_dropped_other => ?MG0('dropped.other', Counters),
|
||||
emqx_action_dropped_expired => ?MG0('dropped.expired', Counters),
|
||||
emqx_action_queuing => ?MG0(queuing, Gauges)
|
||||
}
|
||||
end.
|
||||
|
||||
%%====================
|
||||
%% Connector Metric
|
||||
%% With connector_id: `{type}:{name}` as label key: `connector_id`
|
||||
|
||||
connector_metric_meta() ->
|
||||
[
|
||||
{emqx_connector_enable, gauge},
|
||||
{emqx_connector_status, gauge}
|
||||
].
|
||||
|
||||
connectr_metric(names) ->
|
||||
emqx_prometheus_cluster:metric_names(connector_metric_meta()).
|
||||
|
||||
connector_metric_data(Bridges) ->
|
||||
lists:foldl(
|
||||
fun(#{type := Type, name := Name} = Bridge, AccIn) ->
|
||||
Id = emqx_bridge_resource:bridge_id(Type, Name),
|
||||
merge_acc_with_connectors(Id, get_connector_status(Bridge), AccIn)
|
||||
end,
|
||||
maps:from_keys(connectr_metric(names), []),
|
||||
Bridges
|
||||
).
|
||||
|
||||
merge_acc_with_connectors(Id, ConnectorMetrics, PointsAcc) ->
|
||||
maps:fold(
|
||||
fun(K, V, AccIn) ->
|
||||
AccIn#{K => [connector_point(Id, V) | ?MG(K, AccIn)]}
|
||||
end,
|
||||
PointsAcc,
|
||||
ConnectorMetrics
|
||||
).
|
||||
|
||||
connector_point(Id, V) ->
|
||||
{[{id, Id}], V}.
|
||||
|
||||
get_connector_status(#{resource_data := ResourceData} = _Bridge) ->
|
||||
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
|
||||
Status = ?MG(status, ResourceData),
|
||||
#{
|
||||
emqx_connector_enable => emqx_prometheus_cluster:boolean_to_number(Enabled),
|
||||
emqx_connector_status => emqx_prometheus_cluster:status_to_number(Status)
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Collect functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% merge / zip formatting funcs for type `application/json`
|
||||
collect_data_integration_overview(Rules, Bridges) ->
|
||||
RulesD = rules_ov_data(Rules),
|
||||
ConnectorsD = connectors_ov_data(Bridges),
|
||||
|
||||
M1 = lists:foldl(
|
||||
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,
|
||||
#{},
|
||||
rules_ov_metric(names)
|
||||
),
|
||||
M2 = lists:foldl(
|
||||
fun(K, AccIn) -> AccIn#{K => ?MG(K, ConnectorsD)} end,
|
||||
#{},
|
||||
connectors_ov_metric(names)
|
||||
),
|
||||
M3 = maybe_collect_schema_registry(),
|
||||
|
||||
lists:foldl(fun(M, AccIn) -> maps:merge(M, AccIn) end, #{}, [M1, M2, M3]).
|
||||
|
||||
collect_json_data(Data) ->
|
||||
emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_data_integration_metrics/3).
|
||||
|
||||
%% for initialized empty AccIn
|
||||
%% The following fields will be put into Result
|
||||
%% For Rules:
|
||||
%% `id` => [RULE_ID]
|
||||
%% For Actions
|
||||
%% `id` => [ACTION_ID]
|
||||
%% FOR Connectors
|
||||
%% `id` => [CONNECTOR_ID] %% CONNECTOR_ID = BRIDGE_ID
|
||||
%% formatted with {type}:{name}
|
||||
zip_json_data_integration_metrics(Key, Points, [] = _AccIn) ->
|
||||
lists:foldl(
|
||||
fun({Lables, Metric}, AccIn2) ->
|
||||
LablesKVMap = maps:from_list(Lables),
|
||||
Point = LablesKVMap#{Key => Metric},
|
||||
[Point | AccIn2]
|
||||
end,
|
||||
[],
|
||||
Points
|
||||
);
|
||||
zip_json_data_integration_metrics(Key, Points, AllResultedAcc) ->
|
||||
ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
|
||||
lists:zipwith(fun maps:merge/2, AllResultedAcc, ThisKeyResult).
|
|
@ -67,6 +67,9 @@ init([]) ->
|
|||
Children =
|
||||
case emqx_prometheus_config:is_push_gateway_server_enabled(Conf) of
|
||||
false -> [];
|
||||
%% TODO: add push gateway for endpoints
|
||||
%% `/prometheus/auth`
|
||||
%% `/prometheus/data_integration`
|
||||
true -> [?CHILD(emqx_prometheus, Conf)]
|
||||
end,
|
||||
{ok, {{one_for_one, 10, 3600}, Children}}.
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_prometheus_proto_v2).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
start/1,
|
||||
stop/1,
|
||||
|
||||
raw_prom_data/4
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.5.0".
|
||||
|
||||
-spec start([node()]) -> emqx_rpc:multicall_result().
|
||||
start(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_prometheus, do_start, [], 5000).
|
||||
|
||||
-spec stop([node()]) -> emqx_rpc:multicall_result().
|
||||
stop(Nodes) ->
|
||||
rpc:multicall(Nodes, emqx_prometheus, do_stop, [], 5000).
|
||||
|
||||
-type key() :: atom().
|
||||
-type arg() :: list(term()).
|
||||
|
||||
-spec raw_prom_data([node()], key(), key(), arg()) -> emqx_rpc:erpc_multicall(term()).
|
||||
raw_prom_data(Nodes, M, F, A) ->
|
||||
erpc:multicall(
|
||||
Nodes,
|
||||
emqx_prometheus_api,
|
||||
lookup_from_local_nodes,
|
||||
[M, F, A],
|
||||
5000
|
||||
).
|
|
@ -103,13 +103,16 @@ init_group() ->
|
|||
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
||||
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
|
||||
meck:expect(emqx_alarm, activate, 3, ok),
|
||||
meck:expect(emqx_alarm, deactivate, 3, ok).
|
||||
meck:expect(emqx_alarm, deactivate, 3, ok),
|
||||
meck:new(emqx_license_checker, [non_strict, passthrough, no_link]),
|
||||
meck:expect(emqx_license_checker, expiry_epoch, fun() -> 1859673600 end).
|
||||
|
||||
end_group() ->
|
||||
ekka:stop(),
|
||||
mria:stop(),
|
||||
mria_mnesia:delete_schema(),
|
||||
meck:unload(emqx_alarm),
|
||||
meck:unload(emqx_license_checker),
|
||||
emqx_common_test_helpers:stop_apps([emqx_prometheus]).
|
||||
|
||||
end_per_group(_Group, Config) ->
|
||||
|
|
|
@ -128,8 +128,8 @@ t_legacy_prometheus_api(_) ->
|
|||
Conf2 = emqx_utils_json:decode(Response2, [return_maps]),
|
||||
?assertEqual(NewConf, Conf2),
|
||||
|
||||
EnvCollectors = application:get_env(prometheus, collectors, []),
|
||||
PromCollectors = prometheus_registry:collectors(default),
|
||||
EnvCollectors = env_collectors(),
|
||||
PromCollectors = all_collectors(),
|
||||
?assertEqual(lists:sort(EnvCollectors), lists:sort(PromCollectors)),
|
||||
?assert(lists:member(prometheus_vm_statistics_collector, EnvCollectors), EnvCollectors),
|
||||
|
||||
|
@ -221,8 +221,8 @@ t_prometheus_api(_) ->
|
|||
Conf2 = emqx_utils_json:decode(Response2, [return_maps]),
|
||||
?assertMatch(NewConf, Conf2),
|
||||
|
||||
EnvCollectors = application:get_env(prometheus, collectors, []),
|
||||
PromCollectors = prometheus_registry:collectors(default),
|
||||
EnvCollectors = env_collectors(),
|
||||
PromCollectors = all_collectors(),
|
||||
?assertEqual(lists:sort(EnvCollectors), lists:sort(PromCollectors)),
|
||||
?assert(lists:member(prometheus_vm_statistics_collector, EnvCollectors), EnvCollectors),
|
||||
|
||||
|
@ -308,3 +308,16 @@ request_stats(JsonAuth, Auth) ->
|
|||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%%% Internal Functions
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
env_collectors() ->
|
||||
do_env_collectors(application:get_env(prometheus, collectors, []), []).
|
||||
|
||||
do_env_collectors([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
do_env_collectors([{_Registry, Collector} | Rest], Acc) when is_atom(Collector) ->
|
||||
do_env_collectors(Rest, [Collector | Acc]);
|
||||
do_env_collectors([Collector | Rest], Acc) when is_atom(Collector) ->
|
||||
do_env_collectors(Rest, [Collector | Acc]).
|
||||
|
||||
all_collectors() ->
|
||||
emqx_prometheus_config:all_collectors().
|
||||
|
|
|
@ -44,7 +44,11 @@ api_spec() ->
|
|||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||
|
||||
paths() ->
|
||||
[?PREFIX, ?PREFIX ++ "/messages", ?PREFIX ++ "/message/:topic"].
|
||||
[
|
||||
?PREFIX,
|
||||
?PREFIX ++ "/messages",
|
||||
?PREFIX ++ "/message/:topic"
|
||||
].
|
||||
|
||||
schema(?PREFIX) ->
|
||||
#{
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
Expose more metrics to improve observability:
|
||||
|
||||
Montior API:
|
||||
- Add `retained_msg_count` field to `/api/v5/monitor_current`.
|
||||
- Add `retained_msg_count` and `node_uptime` fields to `/api/v5/monitor_current/nodes/{node}`.
|
||||
|
||||
Prometheus API:
|
||||
- Add `emqx_cert_expiry_at` to `/api/v5/prometheus/stats` to display TLS listener certificate expiration time.
|
||||
- Add `/api/v5/prometheus/auth` endpoint to provide metrics such as execution count and running status for all authenticatiors and authorizators.
|
||||
- Add `/api/v5/prometheus/data_integration` endpoint to provide metrics such as execution count and status for all rules, actions, and connectors.
|
||||
|
||||
Limitations:
|
||||
Prometheus push gateway only supports content in `/api/v5/prometheus/stats?mode=node` for now.
|
||||
|
||||
For more API details and metric type information. Please see also in swagger api docs.
|
|
@ -0,0 +1,17 @@
|
|||
# Expose more metrics to improve observability:
|
||||
|
||||
Montior API:
|
||||
- Add `retained_msg_count` field to `/api/v5/monitor_current`.
|
||||
- Add `license_quota` field to `/api/v5/monitor_current`
|
||||
- Add `retained_msg_count` and `node_uptime` fields to `/api/v5/monitor_current/nodes/{node}`.
|
||||
- Add `retained_msg_count`, `license_quota` and `node_uptime` fields to `/api/v5/monitor_current/nodes/{node}`.
|
||||
|
||||
Prometheus API:
|
||||
- Add `emqx_cert_expiry_at` and `emqx_license_expiry_at` to `/api/v5/prometheus/stats` to display TLS listener certificate expiration time and license expiration time.
|
||||
- Add `/api/v5/prometheus/auth` endpoint to provide metrics such as execution count and running status for all authenticatiors and authorizators.
|
||||
- Add `/api/v5/prometheus/data_integration` endpoint to provide metrics such as execution count and status for all rules, actions, and connectors.
|
||||
|
||||
Limitations:
|
||||
Prometheus push gateway only supports the content in `/api/v5/prometheus/stats?mode=node`
|
||||
|
||||
For more API details and metric type information. Please see also in swagger api docs.
|
|
@ -15,4 +15,14 @@ get_prom_data.desc:
|
|||
get_prom_data.label:
|
||||
"""Prometheus Metrics"""
|
||||
|
||||
get_prom_auth_data.desc:
|
||||
"""Get Prometheus Metrics for AuthN, AuthZ and Banned"""
|
||||
get_prom_auth_data.label:
|
||||
"""Prometheus Metrics for Auth"""
|
||||
|
||||
get_prom_data_integration_data.desc:
|
||||
"""Get Prometheus Metrics for Data Integration"""
|
||||
get_prom_data_integration_data.label:
|
||||
"""Prometheus Metrics for Data Integration"""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue