style: reformat slow subs codes

This commit is contained in:
firest 2022-04-19 15:31:05 +08:00
parent d6fdf7428e
commit 83511f8a4c
8 changed files with 232 additions and 171 deletions

View File

@ -23,12 +23,13 @@
-define(MAX_SIZE, 1000). -define(MAX_SIZE, 1000).
-record(top_k, { index :: topk_index() -record(top_k, {
, last_update_time :: pos_integer() index :: topk_index(),
, extra = [] last_update_time :: pos_integer(),
}). extra = []
}).
-record(index_tab, { index :: index()}). -record(index_tab, {index :: index()}).
-type top_k() :: #top_k{}. -type top_k() :: #top_k{}.
-type index_tab() :: #index_tab{}. -type index_tab() :: #index_tab{}.

View File

@ -1,4 +1,5 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{deps, [ {emqx, {path, "../emqx"}} {deps, [{emqx, {path, "../emqx"}}]}.
]}.
{project_plugins, [erlfmt]}.

View File

@ -41,48 +41,52 @@ paths() -> ["/slow_subscriptions", "/slow_subscriptions/settings"].
schema(("/slow_subscriptions")) -> schema(("/slow_subscriptions")) ->
#{ #{
'operationId' => slow_subs, 'operationId' => slow_subs,
delete => #{tags => [<<"slow subs">>], delete => #{
description => <<"Clear current data and re count slow topic">>, tags => [<<"slow subs">>],
parameters => [], description => <<"Clear current data and re count slow topic">>,
'requestBody' => [], parameters => [],
responses => #{204 => <<"No Content">>} 'requestBody' => [],
}, responses => #{204 => <<"No Content">>}
get => #{tags => [<<"slow subs">>], },
description => <<"Get slow topics statistics record data">>, get => #{
parameters => [ {page, mk(pos_integer(), #{in => query})} tags => [<<"slow subs">>],
, {limit, mk(pos_integer(), #{in => query})} description => <<"Get slow topics statistics record data">>,
], parameters => [
'requestBody' => [], {page, mk(pos_integer(), #{in => query})},
responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]} {limit, mk(pos_integer(), #{in => query})}
} ],
}; 'requestBody' => [],
responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]}
}
};
schema("/slow_subscriptions/settings") -> schema("/slow_subscriptions/settings") ->
#{'operationId' => settings, #{
get => #{tags => [<<"slow subs">>], 'operationId' => settings,
description => <<"Get slow subs settings">>, get => #{
responses => #{200 => conf_schema()} tags => [<<"slow subs">>],
}, description => <<"Get slow subs settings">>,
put => #{tags => [<<"slow subs">>], responses => #{200 => conf_schema()}
description => <<"Update slow subs settings">>, },
'requestBody' => conf_schema(), put => #{
responses => #{200 => conf_schema()} tags => [<<"slow subs">>],
} description => <<"Update slow subs settings">>,
}. 'requestBody' => conf_schema(),
responses => #{200 => conf_schema()}
}
}.
fields(record) -> fields(record) ->
[ {clientid, [
mk(string(), #{desc => <<"the clientid">>})}, {clientid, mk(string(), #{desc => <<"the clientid">>})},
{node, {node, mk(string(), #{desc => <<"the node">>})},
mk(string(), #{desc => <<"the node">>})}, {topic, mk(string(), #{desc => <<"the topic">>})},
{topic, {timespan,
mk(string(), #{desc => <<"the topic">>})}, mk(
{timespan, integer(),
mk(integer(), #{desc => <<"timespan for message transmission">>}
#{desc => <<"timespan for message transmission">>})}, )},
{last_update_time, {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})}
mk(integer(), #{desc => <<"the timestamp of last update">>})}
]. ].
conf_schema() -> conf_schema() ->
@ -92,17 +96,17 @@ conf_schema() ->
slow_subs(delete, _) -> slow_subs(delete, _) ->
_ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end), _ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end),
{204}; {204};
slow_subs(get, _) -> slow_subs(get, _) ->
NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end), NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end),
Fun = fun({ok, L}, Acc) -> L ++ Acc; Fun = fun
(_, Acc) -> Acc ({ok, L}, Acc) -> L ++ Acc;
end, (_, Acc) -> Acc
end,
RankL = lists:foldl(Fun, [], NodeRankL), RankL = lists:foldl(Fun, [], NodeRankL),
SortFun = fun(#{timespan := A}, #{timespan := B}) -> SortFun = fun(#{timespan := A}, #{timespan := B}) ->
A > B A > B
end, end,
SortedL = lists:sort(SortFun, RankL), SortedL = lists:sort(SortFun, RankL),
SortedL2 = lists:sublist(SortedL, ?MAX_SIZE), SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
@ -112,22 +116,25 @@ slow_subs(get, _) ->
get_history() -> get_history() ->
Node = node(), Node = node(),
RankL = ets:tab2list(?TOPK_TAB), RankL = ets:tab2list(?TOPK_TAB),
ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)), ConvFun = fun(
last_update_time = LastUpdateTime #top_k{
}) -> index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
#{ clientid => ClientId last_update_time = LastUpdateTime
, node => Node }
, topic => Topic ) ->
, timespan => TimeSpan #{
, last_update_time => LastUpdateTime clientid => ClientId,
} node => Node,
end, topic => Topic,
timespan => TimeSpan,
last_update_time => LastUpdateTime
}
end,
lists:map(ConvFun, RankL). lists:map(ConvFun, RankL).
settings(get, _) -> settings(get, _) ->
{200, emqx:get_raw_config([slow_subs], #{})}; {200, emqx:get_raw_config([slow_subs], #{})};
settings(put, #{body := Body}) -> settings(put, #{body := Body}) ->
case emqx_slow_subs:update_settings(Body) of case emqx_slow_subs:update_settings(Body) of
{ok, #{config := NewConf}} -> {ok, #{config := NewConf}} ->

View File

@ -18,9 +18,10 @@
-behaviour(application). -behaviour(application).
-export([ start/2 -export([
, stop/1 start/2,
]). stop/1
]).
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_slow_subs_sup:start_link(), {ok, Sup} = emqx_slow_subs_sup:start_link(),

View File

@ -9,23 +9,32 @@ namespace() -> "slow_subs".
roots() -> ["slow_subs"]. roots() -> ["slow_subs"].
fields("slow_subs") -> fields("slow_subs") ->
[ {enable, sc(boolean(), false, "Enable this feature.")} [
, {threshold, {enable, sc(boolean(), false, "Enable this feature.")},
sc(emqx_schema:duration_ms(), {threshold,
"500ms", sc(
"The latency threshold for statistics, the minimum value is 100ms.")} emqx_schema:duration_ms(),
, {expire_interval, "500ms",
sc(emqx_schema:duration_ms(), "The latency threshold for statistics, the minimum value is 100ms."
"300s", )},
"The eviction time of the record, which in the statistics record table.")} {expire_interval,
, {top_k_num, sc(
sc(pos_integer(), emqx_schema:duration_ms(),
10, "300s",
"The maximum number of records in the slow subscription statistics record table.")} "The eviction time of the record, which in the statistics record table."
, {stats_type, )},
sc(hoconsc:union([whole, internal, response]), {top_k_num,
whole, sc(
"The method to calculate the latency.")} pos_integer(),
10,
"The maximum number of records in the slow subscription statistics record table."
)},
{stats_type,
sc(
hoconsc:union([whole, internal, response]),
whole,
"The method to calculate the latency."
)}
]. ].
desc("slow_subs") -> desc("slow_subs") ->

View File

@ -27,10 +27,14 @@ start_link() ->
init([]) -> init([]) ->
emqx_slow_subs:init_tab(), emqx_slow_subs:init_tab(),
{ok, {{one_for_one, 10, 3600}, {ok,
[#{id => st_statistics, {{one_for_one, 10, 3600}, [
start => {emqx_slow_subs, start_link, []}, #{
restart => permanent, id => st_statistics,
shutdown => 5000, start => {emqx_slow_subs, start_link, []},
type => worker, restart => permanent,
modules => [emqx_slow_subs]}]}}. shutdown => 5000,
type => worker,
modules => [emqx_slow_subs]
}
]}}.

View File

@ -27,13 +27,17 @@
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(BASE_CONF, <<""" -define(BASE_CONF, <<
slow_subs { ""
enable = true "\n"
top_k_num = 5, "slow_subs {\n"
expire_interval = 5m " enable = true\n"
stats_type = whole " top_k_num = 5,\n"
}""">>). " expire_interval = 5m\n"
" stats_type = whole\n"
" }"
""
>>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -46,7 +50,6 @@ init_per_suite(Config) ->
meck:expect(emqx_alarm, activate, 3, ok), meck:expect(emqx_alarm, activate, 3, ok),
meck:expect(emqx_alarm, deactivate, 3, ok), meck:expect(emqx_alarm, deactivate, 3, ok),
ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?BASE_CONF), ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?BASE_CONF),
emqx_common_test_helpers:start_apps([emqx_slow_subs]), emqx_common_test_helpers:start_apps([emqx_slow_subs]),
Config. Config.
@ -64,13 +67,13 @@ init_per_testcase(t_expire, Config) ->
Cfg = emqx_config:get([slow_subs]), Cfg = emqx_config:get([slow_subs]),
emqx_slow_subs:update_settings(Cfg#{expire_interval := 1500}), emqx_slow_subs:update_settings(Cfg#{expire_interval := 1500}),
Config; Config;
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
Config. Config.
end_per_testcase(_, _) -> end_per_testcase(_, _) ->
case erlang:whereis(node()) of case erlang:whereis(node()) of
undefined -> ok; undefined ->
ok;
P -> P ->
erlang:unlink(P), erlang:unlink(P),
erlang:exit(P, kill) erlang:exit(P, kill)
@ -88,21 +91,25 @@ t_pub(_) ->
Now = ?NOW, Now = ?NOW,
%% publish %% publish
lists:foreach(fun(I) -> lists:foreach(
Topic = list_to_binary(io_lib:format("/test1/~p", [I])), fun(I) ->
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
emqx:publish(Msg#message{timestamp = Now - 500}), Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
timer:sleep(100) emqx:publish(Msg#message{timestamp = Now - 500}),
end, timer:sleep(100)
lists:seq(1, 10)), end,
lists:seq(1, 10)
),
lists:foreach(fun(I) -> lists:foreach(
Topic = list_to_binary(io_lib:format("/test2/~p", [I])), fun(I) ->
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
emqx:publish(Msg#message{timestamp = Now - 500}), Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
timer:sleep(100) emqx:publish(Msg#message{timestamp = Now - 500}),
end, timer:sleep(100)
lists:seq(1, 10)), end,
lists:seq(1, 10)
),
timer:sleep(1000), timer:sleep(1000),
Size = ets:info(?TOPK_TAB, size), Size = ets:info(?TOPK_TAB, size),
@ -114,10 +121,12 @@ t_pub(_) ->
t_expire(_) -> t_expire(_) ->
Now = ?NOW, Now = ?NOW,
Each = fun(I) -> Each = fun(I) ->
ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), ets:insert(?TOPK_TAB, #top_k{
last_update_time = Now - timer:minutes(5)}) index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
end, last_update_time = Now - timer:minutes(5)
})
end,
lists:foreach(Each, lists:seq(1, 5)), lists:foreach(Each, lists:seq(1, 5)),
@ -130,10 +139,12 @@ start_client(Subs) ->
[spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].
client(I, Subs) -> client(I, Subs) ->
{ok, C} = emqtt:start_link([{host, "localhost"}, {ok, C} = emqtt:start_link([
{clientid, io_lib:format("slow_subs_~p", [I])}, {host, "localhost"},
{username, <<"plain">>}, {clientid, io_lib:format("slow_subs_~p", [I])},
{password, <<"plain">>}]), {username, <<"plain">>},
{password, <<"plain">>}
]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
Len = erlang:length(Subs), Len = erlang:length(Subs),

View File

@ -34,15 +34,18 @@
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, <<""" -define(CONF_DEFAULT, <<
slow_subs ""
{ "\n"
enable = true "slow_subs\n"
top_k_num = 5, "{\n"
expire_interval = 60000 " enable = true\n"
stats_type = whole " top_k_num = 5,\n"
}""">>). " expire_interval = 60000\n"
" stats_type = whole\n"
"}"
""
>>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -79,7 +82,8 @@ init_per_testcase(_, Config) ->
end_per_testcase(_, Config) -> end_per_testcase(_, Config) ->
application:stop(emqx_slow_subs), application:stop(emqx_slow_subs),
case erlang:whereis(node()) of case erlang:whereis(node()) of
undefined -> ok; undefined ->
ok;
P -> P ->
erlang:unlink(P), erlang:unlink(P),
erlang:exit(P, kill) erlang:exit(P, kill)
@ -89,50 +93,70 @@ end_per_testcase(_, Config) ->
t_get_history(_) -> t_get_history(_) ->
Now = ?NOW, Now = ?NOW,
Each = fun(I) -> Each = fun(I) ->
ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), ets:insert(?TOPK_TAB, #top_k{
last_update_time = Now}) index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
end, last_update_time = Now
})
end,
lists:foreach(Each, lists:seq(1, 5)), lists:foreach(Each, lists:seq(1, 5)),
{ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "page=1&limit=10", {ok, Data} = request_api(
auth_header_()), get,
api_path(["slow_subscriptions"]),
"page=1&limit=10",
auth_header_()
),
#{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]), #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]),
?assertMatch(#{<<"clientid">> := <<"test_5">>, ?assertMatch(
<<"topic">> := <<"topic">>, #{
<<"last_update_time">> := Now, <<"clientid">> := <<"test_5">>,
<<"node">> := _, <<"topic">> := <<"topic">>,
<<"timespan">> := _}, First). <<"last_update_time">> := Now,
<<"node">> := _,
<<"timespan">> := _
},
First
).
t_clear(_) -> t_clear(_) ->
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)), ets:insert(?TOPK_TAB, #top_k{
last_update_time = ?NOW}), index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)),
last_update_time = ?NOW
}),
{ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], {ok, _} = request_api(
auth_header_()), delete,
api_path(["slow_subscriptions"]),
[],
auth_header_()
),
?assertEqual(0, ets:info(?TOPK_TAB, size)). ?assertEqual(0, ets:info(?TOPK_TAB, size)).
t_settting(_) -> t_settting(_) ->
Conf = emqx:get_config([slow_subs]), Conf = emqx:get_config([slow_subs]),
Conf2 = Conf#{stats_type => internal}, Conf2 = Conf#{stats_type => internal},
{ok, Data} = request_api(put, {ok, Data} = request_api(
api_path(["slow_subscriptions", "settings"]), put,
[], api_path(["slow_subscriptions", "settings"]),
auth_header_(), [],
Conf2), auth_header_(),
Conf2
),
Return = decode_json(Data), Return = decode_json(Data),
?assertEqual(Conf2#{stats_type := <<"internal">>}, Return), ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return),
{ok, GetData} = request_api(get, {ok, GetData} = request_api(
api_path(["slow_subscriptions", "settings"]), get,
[], api_path(["slow_subscriptions", "settings"]),
auth_header_() [],
), auth_header_()
),
timer:sleep(1000), timer:sleep(1000),
@ -151,25 +175,28 @@ request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []). request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) -> request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of NewUrl =
"" -> Url; case QueryParams of
_ -> Url ++ "?" ++ QueryParams "" -> Url;
end, _ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]}); do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) -> request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of NewUrl =
"" -> Url; case QueryParams of
_ -> Url ++ "?" ++ QueryParams "" -> Url;
end, _ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}). do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)-> do_request_api(Method, Request) ->
ct:pal("Method: ~p, Request: ~p", [Method, Request]), ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], [{body_format, binary}]) of case httpc:request(Method, Request, [], [{body_format, binary}]) of
{error, socket_closed_remotely} -> {error, socket_closed_remotely} ->
{error, socket_closed_remotely}; {error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} } {ok, {{"HTTP/1.1", Code, _}, _, Return}} when
when Code =:= 200 orelse Code =:= 204 -> Code =:= 200 orelse Code =:= 204
->
{ok, Return}; {ok, Return};
{ok, {Reason, _, _}} -> {ok, {Reason, _, _}} ->
{error, Reason} {error, Reason}
@ -181,8 +208,8 @@ auth_header_() ->
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)). auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) -> auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])), Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
{"Authorization","Basic " ++ Encoded}. {"Authorization", "Basic " ++ Encoded}.
api_path(Parts)-> api_path(Parts) ->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts). ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).