diff --git a/apps/emqx_slow_subs/include/emqx_slow_subs.hrl b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl index f282037dc..4c6b1cc8c 100644 --- a/apps/emqx_slow_subs/include/emqx_slow_subs.hrl +++ b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl @@ -23,12 +23,13 @@ -define(MAX_SIZE, 1000). --record(top_k, { index :: topk_index() - , last_update_time :: pos_integer() - , extra = [] - }). +-record(top_k, { + index :: topk_index(), + last_update_time :: pos_integer(), + extra = [] +}). --record(index_tab, { index :: index()}). +-record(index_tab, {index :: index()}). -type top_k() :: #top_k{}. -type index_tab() :: #index_tab{}. diff --git a/apps/emqx_slow_subs/rebar.config b/apps/emqx_slow_subs/rebar.config index 528efecb6..9f17b7657 100644 --- a/apps/emqx_slow_subs/rebar.config +++ b/apps/emqx_slow_subs/rebar.config @@ -1,4 +1,5 @@ %% -*- mode: erlang -*- -{deps, [ {emqx, {path, "../emqx"}} - ]}. +{deps, [{emqx, {path, "../emqx"}}]}. + +{project_plugins, [erlfmt]}. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 801c5ff51..74adfde1c 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -41,48 +41,52 @@ paths() -> ["/slow_subscriptions", "/slow_subscriptions/settings"]. schema(("/slow_subscriptions")) -> #{ - 'operationId' => slow_subs, - delete => #{tags => [<<"slow subs">>], - description => <<"Clear current data and re count slow topic">>, - parameters => [], - 'requestBody' => [], - responses => #{204 => <<"No Content">>} - }, - get => #{tags => [<<"slow subs">>], - description => <<"Get slow topics statistics record data">>, - parameters => [ {page, mk(pos_integer(), #{in => query})} - , {limit, mk(pos_integer(), #{in => query})} - ], - 'requestBody' => [], - responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]} - } - }; - + 'operationId' => slow_subs, + delete => #{ + tags => [<<"slow subs">>], + description => <<"Clear current data and re count slow topic">>, + parameters => [], + 'requestBody' => [], + responses => #{204 => <<"No Content">>} + }, + get => #{ + tags => [<<"slow subs">>], + description => <<"Get slow topics statistics record data">>, + parameters => [ + {page, mk(pos_integer(), #{in => query})}, + {limit, mk(pos_integer(), #{in => query})} + ], + 'requestBody' => [], + responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]} + } + }; schema("/slow_subscriptions/settings") -> - #{'operationId' => settings, - get => #{tags => [<<"slow subs">>], - description => <<"Get slow subs settings">>, - responses => #{200 => conf_schema()} - }, - put => #{tags => [<<"slow subs">>], - description => <<"Update slow subs settings">>, - 'requestBody' => conf_schema(), - responses => #{200 => conf_schema()} - } - }. + #{ + 'operationId' => settings, + get => #{ + tags => [<<"slow subs">>], + description => <<"Get slow subs settings">>, + responses => #{200 => conf_schema()} + }, + put => #{ + tags => [<<"slow subs">>], + description => <<"Update slow subs settings">>, + 'requestBody' => conf_schema(), + responses => #{200 => conf_schema()} + } + }. fields(record) -> - [ {clientid, - mk(string(), #{desc => <<"the clientid">>})}, - {node, - mk(string(), #{desc => <<"the node">>})}, - {topic, - mk(string(), #{desc => <<"the topic">>})}, - {timespan, - mk(integer(), - #{desc => <<"timespan for message transmission">>})}, - {last_update_time, - mk(integer(), #{desc => <<"the timestamp of last update">>})} + [ + {clientid, mk(string(), #{desc => <<"the clientid">>})}, + {node, mk(string(), #{desc => <<"the node">>})}, + {topic, mk(string(), #{desc => <<"the topic">>})}, + {timespan, + mk( + integer(), + #{desc => <<"timespan for message transmission">>} + )}, + {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})} ]. conf_schema() -> @@ -92,17 +96,17 @@ conf_schema() -> slow_subs(delete, _) -> _ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end), {204}; - slow_subs(get, _) -> NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end), - Fun = fun({ok, L}, Acc) -> L ++ Acc; - (_, Acc) -> Acc - end, + Fun = fun + ({ok, L}, Acc) -> L ++ Acc; + (_, Acc) -> Acc + end, RankL = lists:foldl(Fun, [], NodeRankL), SortFun = fun(#{timespan := A}, #{timespan := B}) -> - A > B - end, + A > B + end, SortedL = lists:sort(SortFun, RankL), SortedL2 = lists:sublist(SortedL, ?MAX_SIZE), @@ -112,22 +116,25 @@ slow_subs(get, _) -> get_history() -> Node = node(), RankL = ets:tab2list(?TOPK_TAB), - ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)), - last_update_time = LastUpdateTime - }) -> - #{ clientid => ClientId - , node => Node - , topic => Topic - , timespan => TimeSpan - , last_update_time => LastUpdateTime - } - end, + ConvFun = fun( + #top_k{ + index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)), + last_update_time = LastUpdateTime + } + ) -> + #{ + clientid => ClientId, + node => Node, + topic => Topic, + timespan => TimeSpan, + last_update_time => LastUpdateTime + } + end, lists:map(ConvFun, RankL). settings(get, _) -> {200, emqx:get_raw_config([slow_subs], #{})}; - settings(put, #{body := Body}) -> case emqx_slow_subs:update_settings(Body) of {ok, #{config := NewConf}} -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_app.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_app.erl index ba2b84fba..f1de12088 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_app.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_app.erl @@ -18,9 +18,10 @@ -behaviour(application). --export([ start/2 - , stop/1 - ]). +-export([ + start/2, + stop/1 +]). start(_Type, _Args) -> {ok, Sup} = emqx_slow_subs_sup:start_link(), diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index e9fe2fec3..fe6a5aa0c 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -9,23 +9,32 @@ namespace() -> "slow_subs". roots() -> ["slow_subs"]. fields("slow_subs") -> - [ {enable, sc(boolean(), false, "Enable this feature.")} - , {threshold, - sc(emqx_schema:duration_ms(), - "500ms", - "The latency threshold for statistics, the minimum value is 100ms.")} - , {expire_interval, - sc(emqx_schema:duration_ms(), - "300s", - "The eviction time of the record, which in the statistics record table.")} - , {top_k_num, - sc(pos_integer(), - 10, - "The maximum number of records in the slow subscription statistics record table.")} - , {stats_type, - sc(hoconsc:union([whole, internal, response]), - whole, - "The method to calculate the latency.")} + [ + {enable, sc(boolean(), false, "Enable this feature.")}, + {threshold, + sc( + emqx_schema:duration_ms(), + "500ms", + "The latency threshold for statistics, the minimum value is 100ms." + )}, + {expire_interval, + sc( + emqx_schema:duration_ms(), + "300s", + "The eviction time of the record, which in the statistics record table." + )}, + {top_k_num, + sc( + pos_integer(), + 10, + "The maximum number of records in the slow subscription statistics record table." + )}, + {stats_type, + sc( + hoconsc:union([whole, internal, response]), + whole, + "The method to calculate the latency." + )} ]. desc("slow_subs") -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl index c4c5625e0..5c94f594b 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl @@ -27,10 +27,14 @@ start_link() -> init([]) -> emqx_slow_subs:init_tab(), - {ok, {{one_for_one, 10, 3600}, - [#{id => st_statistics, - start => {emqx_slow_subs, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_slow_subs]}]}}. + {ok, + {{one_for_one, 10, 3600}, [ + #{ + id => st_statistics, + start => {emqx_slow_subs, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_slow_subs] + } + ]}}. diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl index 94f17acd5..0547eb1f8 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl @@ -27,13 +27,17 @@ -define(NOW, erlang:system_time(millisecond)). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). --define(BASE_CONF, <<""" -slow_subs { - enable = true - top_k_num = 5, - expire_interval = 5m - stats_type = whole - }""">>). +-define(BASE_CONF, << + "" + "\n" + "slow_subs {\n" + " enable = true\n" + " top_k_num = 5,\n" + " expire_interval = 5m\n" + " stats_type = whole\n" + " }" + "" +>>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -46,7 +50,6 @@ init_per_suite(Config) -> meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, deactivate, 3, ok), - ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?BASE_CONF), emqx_common_test_helpers:start_apps([emqx_slow_subs]), Config. @@ -64,13 +67,13 @@ init_per_testcase(t_expire, Config) -> Cfg = emqx_config:get([slow_subs]), emqx_slow_subs:update_settings(Cfg#{expire_interval := 1500}), Config; - init_per_testcase(_, Config) -> Config. end_per_testcase(_, _) -> case erlang:whereis(node()) of - undefined -> ok; + undefined -> + ok; P -> erlang:unlink(P), erlang:exit(P, kill) @@ -88,21 +91,25 @@ t_pub(_) -> Now = ?NOW, %% publish - lists:foreach(fun(I) -> - Topic = list_to_binary(io_lib:format("/test1/~p", [I])), - Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}), - timer:sleep(100) - end, - lists:seq(1, 10)), + lists:foreach( + fun(I) -> + Topic = list_to_binary(io_lib:format("/test1/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}), + timer:sleep(100) + end, + lists:seq(1, 10) + ), - lists:foreach(fun(I) -> - Topic = list_to_binary(io_lib:format("/test2/~p", [I])), - Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}), - timer:sleep(100) - end, - lists:seq(1, 10)), + lists:foreach( + fun(I) -> + Topic = list_to_binary(io_lib:format("/test2/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}), + timer:sleep(100) + end, + lists:seq(1, 10) + ), timer:sleep(1000), Size = ets:info(?TOPK_TAB, size), @@ -114,10 +121,12 @@ t_pub(_) -> t_expire(_) -> Now = ?NOW, Each = fun(I) -> - ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), - ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), - last_update_time = Now - timer:minutes(5)}) - end, + ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), + ets:insert(?TOPK_TAB, #top_k{ + index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), + last_update_time = Now - timer:minutes(5) + }) + end, lists:foreach(Each, lists:seq(1, 5)), @@ -130,10 +139,12 @@ start_client(Subs) -> [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. client(I, Subs) -> - {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, io_lib:format("slow_subs_~p", [I])}, - {username, <<"plain">>}, - {password, <<"plain">>}]), + {ok, C} = emqtt:start_link([ + {host, "localhost"}, + {clientid, io_lib:format("slow_subs_~p", [I])}, + {username, <<"plain">>}, + {password, <<"plain">>} + ]), {ok, _} = emqtt:connect(C), Len = erlang:length(Subs), diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl index fec10839d..3f5f63f29 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -34,15 +34,18 @@ -define(NOW, erlang:system_time(millisecond)). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). --define(CONF_DEFAULT, <<""" -slow_subs -{ - enable = true - top_k_num = 5, - expire_interval = 60000 - stats_type = whole -}""">>). - +-define(CONF_DEFAULT, << + "" + "\n" + "slow_subs\n" + "{\n" + " enable = true\n" + " top_k_num = 5,\n" + " expire_interval = 60000\n" + " stats_type = whole\n" + "}" + "" +>>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -79,7 +82,8 @@ init_per_testcase(_, Config) -> end_per_testcase(_, Config) -> application:stop(emqx_slow_subs), case erlang:whereis(node()) of - undefined -> ok; + undefined -> + ok; P -> erlang:unlink(P), erlang:exit(P, kill) @@ -89,50 +93,70 @@ end_per_testcase(_, Config) -> t_get_history(_) -> Now = ?NOW, Each = fun(I) -> - ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), - ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), - last_update_time = Now}) - end, + ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), + ets:insert(?TOPK_TAB, #top_k{ + index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), + last_update_time = Now + }) + end, lists:foreach(Each, lists:seq(1, 5)), - {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "page=1&limit=10", - auth_header_()), + {ok, Data} = request_api( + get, + api_path(["slow_subscriptions"]), + "page=1&limit=10", + auth_header_() + ), #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]), - ?assertMatch(#{<<"clientid">> := <<"test_5">>, - <<"topic">> := <<"topic">>, - <<"last_update_time">> := Now, - <<"node">> := _, - <<"timespan">> := _}, First). + ?assertMatch( + #{ + <<"clientid">> := <<"test_5">>, + <<"topic">> := <<"topic">>, + <<"last_update_time">> := Now, + <<"node">> := _, + <<"timespan">> := _ + }, + First + ). t_clear(_) -> - ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)), - last_update_time = ?NOW}), + ets:insert(?TOPK_TAB, #top_k{ + index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)), + last_update_time = ?NOW + }), - {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], - auth_header_()), + {ok, _} = request_api( + delete, + api_path(["slow_subscriptions"]), + [], + auth_header_() + ), ?assertEqual(0, ets:info(?TOPK_TAB, size)). t_settting(_) -> Conf = emqx:get_config([slow_subs]), Conf2 = Conf#{stats_type => internal}, - {ok, Data} = request_api(put, - api_path(["slow_subscriptions", "settings"]), - [], - auth_header_(), - Conf2), + {ok, Data} = request_api( + put, + api_path(["slow_subscriptions", "settings"]), + [], + auth_header_(), + Conf2 + ), Return = decode_json(Data), ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return), - {ok, GetData} = request_api(get, - api_path(["slow_subscriptions", "settings"]), - [], - auth_header_() - ), + {ok, GetData} = request_api( + get, + api_path(["slow_subscriptions", "settings"]), + [], + auth_header_() + ), timer:sleep(1000), @@ -151,25 +175,28 @@ request_api(Method, Url, QueryParams, Auth) -> request_api(Method, Url, QueryParams, Auth, []). request_api(Method, Url, QueryParams, Auth, []) -> - NewUrl = case QueryParams of - "" -> Url; - _ -> Url ++ "?" ++ QueryParams - end, + NewUrl = + case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, do_request_api(Method, {NewUrl, [Auth]}); request_api(Method, Url, QueryParams, Auth, Body) -> - NewUrl = case QueryParams of - "" -> Url; - _ -> Url ++ "?" ++ QueryParams - end, + NewUrl = + case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}). -do_request_api(Method, Request)-> +do_request_api(Method, Request) -> ct:pal("Method: ~p, Request: ~p", [Method, Request]), case httpc:request(Method, Request, [], [{body_format, binary}]) of {error, socket_closed_remotely} -> {error, socket_closed_remotely}; - {ok, {{"HTTP/1.1", Code, _}, _, Return} } - when Code =:= 200 orelse Code =:= 204 -> + {ok, {{"HTTP/1.1", Code, _}, _, Return}} when + Code =:= 200 orelse Code =:= 204 + -> {ok, Return}; {ok, {Reason, _, _}} -> {error, Reason} @@ -181,8 +208,8 @@ auth_header_() -> auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)). auth_header_(User, Pass) -> - Encoded = base64:encode_to_string(lists:append([User,":",Pass])), - {"Authorization","Basic " ++ Encoded}. + Encoded = base64:encode_to_string(lists:append([User, ":", Pass])), + {"Authorization", "Basic " ++ Encoded}. -api_path(Parts)-> +api_path(Parts) -> ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).