From fae815b35c283434121cbb3eba3c5be83f0e587c Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 10 Nov 2021 11:08:06 +0800 Subject: [PATCH] Feat/slow topic api (#6101) * feat(emqx_st_statistics): add api --- .../include/emqx_st_statistics.hrl | 25 +++ .../src/emqx_st_statistics.erl | 17 +- .../src/emqx_st_statistics_api.erl | 75 +++++++++ .../src/emqx_st_statistics_app.erl | 8 +- .../test/emqx_st_statistics_api_SUITE.erl | 152 ++++++++++++++++++ .../src/emqx_mod_st_statistics.erl | 47 ++++++ lib-ce/emqx_modules/src/emqx_modules.app.src | 2 +- .../emqx_modules/src/emqx_modules.appup.src | 6 + priv/emqx.schema | 1 + 9 files changed, 328 insertions(+), 5 deletions(-) create mode 100644 apps/emqx_st_statistics/include/emqx_st_statistics.hrl create mode 100644 apps/emqx_st_statistics/src/emqx_st_statistics_api.erl create mode 100644 apps/emqx_st_statistics/test/emqx_st_statistics_api_SUITE.erl create mode 100644 lib-ce/emqx_modules/src/emqx_mod_st_statistics.erl diff --git a/apps/emqx_st_statistics/include/emqx_st_statistics.hrl b/apps/emqx_st_statistics/include/emqx_st_statistics.hrl new file mode 100644 index 000000000..9184c3194 --- /dev/null +++ b/apps/emqx_st_statistics/include/emqx_st_statistics.hrl @@ -0,0 +1,25 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(LOG_TAB, emqx_st_statistics_log). +-define(TOPK_TAB, emqx_st_statistics_topk). + +-record(top_k, { rank :: pos_integer() + , topic :: emqx_types:topic() + , average_count :: number() + , average_elapsed :: number()}). + +-type top_k() :: #top_k{}. diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics.erl b/apps/emqx_st_statistics/src/emqx_st_statistics.erl index 801a21c96..2aa5c9fb4 100644 --- a/apps/emqx_st_statistics/src/emqx_st_statistics.erl +++ b/apps/emqx_st_statistics/src/emqx_st_statistics.erl @@ -25,7 +25,7 @@ -logger_header("[SLOW TOPICS]"). -export([ start_link/1, on_publish_done/3, enable/0 - , disable/0 + , disable/0, clear_history/0 ]). %% gen_server callbacks @@ -78,6 +78,12 @@ -type top_k() :: #top_k{}. -type top_k_map() :: #{emqx_types:topic() => top_k()}. +-ifdef(TEST). +-define(TOPK_ACCESS, public). +-else. +-define(TOPK_ACCESS, protected). +-endif. + %% erlang term order %% number < atom < reference < fun < port < pid < tuple < list < bit string @@ -106,6 +112,9 @@ on_publish_done(#message{timestamp = Timestamp} = Msg, Threshold, Counter) -> ok end. +clear_history() -> + gen_server:call(?MODULE, ?FUNCTION_NAME). + enable() -> gen_server:call(?MODULE, {enable, true}). @@ -146,6 +155,10 @@ handle_call({enable, Enable}, _From, end, {reply, ok, State2}; +handle_call(clear_history, _, State) -> + ets:delete_all_objects(?TOPK_TAB), + {reply, ok, State}; + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -185,7 +198,7 @@ init_log_tab(_) -> ]). init_topk_tab(_) -> - ?TOPK_TAB = ets:new(?TOPK_TAB, [ set, protected, named_table + ?TOPK_TAB = ets:new(?TOPK_TAB, [ set, ?TOPK_ACCESS, named_table , {keypos, #top_k.rank}, {write_concurrency, false} , {read_concurrency, true} ]). diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics_api.erl b/apps/emqx_st_statistics/src/emqx_st_statistics_api.erl new file mode 100644 index 000000000..3980caccd --- /dev/null +++ b/apps/emqx_st_statistics/src/emqx_st_statistics_api.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_st_statistics_api). + +-rest_api(#{name => clear_history, + method => 'DELETE', + path => "/slow_topic", + func => clear_history, + descr => "Clear current data and re count slow topic"}). + +-rest_api(#{name => get_history, + method => 'GET', + path => "/slow_topic", + func => get_history, + descr => "Get slow topics statistics record data"}). + +-export([ clear_history/2 + , get_history/2 + ]). + +-include_lib("emqx_st_statistics/include/emqx_st_statistics.hrl"). + +-import(minirest, [return/1]). + +%%-------------------------------------------------------------------- +%% HTTP API +%%-------------------------------------------------------------------- + +clear_history(_Bindings, _Params) -> + ok = emqx_st_statistics:clear_history(), + return(ok). + +get_history(_Bindings, Params) -> + PageT = proplists:get_value(<<"_page">>, Params), + LimitT = proplists:get_value(<<"_limit">>, Params), + Page = erlang:binary_to_integer(PageT), + Limit = erlang:binary_to_integer(LimitT), + Start = (Page - 1) * Limit + 1, + Size = ets:info(?TOPK_TAB, size), + EndT = Start + Limit - 1, + End = erlang:min(EndT, Size), + Infos = lists:foldl(fun(Rank, Acc) -> + [#top_k{topic = Topic + , average_count = Count + , average_elapsed = Elapsed}] = ets:lookup(?TOPK_TAB, Rank), + + Info =[ {rank, Rank} + , {topic, Topic} + , {count, Count} + , {elapsed, Elapsed}], + + [Info | Acc] + end, + [], + lists:seq(Start, End)), + + return({ok, #{meta => #{page => Page, + limit => Limit, + hasnext => End < Size, + count => End - Start + 1}, + data => Infos}}). diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics_app.erl b/apps/emqx_st_statistics/src/emqx_st_statistics_app.erl index e5b62a00e..b8e5987db 100644 --- a/apps/emqx_st_statistics/src/emqx_st_statistics_app.erl +++ b/apps/emqx_st_statistics/src/emqx_st_statistics_app.erl @@ -22,11 +22,15 @@ -export([ start/2 , stop/1 + , start/0 ]). start(_Type, _Args) -> - Env = application:get_all_env(emqx_st_statistics), - {ok, Sup} = emqx_st_statistics_sup:start_link(Env), + start(). + +start() -> + Conf = application:get_all_env(emqx_st_statistics), + {ok, Sup} = emqx_st_statistics_sup:start_link(Conf), {ok, Sup}. stop(_State) -> diff --git a/apps/emqx_st_statistics/test/emqx_st_statistics_api_SUITE.erl b/apps/emqx_st_statistics/test/emqx_st_statistics_api_SUITE.erl new file mode 100644 index 000000000..e6b99e70a --- /dev/null +++ b/apps/emqx_st_statistics/test/emqx_st_statistics_api_SUITE.erl @@ -0,0 +1,152 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_st_statistics_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_management/include/emqx_mgmt.hrl"). +-include_lib("emqx_st_statistics/include/emqx_st_statistics.hrl"). + +-define(CONTENT_TYPE, "application/x-www-form-urlencoded"). + +-define(HOST, "http://127.0.0.1:8081/"). + +-define(API_VERSION, "v4"). + +-define(BASE_PATH, "api"). + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + application:load(emqx_modules), + emqx_ct_helpers:start_apps([emqx_st_statistics, emqx_management]), + Config. + +end_per_suite(Config) -> + emqx_ct_helpers:stop_apps([emqx_st_statistics, emqx_management]), + Config. + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(_, Config) -> + Config. + +get(Key, ResponseBody) -> + maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])). + +lookup_alarm(Name, [#{<<"name">> := Name} | _More]) -> + true; +lookup_alarm(Name, [_Alarm | More]) -> + lookup_alarm(Name, More); +lookup_alarm(_Name, []) -> + false. + +is_existing(Name, [#{name := Name} | _More]) -> + true; +is_existing(Name, [_Alarm | More]) -> + is_existing(Name, More); +is_existing(_Name, []) -> + false. + +t_get_history(_) -> + ets:insert(?TOPK_TAB, #top_k{rank = 1, + topic = <<"test">>, + average_count = 12, + average_elapsed = 1500}), + + {ok, Data} = request_api(get, api_path(["slow_topic"]), "_page=1&_limit=10", + auth_header_()), + + Return = #{meta => #{page => 1, + limit => 10, + hasnext => false, + count => 1}, + data => [#{topic => <<"test">>, + rank => 1, + elapsed => 1500, + count => 12}], + code => 0}, + + ShouldBe = emqx_json:encode(Return), + + ?assertEqual(ShouldBe, erlang:list_to_binary(Data)). + +t_clear(_) -> + ets:insert(?TOPK_TAB, #top_k{rank = 1, + topic = <<"test">>, + average_count = 12, + average_elapsed = 1500}), + + {ok, _} = request_api(delete, api_path(["slow_topic"]), [], + auth_header_()), + + ?assertEqual(0, ets:info(?TOPK_TAB, size)). + +request_api(Method, Url, Auth) -> + request_api(Method, Url, [], Auth, []). + +request_api(Method, Url, QueryParams, Auth) -> + request_api(Method, Url, QueryParams, Auth, []). + +request_api(Method, Url, QueryParams, Auth, []) -> + NewUrl = case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, + do_request_api(Method, {NewUrl, [Auth]}); +request_api(Method, Url, QueryParams, Auth, Body) -> + NewUrl = case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, + do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}). + +do_request_api(Method, Request)-> + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], []) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _, Return} } + when Code =:= 200 orelse Code =:= 201 -> + {ok, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +auth_header_() -> + AppId = <<"admin">>, + AppSecret = <<"public">>, + auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)). + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User,":",Pass])), + {"Authorization","Basic " ++ Encoded}. + +api_path(Parts)-> + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts). + +filter(List, Key, Value) -> + lists:filter(fun(Item) -> + maps:get(Key, Item) == Value + end, List). diff --git a/lib-ce/emqx_modules/src/emqx_mod_st_statistics.erl b/lib-ce/emqx_modules/src/emqx_mod_st_statistics.erl new file mode 100644 index 000000000..01a1444bd --- /dev/null +++ b/lib-ce/emqx_modules/src/emqx_mod_st_statistics.erl @@ -0,0 +1,47 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_st_statistics). + +-behaviour(emqx_gen_mod). + +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[st_statistics]"). + +%% emqx_gen_mod callbacks +-export([ load/1 + , unload/1 + , description/0 + ]). + +%%-------------------------------------------------------------------- +%% Load/Unload +%%-------------------------------------------------------------------- + +-spec(load(list()) -> ok). +load(_Env) -> + _ = emqx_st_statistics_app:start(), + ok. + +-spec(unload(list()) -> ok). +unload(_Env) -> + emqx_st_statistics_app:stop(undefined). + +description() -> + "EMQ X Slow Topic Statistics Module". + +%%-------------------------------------------------------------------- diff --git a/lib-ce/emqx_modules/src/emqx_modules.app.src b/lib-ce/emqx_modules/src/emqx_modules.app.src index bcb05fe31..47a3d8888 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.app.src +++ b/lib-ce/emqx_modules/src/emqx_modules.app.src @@ -1,6 +1,6 @@ {application, emqx_modules, [{description, "EMQ X Module Management"}, - {vsn, "4.3.3"}, + {vsn, "4.3.4"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, diff --git a/lib-ce/emqx_modules/src/emqx_modules.appup.src b/lib-ce/emqx_modules/src/emqx_modules.appup.src index f52ba1a61..aed228213 100644 --- a/lib-ce/emqx_modules/src/emqx_modules.appup.src +++ b/lib-ce/emqx_modules/src/emqx_modules.appup.src @@ -1,6 +1,9 @@ %% -*-: erlang -*- {VSN, [ + {"4.3.3", [ + {load_module, emqx_mod_st_statistics, brutal_purge, soft_purge, []} + ]}, {"4.3.2", [ {load_module, emqx_mod_presence, brutal_purge, soft_purge, []} ]}, @@ -16,6 +19,9 @@ {<<".*">>, []} ], [ + {"4.3.3", [ + {load_module, emqx_mod_st_statistics, brutal_purge, soft_purge, []} + ]}, {"4.3.2", [ {load_module, emqx_mod_presence, brutal_purge, soft_purge, []} ]}, diff --git a/priv/emqx.schema b/priv/emqx.schema index 4b33cf65c..6418501e3 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2217,6 +2217,7 @@ end}. [{emqx_mod_rewrite, Rewrites()}], [{emqx_mod_topic_metrics, []}], [{emqx_mod_delayed, []}], + [{emqx_mod_st_statistics, []}], [{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}] ]) end}.