From b09683bfcd49454b629cde25b747133d000898aa Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 9 Feb 2022 18:18:06 +0800 Subject: [PATCH] fix(emqx_slow_subs): fix some errors and test cases --- apps/emqx/src/emqx_cm.erl | 3 +- apps/emqx/src/emqx_schema.erl | 7 ---- apps/emqx/src/emqx_session.erl | 8 ++--- apps/emqx/src/emqx_zone_schema.erl | 2 +- apps/emqx_slow_subs/src/emqx_slow_subs.erl | 16 ++++----- .../emqx_slow_subs/src/emqx_slow_subs_sup.erl | 2 +- .../test/emqx_slow_subs_SUITE.erl | 26 +++----------- .../test/emqx_slow_subs_api_SUITE.erl | 34 +++++++------------ 8 files changed, 29 insertions(+), 69 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 3c46c6b54..23c47660c 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -322,8 +322,7 @@ get_session_confs(#{zone := Zone, clientid := ClientId}, #{receive_maximum := Ma %% TODO: Add conf for allowing/disallowing persistent sessions. %% Note that the connection info is already enriched to have %% default config values for session expiry. - is_persistent => EI > 0, - latency_stats => emqx_config:get_zone_conf(Zone, [latency_stats]) + is_persistent => EI > 0 }. mqueue_confs(Zone) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ad6d7c9ac..eba7d2617 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -182,9 +182,6 @@ this number of messages or bytes have passed through.""" , {"persistent_session_store", sc(ref("persistent_session_store"), #{})} - , {"latency_stats", - sc(ref("latency_stats"), - #{})} , {"trace", sc(ref("trace"), #{desc => """ @@ -1105,10 +1102,6 @@ when deactivated, but after the retention time. } ]; -fields("latency_stats") -> - [ {"samples", sc(integer(), #{default => 10, - desc => "the number of samples for calculate the average latency of delivery"})} - ]; fields("trace") -> [ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{ default => text, diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 5e65b67e5..7851ae56f 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -183,7 +183,6 @@ , mqueue => emqx_mqueue:options() , is_persistent => boolean() , clientid => emqx_types:clientid() - , latency_stats => emqx_message_latency_stats:create_options() }. %%-------------------------------------------------------------------- @@ -211,8 +210,7 @@ init(Opts) -> awaiting_rel = #{}, max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000), - created_at = erlang:system_time(millisecond), - latency_stats = emqx_message_latency_stats:new(maps:get(latency_stats, Opts, #{})) + created_at = erlang:system_time(millisecond) }. %%-------------------------------------------------------------------- @@ -268,9 +266,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt; -info(latency_stats, #session{latency_stats = Stats}) -> - emqx_message_latency_stats:latency(Stats). + CreatedAt. %% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index c2dfcf1a6..140cd1aca 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -24,7 +24,7 @@ namespace() -> zone. %% roots are added only for document generation. roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown", "conn_congestion", "rate_limit", "quota", "force_gc", - "overload_protection", "latency_stats" + "overload_protection" ]. %% zone schemas are clones from the same name from root level diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index 2ae8c0b31..bd61ff5a6 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -22,7 +22,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). --export([ start_link/0, on_delivery_completed/4, update_settings/1 +-export([ start_link/0, on_delivery_completed/3, update_settings/1 , clear_history/0, init_tab/0, post_config_update/5 ]). @@ -77,17 +77,15 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -on_delivery_completed(_ClientInfo, - #message{timestamp = Ts}, - #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> +on_delivery_completed(#message{timestamp = Ts}, + #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> ok; -on_delivery_completed(ClientInfo, Msg, Env, Cfg) -> - on_delivery_completed(ClientInfo, Msg, Env, erlang:system_time(millisecond), Cfg). +on_delivery_completed(Msg, Env, Cfg) -> + on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg). -on_delivery_completed(#{clientid := ClientId}, - #message{topic = Topic} = Msg, - _Env, +on_delivery_completed(#message{topic = Topic} = Msg, + #{clientid := ClientId}, Now, #{threshold := Threshold, stats_type := StatsType, diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl index ce2c55a15..c4c5625e0 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl @@ -26,7 +26,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - emqx_slow_subs:init_topk_tab(), + emqx_slow_subs:init_tab(), {ok, {{one_for_one, 10, 3600}, [#{id => st_statistics, start => {emqx_slow_subs, start_link, []}, diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl index 79c4bb600..21ca43e87 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl @@ -31,10 +31,8 @@ slow_subs { enable = true top_k_num = 5, expire_interval = 3000 - notice_interval = 1500 - notice_qos = 0 - notice_batch_size = 3 -}""">>). + stats_type = whole + }""">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -60,7 +58,6 @@ t_log_and_pub(_) -> %% Sub topic first Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Clients = start_client(Subs), - emqx:subscribe("$SYS/brokers/+/slow_subs"), timer:sleep(1000), Now = ?NOW, %% publish @@ -82,15 +79,9 @@ t_log_and_pub(_) -> timer:sleep(1000), Size = ets:info(?TOPK_TAB, size), %% some time record maybe delete due to it expired - ?assert(Size =< 6 andalso Size >= 4), + ?assert(Size =< 6 andalso Size > 3), - timer:sleep(1500), - Recs = try_receive([]), - RecSum = lists:sum(Recs), - ?assert(RecSum >= 5), - ?assert(lists:all(fun(E) -> E =< 3 end, Recs)), - - timer:sleep(3000), + timer:sleep(4000), ?assert(ets:info(?TOPK_TAB, size) =:= 0), [Client ! stop || Client <- Clients], ok. @@ -113,12 +104,3 @@ client(I, Subs) -> stop -> ok end. - -try_receive(Acc) -> - receive - {deliver, _, #message{payload = Payload}} -> - #{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]), - try_receive([length(Logs) | Acc]) - after 500 -> - Acc - end. diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl index fc2162863..7b25ef634 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -40,9 +40,7 @@ slow_subs enable = true top_k_num = 5, expire_interval = 60000 - notice_interval = 0 - notice_qos = 0 - notice_batch_size = 3 + stats_type = whole }""">>). @@ -92,8 +90,7 @@ t_get_history(_) -> Now = ?NOW, Each = fun(I) -> ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), - ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId), - type = average, + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), last_update_time = Now}) end, @@ -101,18 +98,16 @@ t_get_history(_) -> {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", auth_header_()), - #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]), + [First | _] = emqx_json:decode(Data, [return_maps]), - RFirst = #{<<"clientid">> => <<"test_5">>, - <<"latency">> => 5, - <<"type">> => <<"average">>, - <<"last_update_time">> => Now}, - - ?assertEqual(RFirst, First). + ?assertMatch(#{<<"clientid">> := <<"test_5">>, + <<"topic">> := <<"topic">>, + <<"last_update_time">> := Now, + <<"node">> := _, + <<"timespan">> := _}, First). t_clear(_) -> - ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>), - type = average, + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)), last_update_time = ?NOW}), {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], @@ -122,7 +117,7 @@ t_clear(_) -> t_settting(_) -> Conf = emqx:get_config([slow_subs]), - Conf2 = Conf#{threshold => 1000}, + Conf2 = Conf#{stats_type => internal}, {ok, Data} = request_api(put, api_path(["slow_subscriptions", "settings"]), [], @@ -131,22 +126,19 @@ t_settting(_) -> Return = decode_json(Data), - ?assertEqual(Conf2, Return), + ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return), {ok, GetData} = request_api(get, api_path(["slow_subscriptions", "settings"]), [], auth_header_() - ), + ), timer:sleep(1000), GetReturn = decode_json(GetData), - ?assertEqual(Conf2, GetReturn), - - ?assertEqual(1000, - emqx_message_latency_stats:get_threshold()). + ?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn). decode_json(Data) -> BinJosn = emqx_json:decode(Data, [return_maps]),