485 lines
14 KiB
Erlang
485 lines
14 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_topic_metrics_api).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include("emqx_modules.hrl").
|
|
|
|
-import(
|
|
hoconsc,
|
|
[
|
|
mk/2,
|
|
ref/1,
|
|
ref/2,
|
|
array/1,
|
|
map/2
|
|
]
|
|
).
|
|
|
|
-export([
|
|
topic_metrics/2,
|
|
operate_topic_metrics/2
|
|
]).
|
|
|
|
-export([
|
|
cluster_accumulation_metrics/0,
|
|
cluster_accumulation_metrics/1
|
|
]).
|
|
|
|
-export([
|
|
api_spec/0,
|
|
paths/0,
|
|
schema/1,
|
|
fields/1,
|
|
namespace/0
|
|
]).
|
|
|
|
-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
|
|
-define(BAD_TOPIC, 'BAD_TOPIC').
|
|
-define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').
|
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
|
|
|
%% @doc Returns the atom undefined as this module's namespace.
%% NOTE(review): presumably read by the schema framework alongside fields/1;
%% confirm against the hocon schema behaviour.
namespace() ->
    undefined.
|
|
|
|
%% @doc Produces the API specification for this module by handing the module
%% name to emqx_dashboard_swagger:spec/2 with check_schema switched on.
api_spec() ->
    SpecOpts = #{check_schema => true},
    emqx_dashboard_swagger:spec(?MODULE, SpecOpts).
|
|
|
|
%% @doc Lists the URL paths described by this module; each entry has a
%% matching schema/1 clause.
paths() ->
    Collection = "/mqtt/topic_metrics",
    [Collection, Collection ++ "/:topic"].
|
|
|
|
%% @doc Describes the HTTP operations available on each path listed by paths/0.
%%
%% "/mqtt/topic_metrics" names topic_metrics as its operationId and declares
%% get, put and post. "/mqtt/topic_metrics/:topic" names operate_topic_metrics
%% and declares get and delete, both taking the topic as a path parameter.
schema("/mqtt/topic_metrics") ->
    ListOp = #{
        description => ?DESC(get_topic_metrics_api),
        tags => ?API_TAG_MQTT,
        responses => #{
            200 => mk(
                array(hoconsc:ref(topic_metrics)),
                #{desc => ?DESC(get_topic_metrics_api)}
            )
        }
    },
    ResetOp = #{
        description => ?DESC(reset_topic_metrics_api),
        tags => ?API_TAG_MQTT,
        'requestBody' => emqx_dashboard_swagger:schema_with_examples(
            ref(reset),
            reset_examples()
        ),
        responses => #{
            204 => ?DESC(reset_topic_metrics_api),
            404 => schema_topic_not_found()
        }
    },
    CreateOp = #{
        description => ?DESC(post_topic_metrics_api),
        tags => ?API_TAG_MQTT,
        'requestBody' => [topic(body)],
        responses => #{
            204 => ?DESC(post_topic_metrics_api),
            409 => emqx_dashboard_swagger:error_codes(
                [?EXCEED_LIMIT],
                ?DESC(topic_metrics_api_response409)
            ),
            400 => emqx_dashboard_swagger:error_codes(
                [?BAD_REQUEST, ?BAD_TOPIC],
                ?DESC(topic_metrics_api_response400)
            )
        }
    },
    #{
        'operationId' => topic_metrics,
        get => ListOp,
        put => ResetOp,
        post => CreateOp
    };
schema("/mqtt/topic_metrics/:topic") ->
    TopicParam = [topic(path)],
    FetchOp = #{
        description => ?DESC(gat_topic_metrics_data_api),
        tags => ?API_TAG_MQTT,
        parameters => TopicParam,
        responses => #{
            200 => mk(ref(topic_metrics), #{desc => ?DESC(topic)}),
            404 => schema_topic_not_found()
        }
    },
    RemoveOp = #{
        description => ?DESC(delete_topic_metrics_data_api),
        tags => ?API_TAG_MQTT,
        parameters => TopicParam,
        responses => #{
            204 => ?DESC(delete_topic_metrics_data_api),
            404 => schema_topic_not_found()
        }
    },
    #{
        'operationId' => operate_topic_metrics,
        get => FetchOp,
        delete => RemoveOp
    }.

%% The 404 response declaration shared by the operations above: a single
%% TOPIC_NOT_FOUND error code with the common 404 description.
schema_topic_not_found() ->
    emqx_dashboard_swagger:error_codes(
        [?TOPIC_NOT_FOUND],
        ?DESC(topic_metrics_api_response404)
    ).
