emqx/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

263 lines
9.2 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_dashboard_monitor_api).
-include("emqx_dashboard.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hocon_types.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-behaviour(minirest_api).
-export([api_spec/0]).
-export([
paths/0,
schema/1,
fields/1,
namespace/0
]).
-export([
monitor/2,
monitor_current/2
]).
%% @doc Schema namespace of this module. None is declared, so the
%% result is always the atom `undefined`.
namespace() ->
    undefined.
%% @doc Build the API spec for this module through
%% `emqx_dashboard_swagger:spec/2`, with the `check_schema` and
%% `translate_body` options both switched on.
api_spec() ->
    SpecOpts = #{check_schema => true, translate_body => true},
    emqx_dashboard_swagger:spec(?MODULE, SpecOpts).
%% @doc Endpoints served by this module. Each base resource is exposed
%% cluster-wide and per node, giving (in this order):
%%   "/monitor", "/monitor/nodes/:node",
%%   "/monitor_current", "/monitor_current/nodes/:node"
paths() ->
    Bases = ["/monitor", "/monitor_current"],
    Scopes = ["", "/nodes/:node"],
    [Base ++ Scope || Base <- Bases, Scope <- Scopes].
%% @doc Schema of every path returned by `paths/0`.
%%
%% All four endpoints are GET-only and tagged `Metrics`. The
%% `operationId` names the exported handler: `monitor/2` for the
%% `/monitor*` paths and `monitor_current/2` for `/monitor_current*`.
%% The per-node variants take the `node` path parameter and declare a
%% 404 `NOT_FOUND` response; the cluster-wide `/monitor` declares a 400
%% `BAD_RPC` response instead.
schema("/monitor") ->
    Responses = #{
        200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
        400 => emqx_dashboard_swagger:error_codes(['BAD_RPC'], <<"Bad RPC">>)
    },
    Get = #{
        tags => [<<"Metrics">>],
        description => ?DESC(list_monitor),
        parameters => [parameter_latest()],
        responses => Responses
    },
    #{'operationId' => monitor, get => Get};
schema("/monitor/nodes/:node") ->
    Responses = #{
        200 => hoconsc:mk(hoconsc:array(hoconsc:ref(sampler)), #{}),
        404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Node not found">>)
    },
    Get = #{
        tags => [<<"Metrics">>],
        description => ?DESC(list_monitor_node),
        parameters => [parameter_node(), parameter_latest()],
        responses => Responses
    },
    #{'operationId' => monitor, get => Get};
schema("/monitor_current") ->
    %% No parameters: the cluster-wide snapshot takes neither a node
    %% nor a time window.
    Responses = #{
        200 => hoconsc:mk(hoconsc:ref(sampler_current), #{})
    },
    Get = #{
        tags => [<<"Metrics">>],
        description => ?DESC(current_stats),
        responses => Responses
    },
    #{'operationId' => monitor_current, get => Get};
schema("/monitor_current/nodes/:node") ->
    Responses = #{
        200 => hoconsc:mk(hoconsc:ref(sampler_current_node), #{}),
        404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Node not found">>)
    },
    Get = #{
        tags => [<<"Metrics">>],
        description => ?DESC(current_stats_node),
        parameters => [parameter_node()],
        responses => Responses
    },
    #{'operationId' => monitor_current, get => Get}.
%% Optional `latest` query parameter: a positive integer (`range(1, inf)`)
%% giving the size, in seconds, of the trailing window to return.
parameter_latest() ->
    Type = range(1, inf),
    Meta = #{
        desc => <<"The latest N seconds data. Like 300 for 5 min.">>,
        %% five minutes, expressed in seconds
        example => 5 * 60,
        required => false,
        in => query
    },
    {latest, hoconsc:mk(Type, Meta)}.
%% Required `node` path parameter, typed as a binary. The documented
%% example is the name of the node that evaluates this schema.
parameter_node() ->
    Meta = #{
        desc => <<"EMQX node name.">>,
        example => node(),
        required => true,
        in => path
    },
    {node, hoconsc:mk(binary(), Meta)}.
