emqx/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl

151 lines
4.5 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% HTTP API module for the slow-subscriptions endpoints listed in paths/0.
%% It implements the minirest_api behaviour: the schema callbacks describe the
%% endpoints, and slow_subs/2 and settings/2 handle the requests.
-module(emqx_slow_subs_api).
-behaviour(minirest_api).
%% NOTE(review): string()/integer() used in fields/1, the ?DESC macro, and the
%% ?TOPK_TAB / ?TOPK_INDEX / ?ID / ?MAX_SIZE macros and #top_k{} record are not
%% defined in this file; presumably they come from the headers below -- confirm.
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
-include_lib("hocon/include/hoconsc.hrl").
%% Schema / specification callbacks.
-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
%% Request handlers (slow_subs/2, settings/2) plus get_history/0, which dumps
%% the local top-k table. NOTE(review): get_history/0 is presumably exported so
%% that it can be invoked on every node through emqx_slow_subs_proto_v1 -- confirm.
-export([slow_subs/2, get_history/0, settings/2]).
-import(hoconsc, [mk/2, ref/1, ref/2]).
%% NOTE(review): bad_request/0 is imported but not called anywhere in the
%% visible part of this module.
-import(emqx_mgmt_util, [bad_request/0]).
%% NOTE(review): none of the three macros below is referenced in the visible
%% part of this module; verify whether they are still needed.
-define(APP, emqx_slow_subs).
-define(APP_NAME, <<"emqx_slow_subs">>).
-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)).
%% Returns the namespace string for the schema definitions of this module.
namespace() ->
    "slow_subscribers_statistics".
%% Builds the API specification for this module by handing the module name to
%% emqx_dashboard_swagger:spec/1.
api_spec() ->
    Module = ?MODULE,
    emqx_dashboard_swagger:spec(Module).
%% Lists the URL paths described by this module; schema/1 has one clause
%% for each of them.
paths() ->
    [
        "/slow_subscriptions",
        "/slow_subscriptions/settings"
    ].
%% Returns the operation schema for one of the paths listed in paths/0.
%%
%% "/slow_subscriptions"          -> DELETE (clear records) and GET (list
%%                                   records); 'operationId' is slow_subs.
%% "/slow_subscriptions/settings" -> GET and PUT of the configuration;
%%                                   'operationId' is settings.
%%
%% The 'operationId' atoms match the handler functions exported by this module.
schema("/slow_subscriptions") ->
    Tags = [<<"slow subs">>],
    ClearRecords = #{
        tags => Tags,
        description => ?DESC(clear_records_api),
        parameters => [],
        'requestBody' => [],
        responses => #{204 => <<"No Content">>}
    },
    Pagination = [
        ref(emqx_dashboard_swagger, page),
        ref(emqx_dashboard_swagger, limit)
    ],
    %% The 200 response carries a `data` field holding an array of `record`
    %% structs (see fields/1).
    RecordArray = mk(hoconsc:array(ref(record)), #{}),
    ListRecords = #{
        tags => Tags,
        description => ?DESC(get_records_api),
        parameters => Pagination,
        'requestBody' => [],
        responses => #{200 => [{data, RecordArray}]}
    },
    #{
        'operationId' => slow_subs,
        delete => ClearRecords,
        get => ListRecords
    };
schema("/slow_subscriptions/settings") ->
    Tags = [<<"slow subs">>],
    %% The same configuration schema describes the PUT request body and both
    %% 200 responses.
    ConfSchema = conf_schema(),
    ReadSettings = #{
        tags => Tags,
        description => ?DESC(get_setting_api),
        responses => #{200 => ConfSchema}
    },
    WriteSettings = #{
        tags => Tags,
        description => ?DESC(update_setting_api),
        'requestBody' => ConfSchema,
        responses => #{200 => ConfSchema}
    },
    #{
        'operationId' => settings,
        get => ReadSettings,
        put => WriteSettings
    }.