|
|
|
|
%% @doc Field definitions for the schema references used by schema/1.
%%
%% reset         - body of the reset request: optional topic, mandatory action.
%% topic_metrics - one topic's record: topic, create_time, optional reset_time
%%                 and a nested metrics reference.
%% metrics       - nine integer "count" fields followed by nine number "rate"
%%                 fields, one per metric name.
fields(reset) ->
    TopicOpts = #{
        desc => ?DESC(reset_topic_desc),
        example => <<"testtopic/1">>,
        required => false
    },
    ActionOpts = #{
        desc => ?DESC(action),
        enum => [reset],
        required => true,
        example => <<"reset">>
    },
    [
        {topic, mk(binary(), TopicOpts)},
        {action, mk(string(), ActionOpts)}
    ];
fields(topic_metrics) ->
    TimeExample = <<"2022-01-14T21:48:47+08:00">>,
    TopicOpts = #{
        desc => ?DESC(topic),
        example => <<"testtopic/1">>,
        required => true
    },
    CreateTimeOpts = #{
        desc => ?DESC(create_time),
        required => true,
        example => TimeExample
    },
    ResetTimeOpts = #{
        desc => ?DESC(reset_time),
        required => false,
        example => TimeExample
    },
    MetricsOpts = #{
        desc => ?DESC(metrics),
        required => true
    },
    [
        {topic, mk(binary(), TopicOpts)},
        {create_time, mk(emqx_utils_calendar:epoch_second(), CreateTimeOpts)},
        {reset_time, mk(emqx_utils_calendar:epoch_second(), ResetTimeOpts)},
        {metrics, mk(ref(metrics), MetricsOpts)}
    ];
fields(metrics) ->
    Counters = [
        'message.dropped.count',
        'message.in.count',
        'message.out.count',
        'message.qos0.in.count',
        'message.qos0.out.count',
        'message.qos1.in.count',
        'message.qos1.out.count',
        'message.qos2.in.count',
        'message.qos2.out.count'
    ],
    Rates = [
        'message.dropped.rate',
        'message.in.rate',
        'message.out.rate',
        'message.qos0.in.rate',
        'message.qos0.out.rate',
        'message.qos1.in.rate',
        'message.qos1.out.rate',
        'message.qos2.in.rate',
        'message.qos2.out.rate'
    ],
    [fields_metric(Name, integer()) || Name <- Counters] ++
        [fields_metric(Name, number()) || Name <- Rates].

%% One metric field: the given type, a description looked up from the metric
%% name, and 0 as the example value.
fields_metric(Name, Type) ->
    {Name, mk(Type, #{desc => fields_metric_desc(Name), example => 0})}.

%% Description reference for a metric. The lookup key is the metric name with
%% every dot turned into an underscore,
%% e.g. message.dropped.rate -> message_dropped_rate.
fields_metric_desc(Name) ->
    Dotted = atom_to_binary(Name, utf8),
    Underscored = binary:replace(Dotted, <<".">>, <<"_">>, [global]),
    ?DESC(binary_to_atom(Underscored, utf8)).
|
|
|
|
%% @doc Builds the required binary `topic' field for a request.
%%
%% In is either body or path; it selects the description and is recorded
%% under the `in' option. Any other value raises case_clause.
topic(In) ->
    Description =
        case In of
            path -> ?DESC(topic_in_path);
            body -> ?DESC(topic_in_body)
        end,
    Opts = #{
        in => In,
        required => true,
        desc => Description,
        example => <<"testtopic/1">>
    },
    {topic, mk(binary(), Opts)}.
|
|
|
|
%% @doc Example request bodies for the reset operation: one that resets a
%% single topic and one that carries only the action.
reset_examples() ->
    ResetAction = "reset",
    SingleTopic = #{
        summary => <<"reset_specific_one_topic_metrics">>,
        value => #{topic => "testtopic/1", action => ResetAction}
    },
    AllTopics = #{
        summary => <<"reset_all_topic_metrics">>,
        value => #{action => ResetAction}
    },
    #{
        reset_specific_one_topic_metrics => SingleTopic,
        reset_all_topic_metrics => AllTopics
    }.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% HTTP Callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Handler for the "/mqtt/topic_metrics" operations.
%%
%% get  - replies with the cluster-wide metrics of all topics.
%% put  - with a topic in the body, resets that topic and replies with its
%%        metrics; with only the reset action, resets everything and replies
%%        with all metrics.
%% post - rejects an empty topic with 400, rejects a topic that is already in
%%        emqx_modules_conf:topic_metrics/0, otherwise adds it and replies
%%        with its metrics.
topic_metrics(get, _Request) ->
    get_cluster_response([]);
topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) ->
    case reset(Topic) of
        {error, Reason} -> reason2httpresp(Reason);
        ok -> get_cluster_response([Topic])
    end;
topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) ->
    ok = reset(),
    get_cluster_response([]);
topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) ->
    {400, ?BAD_REQUEST, <<"Topic can not be empty">>};
topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
    Registered = emqx_modules_conf:topic_metrics(),
    case lists:member(Topic, Registered) of
        true -> reason2httpresp(already_existed);
        false -> topic_metrics_add(Topic)
    end.

%% Adds the topic through emqx_modules_conf and turns the outcome into an
%% HTTP reply: the topic's cluster metrics on success, a mapped error otherwise.
topic_metrics_add(Topic) ->
    case emqx_modules_conf:add_topic_metrics(Topic) of
        {ok, Topic} -> get_cluster_response([Topic]);
        {error, Reason} -> reason2httpresp(Reason)
    end.
|
|
|
|
%% @doc Handler for the "/mqtt/topic_metrics/:topic" operations.
%%
%% get    - replies with the cluster-wide metrics of the bound topic.
%% delete - removes the topic through emqx_modules_conf; replies {204} on
%%          success or the mapped error response otherwise.
operate_topic_metrics(get, #{bindings := #{topic := Name}}) ->
    get_cluster_response([Name]);
operate_topic_metrics(delete, #{bindings := #{topic := Name}}) ->
    Outcome = emqx_modules_conf:remove_topic_metrics(Name),
    case Outcome of
        {error, Reason} -> reason2httpresp(Reason);
        ok -> {204}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Collects the topic metrics of every running node and merges them.
%%
%% Returns {ok, MergedList} when the call reports no failed nodes, otherwise
%% {error, {badrpc, FailedNodes}}.
cluster_accumulation_metrics() ->
    RunningNodes = mria:running_nodes(),
    Reply = emqx_topic_metrics_proto_v1:metrics(RunningNodes),
    case Reply of
        {PerNode, []} ->
            {ok, accumulate_nodes_metrics(PerNode)};
        {_Partial, Failed} ->
            {error, {badrpc, Failed}}
    end.
|
|
|
|
%% @doc Collects the metrics of one topic from every running node and merges
%% them into a single record.
%%
%% Per-node {error, _} replies are discarded. Returns {error, topic_not_found}
%% when nothing is left, {error, {badrpc, FailedNodes}} when the call reports
%% failed nodes, and {ok, Merged} otherwise.
cluster_accumulation_metrics(Topic) ->
    RunningNodes = mria:running_nodes(),
    case emqx_topic_metrics_proto_v1:metrics(RunningNodes, Topic) of
        {Replies, []} ->
            %% Drop error replies and wrap each remaining record in a list, the
            %% per-node shape accumulate_nodes_metrics/1 folds over.
            Wrapped = lists:filtermap(
                fun
                    ({error, _}) -> false;
                    (Record) -> {true, [Record]}
                end,
                Replies
            ),
            case Wrapped of
                [] ->
                    {error, topic_not_found};
                _ ->
                    [Merged] = accumulate_nodes_metrics(Wrapped),
                    {ok, Merged}
            end;
        {_Partial, Failed} ->
            {error, {badrpc, Failed}}
    end.
|
|
|
|
%% @doc Merges the metric records reported by several nodes.
%%
%% NodesTopicMetrics is a list with one element per node, each a list of maps
%% carrying at least topic, metrics and create_time. Every node's list is
%% indexed by topic and combined with the running result through
%% accumulate_metrics/2; the combined map is then turned back into a list of
%% maps with topic, metrics and create_time keys.
accumulate_nodes_metrics(NodesTopicMetrics) ->
    Combined = lists:foldl(
        fun(NodeRecords, SoFar) ->
            accumulate_metrics(accumulate_nodes_index(NodeRecords), SoFar)
        end,
        #{},
        NodesTopicMetrics
    ),
    maps:fold(
        fun(Topic, {Metrics, CreateTime}, Records) ->
            Record = #{
                topic => Topic,
                metrics => Metrics,
                create_time => CreateTime
            },
            [Record | Records]
        end,
        [],
        Combined
    ).

%% Indexes one node's records as Topic => {Metrics, CreateTime}. When a topic
%% occurs more than once the last record wins.
accumulate_nodes_index(NodeRecords) ->
    Pairs = lists:map(
        fun(#{topic := Topic, metrics := Metrics, create_time := CreateTime}) ->
            {Topic, {Metrics, CreateTime}}
        end,
        NodeRecords
    ),
    maps:from_list(Pairs).
|
|
|
|
%% @doc Merges one node's per-topic metrics into the running accumulator.
%%
%% TopicMetricsIn  :: #{Topic => {Metrics, CreateTime}} for the node being added.
%% TopicMetricsAcc :: same shape, holding what earlier nodes contributed.
%%
%% Returns a map of the same shape. For a topic present in TopicMetricsIn the
%% metrics are summed with the accumulated ones and the incoming CreateTime is
%% used. The fold is seeded with TopicMetricsAcc so that a topic reported by an
%% earlier node but missing from this one is kept rather than dropped.
accumulate_metrics(TopicMetricsIn, TopicMetricsAcc) ->
    maps:fold(
        fun(Topic, {Metrics, CreateTime}, Acc) ->
            Previous = maps:get(Topic, TopicMetricsAcc, undefined),
            Acc#{Topic => {do_accumulation_metrics(Metrics, Previous), CreateTime}}
        end,
        TopicMetricsAcc,
        TopicMetricsIn
    ).

%% @doc Adds the incoming metric values to the accumulated ones, key by key.
%%
%% MetricsIn :: #{'message.dropped.rate' => number(), ...}
%% The second argument is either undefined (nothing accumulated yet, MetricsIn
%% is returned unchanged) or the accumulated {Metrics, CreateTime} pair.
%%
%% A key missing on one side counts as 0 there, so keys that only exist in the
%% accumulated metrics are preserved in the result.
do_accumulation_metrics(MetricsIn, undefined) ->
    MetricsIn;
do_accumulation_metrics(MetricsIn, {MetricsAcc, _CreateTime}) ->
    maps:fold(
        fun(Key, InVal, Acc) ->
            Acc#{Key => InVal + maps:get(Key, MetricsAcc, 0)}
        end,
        MetricsAcc,
        MetricsIn
    ).
|
|
|
|
%% @doc Asks every running node to reset its topic metrics. The reply of the
%% call is ignored and ok is always returned.
reset() ->
    _Ignored = emqx_topic_metrics_proto_v1:reset(mria:running_nodes()),
    ok.
|
|
|
|
%% @doc Asks every running node to reset the metrics of one topic.
%%
%% Returns ok when no node replied with {error, _}, otherwise the first such
%% error in reply order.
%%
%% NOTE(review): only a reply whose failed-node list is empty is matched here;
%% any other reply raises case_clause. Handling it would also need a matching
%% clause wherever the returned reason is mapped to an HTTP response.
reset(Topic) ->
    RunningNodes = mria:running_nodes(),
    case emqx_topic_metrics_proto_v1:reset(RunningNodes, Topic) of
        {Replies, []} ->
            case [Failure || {error, _} = Failure <- Replies] of
                [] -> ok;
                [FirstFailure | _] -> FirstFailure
            end
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% utils
|
|
|
|
%% Maps an internal error reason to an {HttpStatus, Body} reply, where Body is
%% a map with a code and a message:
%%   quota_exceeded              -> 409 EXCEED_LIMIT (message includes the limit)
%%   bad_topic, already_existed  -> 400 BAD_TOPIC
%%   topic_not_found, not_found  -> 404 TOPIC_NOT_FOUND
%% Any other reason raises function_clause.
reason2httpresp(quota_exceeded) ->
    Limit = emqx_topic_metrics:max_limit(),
    Text = io_lib:format("Max topic metrics count is ~p", [Limit]),
    {409, #{code => ?EXCEED_LIMIT, message => list_to_binary(Text)}};
reason2httpresp(bad_topic) ->
    {400, #{code => ?BAD_TOPIC, message => <<"Wildcard topic is not supported">>}};
reason2httpresp(already_existed) ->
    {400, #{code => ?BAD_TOPIC, message => <<"Topic already registered">>}};
reason2httpresp(Reason) when Reason =:= topic_not_found; Reason =:= not_found ->
    {404, #{code => ?TOPIC_NOT_FOUND, message => <<"Topic not found">>}}.
|
|
|
|
%% Calls cluster_accumulation_metrics with the given argument list ([] for all
%% topics, [Topic] for one) and converts the outcome into an HTTP reply:
%% 200 with the metrics, 500 with the failed-node term on badrpc, or the reply
%% produced by reason2httpresp/1 for an atom reason.
%% The call goes through erlang:apply on ?MODULE, i.e. it is a fully qualified
%% call to the exported function.
get_cluster_response(Args) ->
    Outcome = erlang:apply(?MODULE, cluster_accumulation_metrics, Args),
    case Outcome of
        {ok, Metrics} ->
            {200, Metrics};
        {error, {badrpc, RPCReason}} ->
            {500, RPCReason};
        {error, Reason} when is_atom(Reason) ->
            reason2httpresp(Reason)
    end.
|