%% @doc Field lists of the structs referenced from `schema/1`.
%%
%% `sampler`: a `time_stamp` field followed by one integer field per
%% name in `?SAMPLER_LIST`.
%% `sampler_current_node` / `sampler_current`: one integer field per
%% name returned by `sample_names/1` for that struct.
fields(sampler) ->
    %% fields_current/1 produces exactly the {Name, integer-schema}
    %% pairs needed here, with the description from swagger_desc/1.
    Samplers = fields_current(?SAMPLER_LIST),
    TimeStamp = {time_stamp, hoconsc:mk(non_neg_integer(), #{desc => <<"Timestamp">>})},
    [TimeStamp | Samplers];
fields(Struct) when Struct =:= sampler_current_node; Struct =:= sampler_current ->
    fields_current(sample_names(Struct)).
%% Metric names exposed by the "current" endpoints.
%%
%% The per-node list is the values of `?DELTA_SAMPLER_RATE_MAP`, then
%% `?GAUGE_SAMPLER_LIST`, then `?CURRENT_SAMPLE_NON_RATE`. The
%% cluster-wide list is the same minus (the first occurrence of)
%% `node_uptime`.
sample_names(sampler_current_node) ->
    lists:append([
        maps:values(?DELTA_SAMPLER_RATE_MAP),
        ?GAUGE_SAMPLER_LIST,
        ?CURRENT_SAMPLE_NON_RATE
    ]);
sample_names(sampler_current) ->
    lists:delete(node_uptime, sample_names(sampler_current_node)).
%% Turn a list of metric names into schema fields: each name becomes an
%% integer field whose description comes from `swagger_desc/1`.
fields_current(Names) ->
    ToField = fun(Name) ->
        {Name, hoconsc:mk(integer(), #{desc => swagger_desc(Name)})}
    end,
    lists:map(ToField, Names).
%% -------------------------------------------------------------------------------------------------
%% API
%% @doc Handler for GET `/monitor` and `/monitor/nodes/:node`.
%%
%% Reads the `latest` query parameter (defaulting to `infinity` when
%% absent) and the `node` path binding (defaulting to `<<"all">>` when
%% absent), then delegates to `emqx_utils_api:with_node_or_cluster/2`
%% with a fun that fetches the samplers for the resolved target.
monitor(get, #{query_string := QS, bindings := Bindings}) ->
    Window = maps:get(<<"latest">>, QS, infinity),
    Target = maps:get(node, Bindings, <<"all">>),
    FetchSamplers = dashboard_samplers_fun(Window),
    emqx_utils_api:with_node_or_cluster(Target, FetchSamplers).
%% Returns a one-argument fun that fetches the samplers of the given
%% node (or cluster) for the `Latest` window and tags the outcome:
%% `{error, {badrpc, _}}` on an RPC failure, `{ok, Samplers}` otherwise.
dashboard_samplers_fun(Latest) ->
    fun(NodeOrCluster) ->
        tag_samplers(emqx_dashboard_monitor:samplers(NodeOrCluster, Latest))
    end.

%% Wrap the raw result of `emqx_dashboard_monitor:samplers/2` in an
%% ok/error tuple.
tag_samplers({badrpc, _} = BadRpc) ->
    {error, BadRpc};
tag_samplers(Samplers) ->
    {ok, Samplers}.
%% @doc Handler for GET `/monitor_current` and
%% `/monitor_current/nodes/:node`.
%%
%% The `node` path binding defaults to `<<"all">>` when absent. A
%% successful (`?OK`) result is passed through
%% `maybe_reject_cluster_only_metrics/2` before being returned; any
%% other result is returned unchanged.
monitor_current(get, #{bindings := Bindings}) ->
    Target = maps:get(node, Bindings, <<"all">>),
    Response = emqx_utils_api:with_node_or_cluster(Target, fun current_rate/1),
    current_response(Target, Response).

%% Post-process the response of `with_node_or_cluster/2`: filter the
%% metrics of a successful reply, pass everything else through.
current_response(Target, ?OK(Rates)) ->
    ?OK(maybe_reject_cluster_only_metrics(Target, Rates));
current_response(_Target, Other) ->
    Other.
%% Fetch the current rates for `NodeOrAll`, which is either the atom
%% `all` or a node name. An RPC failure is reported as
%% `{error, {badrpc, _}}`; an `{ok, _}` result is returned as-is. Any
%% other return shape is unexpected and crashes with `case_clause`.
-spec current_rate(atom()) ->
    {error, term()}
    | {ok, Result :: map()}.
current_rate(NodeOrAll) ->
    case emqx_dashboard_monitor:current_rate(NodeOrAll) of
        {ok, _} = Ok ->
            Ok;
        {badrpc, _} = BadRpc ->
            {error, BadRpc}
    end.
%% -------------------------------------------------------------------------------------------------
%% Internal
%% Suffix appended to the description of every point-in-time metric.
-define(APPROXIMATE_DESC, " Can only represent an approximate state.").

%% Human-readable Swagger description for a metric name.
%%
%% Three families of clauses:
%%   * counters over the last sample interval, formatted by
%%     `swagger_desc_format/1` ("... last N seconds");
%%   * point-in-time values, suffixed with `?APPROXIMATE_DESC`;
%%   * `*_rate` metrics, formatted by `swagger_desc_format/2` with
%%     `per` ("... per N seconds").
%%
%% There is deliberately no catch-all clause: a metric name without a
%% description fails with `function_clause`.
swagger_desc(received) ->
    swagger_desc_format("Received messages ");
swagger_desc(received_bytes) ->
    swagger_desc_format("Received bytes ");
swagger_desc(sent) ->
    swagger_desc_format("Sent messages ");
swagger_desc(sent_bytes) ->
    swagger_desc_format("Sent bytes ");
swagger_desc(dropped) ->
    swagger_desc_format("Dropped messages ");
swagger_desc(validation_succeeded) ->
    swagger_desc_format("Schema validations succeeded ");
swagger_desc(validation_failed) ->
    swagger_desc_format("Schema validations failed ");
swagger_desc(transformation_succeeded) ->
    swagger_desc_format("Message transformations succeeded ");
swagger_desc(transformation_failed) ->
    swagger_desc_format("Message transformations failed ");
swagger_desc(persisted) ->
    swagger_desc_format("Messages saved to the durable storage ");
swagger_desc(disconnected_durable_sessions) ->
    <<"Disconnected durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(subscriptions_durable) ->
    <<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(subscriptions) ->
    <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(topics) ->
    <<"Count topics at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(connections) ->
    <<"Sessions at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(live_connections) ->
    <<"Connections at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(cluster_sessions) ->
    %% Adjacent string literals (including the macro expansion) are
    %% concatenated by the compiler into a single binary segment.
    <<
        "Total number of sessions in the cluster at the time of sampling. "
        "It includes expired sessions when `broker.session_history_retain` is set to a duration greater than `0s`."
        ?APPROXIMATE_DESC
    >>;
swagger_desc(received_msg_rate) ->
    %% Previously read "Dropped messages " (copy-paste of the
    %% dropped_msg_rate text), which mislabelled this metric in the docs.
    swagger_desc_format("Received messages ", per);
%swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per);
swagger_desc(sent_msg_rate) ->
    swagger_desc_format("Sent messages ", per);
%swagger_desc(sent_bytes_rate) -> swagger_desc_format("Sent bytes ", per);
swagger_desc(dropped_msg_rate) ->
    swagger_desc_format("Dropped messages ", per);
swagger_desc(validation_succeeded_rate) ->
    swagger_desc_format("Schema validations succeeded ", per);
swagger_desc(validation_failed_rate) ->
    swagger_desc_format("Schema validations failed ", per);
swagger_desc(transformation_succeeded_rate) ->
    swagger_desc_format("Message transformations succeeded ", per);
swagger_desc(transformation_failed_rate) ->
    swagger_desc_format("Message transformations failed ", per);
swagger_desc(persisted_rate) ->
    swagger_desc_format("Messages saved to the durable storage ", per);
swagger_desc(retained_msg_count) ->
    <<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(shared_subscriptions) ->
    <<"Shared subscriptions count at the time of sampling.", ?APPROXIMATE_DESC>>;
swagger_desc(node_uptime) ->
    <<"Node up time in seconds. Only presented in endpoint: `/monitor_current/nodes/:node`.">>;
swagger_desc(license_quota) ->
    <<"License quota. AKA: limited max_connections for cluster">>.
%% Same as `swagger_desc_format/2` with the qualifier `last`,
%% e.g. "Sent messages last N seconds".
swagger_desc_format(Format) ->
    swagger_desc_format(Format, last).

%% Build a description binary of the form
%% "<Format><Type> <Interval> seconds".
%%
%% `Format` is a plain string prefix; it becomes part of an
%% `io_lib:format/2` control string, so it must not contain `~`
%% directives of its own. `Interval` is read from the
%% `[dashboard, monitor, interval]` config entry, falling back to
%% `?DEFAULT_SAMPLE_INTERVAL`.
swagger_desc_format(Format, Type) ->
    Interval = emqx_conf:get([dashboard, monitor, interval], ?DEFAULT_SAMPLE_INTERVAL),
    Control = Format ++ "~p ~p seconds",
    Text = io_lib:format(Control, [Type, Interval]),
    iolist_to_binary(Text).
%% Drop the metrics listed in `?CLUSTERONLY_SAMPLER_LIST` from `Rates`
%% unless the request targeted the whole cluster (`<<"all">>`), in
%% which case `Rates` is returned untouched.
maybe_reject_cluster_only_metrics(Target, Rates) when Target =:= <<"all">> ->
    Rates;
maybe_reject_cluster_only_metrics(_SingleNode, Rates) ->
    maps:without(?CLUSTERONLY_SAMPLER_LIST, Rates).