151 lines
4.5 KiB
Erlang
151 lines
4.5 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_slow_subs_api).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
|
|
|
|
-export([slow_subs/2, get_history/0, settings/2]).
|
|
|
|
-import(hoconsc, [mk/2, ref/1, ref/2]).
|
|
-import(emqx_mgmt_util, [bad_request/0]).
|
|
|
|
-define(APP, emqx_slow_subs).
|
|
-define(APP_NAME, <<"emqx_slow_subs">>).
|
|
-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)).
|
|
|
|
namespace() -> "slow_subscribers_statistics".
|
|
|
|
api_spec() ->
|
|
emqx_dashboard_swagger:spec(?MODULE).
|
|
|
|
paths() -> ["/slow_subscriptions", "/slow_subscriptions/settings"].
|
|
|
|
schema(("/slow_subscriptions")) ->
|
|
#{
|
|
'operationId' => slow_subs,
|
|
delete => #{
|
|
tags => [<<"slow subs">>],
|
|
description => ?DESC(clear_records_api),
|
|
parameters => [],
|
|
'requestBody' => [],
|
|
responses => #{204 => <<"No Content">>}
|
|
},
|
|
get => #{
|
|
tags => [<<"slow subs">>],
|
|
description => ?DESC(get_records_api),
|
|
parameters => [
|
|
ref(emqx_dashboard_swagger, page),
|
|
ref(emqx_dashboard_swagger, limit)
|
|
],
|
|
'requestBody' => [],
|
|
responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]}
|
|
}
|
|
};
|
|
schema("/slow_subscriptions/settings") ->
|
|
#{
|
|
'operationId' => settings,
|
|
get => #{
|
|
tags => [<<"slow subs">>],
|
|
description => ?DESC(get_setting_api),
|
|
responses => #{200 => conf_schema()}
|
|
},
|
|
put => #{
|
|
tags => [<<"slow subs">>],
|
|
description => ?DESC(update_setting_api),
|
|
'requestBody' => conf_schema(),
|
|
responses => #{200 => conf_schema()}
|
|
}
|
|
}.
|
|
|
|
fields(record) ->
|
|
[
|
|
{clientid, mk(string(), #{desc => ?DESC(clientid)})},
|
|
{node, mk(string(), #{desc => ?DESC(node)})},
|
|
{topic, mk(string(), #{desc => ?DESC(topic)})},
|
|
{timespan,
|
|
mk(
|
|
integer(),
|
|
#{desc => ?DESC(timespan)}
|
|
)},
|
|
{last_update_time, mk(integer(), #{desc => ?DESC(last_update_time)})}
|
|
].
|
|
|
|
conf_schema() ->
|
|
Ref = hoconsc:ref(emqx_slow_subs_schema, "slow_subs"),
|
|
hoconsc:mk(Ref, #{}).
|
|
|
|
slow_subs(delete, _) ->
|
|
_ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end),
|
|
{204};
|
|
slow_subs(get, _) ->
|
|
NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end),
|
|
Fun = fun
|
|
({ok, L}, Acc) -> L ++ Acc;
|
|
(_, Acc) -> Acc
|
|
end,
|
|
RankL = lists:foldl(Fun, [], NodeRankL),
|
|
|
|
SortFun = fun(#{timespan := A}, #{timespan := B}) ->
|
|
A > B
|
|
end,
|
|
|
|
SortedL = lists:sort(SortFun, RankL),
|
|
SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
|
|
|
|
{200, #{data => SortedL2}}.
|
|
|
|
get_history() ->
|
|
Node = node(),
|
|
RankL = ets:tab2list(?TOPK_TAB),
|
|
ConvFun = fun(
|
|
#top_k{
|
|
index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
|
|
last_update_time = LastUpdateTime
|
|
}
|
|
) ->
|
|
#{
|
|
clientid => ClientId,
|
|
node => Node,
|
|
topic => Topic,
|
|
timespan => TimeSpan,
|
|
last_update_time => LastUpdateTime
|
|
}
|
|
end,
|
|
|
|
lists:map(ConvFun, RankL).
|
|
|
|
settings(get, _) ->
|
|
{200, emqx:get_raw_config([slow_subs], #{})};
|
|
settings(put, #{body := Body}) ->
|
|
case emqx_slow_subs:update_settings(Body) of
|
|
{ok, #{config := NewConf}} ->
|
|
{200, NewConf};
|
|
{error, Reason} ->
|
|
Message = list_to_binary(io_lib:format("Update slow subs config failed ~p", [Reason])),
|
|
{400, 'BAD_REQUEST', Message}
|
|
end.
|
|
|
|
rpc_call(Fun) ->
|
|
Nodes = mria_mnesia:running_nodes(),
|
|
Fun(Nodes).
|