%% Field definitions of the `record` struct: one entry per key of the maps
%% produced by get_history/0. Every field carries only a description.
fields(record) ->
    %% Local constructor for the repeated {Name, mk(Type, #{desc => ...})} shape.
    Field = fun(Name, Type, Desc) -> {Name, mk(Type, #{desc => Desc})} end,
    [
        Field(clientid, string(), ?DESC(clientid)),
        Field(node, string(), ?DESC(node)),
        Field(topic, string(), ?DESC(topic)),
        Field(timespan, integer(), ?DESC(timespan)),
        Field(last_update_time, integer(), ?DESC(last_update_time))
    ].
%% Schema term used for the settings endpoint: a reference to the "slow_subs"
%% struct of emqx_slow_subs_schema, wrapped with empty metadata.
conf_schema() ->
    hoconsc:mk(hoconsc:ref(emqx_slow_subs_schema, "slow_subs"), #{}).
%% Handler for "/slow_subscriptions".
%%
%% delete: asks emqx_slow_subs_proto_v1 to clear the history on the running
%%         nodes and replies {204}; the per-node results are ignored.
%% get:    collects the history from the running nodes, keeps the {ok, List}
%%         replies only, orders the records by `timespan` (largest first) and
%%         replies with at most ?MAX_SIZE of them under the `data` key.
slow_subs(delete, _Request) ->
    _ = rpc_call(fun emqx_slow_subs_proto_v1:clear_history/1),
    {204};
slow_subs(get, _Request) ->
    Replies = rpc_call(fun emqx_slow_subs_proto_v1:get_history/1),
    %% Anything that is not {ok, List} is dropped. The lists are joined with
    %% later replies first, so the order of records with equal timespans
    %% stays the same after sorting.
    PerNode = [Records || {ok, Records} <- Replies],
    Merged = lists:append(lists:reverse(PerNode)),
    LargerFirst = fun(#{timespan := Left}, #{timespan := Right}) -> Left > Right end,
    Ranked = lists:sort(LargerFirst, Merged),
    {200, #{data => lists:sublist(Ranked, ?MAX_SIZE)}}.
%% Reads every entry of the ?TOPK_TAB ETS table on the local node and converts
%% each one into a map with the keys clientid, node, topic, timespan and
%% last_update_time. `node` is always the local node name.
get_history() ->
    Node = node(),
    [top_k_to_map(Node, Entry) || Entry <- ets:tab2list(?TOPK_TAB)].

%% Converts one #top_k{} record into the map form returned by get_history/0.
%% Crashes on any term that is not a #top_k{} record with the expected index.
top_k_to_map(
    Node,
    #top_k{
        index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
        last_update_time = LastUpdateTime
    }
) ->
    #{
        clientid => ClientId,
        node => Node,
        topic => Topic,
        timespan => TimeSpan,
        last_update_time => LastUpdateTime
    }.
%% Handler for "/slow_subscriptions/settings".
%%
%% get: replies 200 with the raw [slow_subs] configuration (an empty map when
%%      it is not set).
%% put: passes the request body to emqx_slow_subs:update_settings/1; replies
%%      200 with the new configuration on success, or 400 'BAD_REQUEST' with a
%%      message containing the printed failure reason.
settings(get, _Request) ->
    RawConf = emqx:get_raw_config([slow_subs], #{}),
    {200, RawConf};
settings(put, #{body := Body}) ->
    UpdateResult = emqx_slow_subs:update_settings(Body),
    case UpdateResult of
        {error, Reason} ->
            Text = io_lib:format("Update slow subs config failed ~p", [Reason]),
            {400, 'BAD_REQUEST', list_to_binary(Text)};
        {ok, #{config := NewConf}} ->
            {200, NewConf}
    end.
%% Applies Fun to the node list returned by mria_mnesia:running_nodes/0 and
%% returns whatever Fun returns.
rpc_call(Fun) ->
    Fun(mria_mnesia:running_nodes()).