Feat/slow topic api (#6101)

* feat(emqx_st_statistics): add api
This commit is contained in:
lafirest 2021-11-10 11:08:06 +08:00 committed by GitHub
parent 388c29344a
commit fae815b35c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 328 additions and 5 deletions

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-record(top_k, { rank :: pos_integer()
, topic :: emqx_types:topic()
, average_count :: number()
, average_elapsed :: number()}).
-type top_k() :: #top_k{}.

View File

@ -25,7 +25,7 @@
-logger_header("[SLOW TOPICS]").
-export([ start_link/1, on_publish_done/3, enable/0
, disable/0
, disable/0, clear_history/0
]).
%% gen_server callbacks
@ -78,6 +78,12 @@
-type top_k() :: #top_k{}.
-type top_k_map() :: #{emqx_types:topic() => top_k()}.
-ifdef(TEST).
-define(TOPK_ACCESS, public).
-else.
-define(TOPK_ACCESS, protected).
-endif.
%% erlang term order
%% number < atom < reference < fun < port < pid < tuple < list < bit string
@ -106,6 +112,9 @@ on_publish_done(#message{timestamp = Timestamp} = Msg, Threshold, Counter) ->
ok
end.
clear_history() ->
gen_server:call(?MODULE, ?FUNCTION_NAME).
enable() ->
gen_server:call(?MODULE, {enable, true}).
@ -146,6 +155,10 @@ handle_call({enable, Enable}, _From,
end,
{reply, ok, State2};
handle_call(clear_history, _, State) ->
ets:delete_all_objects(?TOPK_TAB),
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
@ -185,7 +198,7 @@ init_log_tab(_) ->
]).
init_topk_tab(_) ->
?TOPK_TAB = ets:new(?TOPK_TAB, [ set, protected, named_table
?TOPK_TAB = ets:new(?TOPK_TAB, [ set, ?TOPK_ACCESS, named_table
, {keypos, #top_k.rank}, {write_concurrency, false}
, {read_concurrency, true}
]).

View File

@ -0,0 +1,75 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_api).
-rest_api(#{name => clear_history,
method => 'DELETE',
path => "/slow_topic",
func => clear_history,
descr => "Clear current data and re count slow topic"}).
-rest_api(#{name => get_history,
method => 'GET',
path => "/slow_topic",
func => get_history,
descr => "Get slow topics statistics record data"}).
-export([ clear_history/2
, get_history/2
]).
-include_lib("emqx_st_statistics/include/emqx_st_statistics.hrl").
-import(minirest, [return/1]).
%%--------------------------------------------------------------------
%% HTTP API
%%--------------------------------------------------------------------
clear_history(_Bindings, _Params) ->
ok = emqx_st_statistics:clear_history(),
return(ok).
get_history(_Bindings, Params) ->
PageT = proplists:get_value(<<"_page">>, Params),
LimitT = proplists:get_value(<<"_limit">>, Params),
Page = erlang:binary_to_integer(PageT),
Limit = erlang:binary_to_integer(LimitT),
Start = (Page - 1) * Limit + 1,
Size = ets:info(?TOPK_TAB, size),
EndT = Start + Limit - 1,
End = erlang:min(EndT, Size),
Infos = lists:foldl(fun(Rank, Acc) ->
[#top_k{topic = Topic
, average_count = Count
, average_elapsed = Elapsed}] = ets:lookup(?TOPK_TAB, Rank),
Info =[ {rank, Rank}
, {topic, Topic}
, {count, Count}
, {elapsed, Elapsed}],
[Info | Acc]
end,
[],
lists:seq(Start, End)),
return({ok, #{meta => #{page => Page,
limit => Limit,
hasnext => End < Size,
count => End - Start + 1},
data => Infos}}).

View File

@ -22,11 +22,15 @@
-export([ start/2
, stop/1
, start/0
]).
start(_Type, _Args) ->
Env = application:get_all_env(emqx_st_statistics),
{ok, Sup} = emqx_st_statistics_sup:start_link(Env),
start().
start() ->
Conf = application:get_all_env(emqx_st_statistics),
{ok, Sup} = emqx_st_statistics_sup:start_link(Conf),
{ok, Sup}.
stop(_State) ->

View File

@ -0,0 +1,152 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_management/include/emqx_mgmt.hrl").
-include_lib("emqx_st_statistics/include/emqx_st_statistics.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_modules),
emqx_ct_helpers:start_apps([emqx_st_statistics, emqx_management]),
Config.
end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_st_statistics, emqx_management]),
Config.
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, Config) ->
Config.
get(Key, ResponseBody) ->
maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])).
lookup_alarm(Name, [#{<<"name">> := Name} | _More]) ->
true;
lookup_alarm(Name, [_Alarm | More]) ->
lookup_alarm(Name, More);
lookup_alarm(_Name, []) ->
false.
is_existing(Name, [#{name := Name} | _More]) ->
true;
is_existing(Name, [_Alarm | More]) ->
is_existing(Name, More);
is_existing(_Name, []) ->
false.
t_get_history(_) ->
ets:insert(?TOPK_TAB, #top_k{rank = 1,
topic = <<"test">>,
average_count = 12,
average_elapsed = 1500}),
{ok, Data} = request_api(get, api_path(["slow_topic"]), "_page=1&_limit=10",
auth_header_()),
Return = #{meta => #{page => 1,
limit => 10,
hasnext => false,
count => 1},
data => [#{topic => <<"test">>,
rank => 1,
elapsed => 1500,
count => 12}],
code => 0},
ShouldBe = emqx_json:encode(Return),
?assertEqual(ShouldBe, erlang:list_to_binary(Data)).
t_clear(_) ->
ets:insert(?TOPK_TAB, #top_k{rank = 1,
topic = <<"test">>,
average_count = 12,
average_elapsed = 1500}),
{ok, _} = request_api(delete, api_path(["slow_topic"]), [],
auth_header_()),
?assertEqual(0, ets:info(?TOPK_TAB, size)).
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
AppId = <<"admin">>,
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
api_path(Parts)->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
filter(List, Key, Value) ->
lists:filter(fun(Item) ->
maps:get(Key, Item) == Value
end, List).

View File

@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_st_statistics).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/logger.hrl").
-logger_header("[st_statistics]").
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
-spec(load(list()) -> ok).
load(_Env) ->
_ = emqx_st_statistics_app:start(),
ok.
-spec(unload(list()) -> ok).
unload(_Env) ->
emqx_st_statistics_app:stop(undefined).
description() ->
"EMQ X Slow Topic Statistics Module".
%%--------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
{application, emqx_modules,
[{description, "EMQ X Module Management"},
{vsn, "4.3.3"},
{vsn, "4.3.4"},
{modules, []},
{applications, [kernel,stdlib]},
{mod, {emqx_modules_app, []}},

View File

@ -1,6 +1,9 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.3", [
{load_module, emqx_mod_st_statistics, brutal_purge, soft_purge, []}
]},
{"4.3.2", [
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
]},
@ -16,6 +19,9 @@
{<<".*">>, []}
],
[
{"4.3.3", [
{load_module, emqx_mod_st_statistics, brutal_purge, soft_purge, []}
]},
{"4.3.2", [
{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}
]},

View File

@ -2217,6 +2217,7 @@ end}.
[{emqx_mod_rewrite, Rewrites()}],
[{emqx_mod_topic_metrics, []}],
[{emqx_mod_delayed, []}],
[{emqx_mod_st_statistics, []}],
[{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}]
])
end}.