diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics.appup.src b/apps/emqx_st_statistics/src/emqx_st_statistics.appup.src deleted file mode 100644 index dcf0d8cdd..000000000 --- a/apps/emqx_st_statistics/src/emqx_st_statistics.appup.src +++ /dev/null @@ -1,9 +0,0 @@ -%% -*-: erlang -*- -{VSN, - [ - {<<".*">>, []} - ], - [ - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_st_statistics/src/emqx_st_statistics.erl b/apps/emqx_st_statistics/src/emqx_st_statistics.erl index 8a6c1535f..801a21c96 100644 --- a/apps/emqx_st_statistics/src/emqx_st_statistics.erl +++ b/apps/emqx_st_statistics/src/emqx_st_statistics.erl @@ -40,28 +40,30 @@ -compile(nowarn_unused_type). -type state() :: #{ config := proplist:proplist() - , index := index_map() - , begin_time := pos_integer() + , period := pos_integer() + , last_tick_at := pos_integer() , counter := counters:counter_ref() , enable := boolean() }. -type log() :: #{ topic := emqx_types:topic() - , times := pos_integer() + , count := pos_integer() , average := float() }. --type window_log() :: #{ begin_time := pos_integer() +-type window_log() :: #{ last_tick_at := pos_integer() , logs := [log()] }. -record(slow_log, { topic :: emqx_types:topic() - , times :: non_neg_integer() - , elapsed :: non_neg_integer() + , count :: pos_integer() + , elapsed :: pos_integer() }). --record(top_k, { key :: any() - , average :: float()}). +-record(top_k, { rank :: pos_integer() + , topic :: emqx_types:topic() + , average_count :: number() + , average_elapsed :: number()}). -type message() :: #message{}. @@ -70,11 +72,11 @@ -define(LOG_TAB, emqx_st_statistics_log). -define(TOPK_TAB, emqx_st_statistics_topk). -define(NOW, erlang:system_time(millisecond)). --define(TOP_KEY(Times, Topic), {Times, Topic}). -define(QUOTA_IDX, 1). --type top_key() :: ?TOP_KEY(pos_integer(), emqx_types:topic()). --type index_map() :: #{emqx_types:topic() => pos_integer()}. +-type slow_log() :: #slow_log{}. +-type top_k() :: #top_k{}. +-type top_k_map() :: #{emqx_types:topic() => top_k()}. %% erlang term order %% number < atom < reference < fun < port < pid < tuple < list < bit string @@ -124,8 +126,8 @@ init([Env]) -> Threshold = get_value(threshold_time, Env), load(Threshold, Counter), {ok, #{config => Env, - index => #{}, - begin_time => ?NOW, + period => 1, + last_tick_at => ?NOW, counter => Counter, enable => true}}. @@ -152,11 +154,11 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info(notification_tick, #{config := Cfg} = State) -> +handle_info(notification_tick, #{config := Cfg, period := Period} = State) -> notification_tick(Cfg), - Index2 = do_notification(State), - {noreply, State#{index := Index2, - begin_time := ?NOW}}; + do_notification(State), + {noreply, State#{last_tick_at := ?NOW, + period := Period + 1}}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), @@ -183,9 +185,9 @@ init_log_tab(_) -> ]). init_topk_tab(_) -> - ?TOPK_TAB = ets:new(?TOPK_TAB, [ ordered_set, protected, named_table - , {keypos, #top_k.key}, {write_concurrency, true} - , {read_concurrency, false} + ?TOPK_TAB = ets:new(?TOPK_TAB, [ set, protected, named_table + , {keypos, #top_k.rank}, {write_concurrency, false} + , {read_concurrency, true} ]). -spec get_log_quota(counter:counter_ref()) -> boolean(). @@ -203,107 +205,104 @@ set_log_quota(Cfg, Counter) -> MaxLogNum = get_value(max_log_num, Cfg), counters:put(Counter, ?QUOTA_IDX, MaxLogNum). --spec update_log(message(), non_neg_integer()) -> ok. +-spec update_log(message(), pos_integer()) -> ok. update_log(#message{topic = Topic}, Elapsed) -> _ = ets:update_counter(?LOG_TAB, Topic, - [{#slow_log.times, 1}, {#slow_log.elapsed, Elapsed}], + [{#slow_log.count, 1}, {#slow_log.elapsed, Elapsed}], #slow_log{topic = Topic, - times = 1, - elapsed = Elapsed}), + count = 1, + elapsed = Elapsed}), ok. --spec do_notification(state()) -> index_map(). -do_notification(#{begin_time := BeginTime, +-spec do_notification(state()) -> true. +do_notification(#{last_tick_at := TickTime, config := Cfg, - index := IndexMap, + period := Period, counter := Counter}) -> Logs = ets:tab2list(?LOG_TAB), ets:delete_all_objects(?LOG_TAB), - start_publish(Logs, BeginTime, Cfg), + start_publish(Logs, TickTime, Cfg), set_log_quota(Cfg, Counter), MaxRecord = get_value(top_k_num, Cfg), - Size = ets:info(?TOPK_TAB, size), - update_top_k(Logs, erlang:max(0, MaxRecord - Size), IndexMap). + update_topk(Logs, MaxRecord, Period). --spec update_top_k(list(#slow_log{}), non_neg_integer(), index_map()) -> index_map(). -update_top_k([#slow_log{topic = Topic, - times = NewTimes, - elapsed = Elapsed} = Log | T], - Left, - IndexMap) -> - case maps:get(Topic, IndexMap, 0) of - 0 -> - try_insert_new(Log, Left, T, IndexMap); - Times -> - [#top_k{key = Key, average = Average}] = ets:lookup(?TOPK_TAB, ?TOP_KEY(Times, Topic)), - Times2 = Times + NewTimes, - Total = Times * Average + Elapsed, - Average2 = Total / Times2, - ets:delete(?TOPK_TAB, Key), - ets:insert(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times2, Topic), average = Average2}), - update_top_k(T, Left, IndexMap#{Topic := Times2}) +-spec update_topk(list(slow_log()), pos_integer(), pos_integer()) -> true. +update_topk(Logs, MaxRecord, Period) -> + TopkMap = get_topk_map(Period), + TopkMap2 = update_topk_map(Logs, Period, TopkMap), + SortFun = fun(A, B) -> + A#top_k.average_count > B#top_k.average_count + end, + TopkL = lists:sort(SortFun, maps:values(TopkMap2)), + TopkL2 = lists:sublist(TopkL, 1, MaxRecord), + update_topk_tab(TopkL2). + +-spec update_topk_map(list(slow_log()), pos_integer(), top_k_map()) -> top_k_map(). +update_topk_map([#slow_log{topic = Topic, + count = LogTimes, + elapsed = LogElapsed} | T], Period, TopkMap) -> + case maps:get(Topic, TopkMap, undefined) of + undefined -> + Record = #top_k{rank = 1, + topic = Topic, + average_count = LogTimes, + average_elapsed = LogElapsed}, + TopkMap2 = TopkMap#{Topic => Record}, + update_topk_map(T, Period, TopkMap2); + #top_k{average_count = AvgCount, + average_elapsed = AvgElapsed} = Record -> + NewPeriod = Period + 1, + %% (a + b) / c = a / c + b / c + %% average_count(elapsed) dived NewPeriod in function get_topk_maps + AvgCount2 = AvgCount + LogTimes / NewPeriod, + AvgElapsed2 = AvgElapsed + LogElapsed / NewPeriod, + Record2 = Record#top_k{average_count = AvgCount2, + average_elapsed = AvgElapsed2}, + update_topk_map(T, Period, TopkMap#{Topic := Record2}) end; -update_top_k([], _, IndexMap) -> - IndexMap. +update_topk_map([], _, TopkMap) -> + TopkMap. --spec try_insert_new(#slow_log{}, - non_neg_integer(), list(#slow_log{}), index_map()) -> index_map(). -try_insert_new(#slow_log{topic = Topic, - times = Times, - elapsed = Elapsed}, Left, Logs, IndexMap) when Left > 0 -> - Average = Elapsed / Times, - ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}), - update_top_k(Logs, Left - 1, IndexMap#{Topic => Times}); +-spec update_topk_tab(list(top_k())) -> true. +update_topk_tab(Records) -> + Zip = fun(Rank, Item) -> Item#top_k{rank = Rank} end, + Len = erlang:length(Records), + RankedTopics = lists:zipwith(Zip, lists:seq(1, Len), Records), + ets:insert(?TOPK_TAB, RankedTopics). -try_insert_new(#slow_log{topic = Topic, - times = Times, - elapsed = Elapsed}, Left, Logs, IndexMap) -> - ?TOP_KEY(MinTimes, MinTopic) = MinKey = ets:first(?TOPK_TAB), - case MinTimes > Times of - true -> - update_top_k(Logs, Left, IndexMap); - _ -> - Average = Elapsed / Times, - ets:delete(?TOPK_TAB, MinKey), - ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}), - update_top_k(Logs, - Left - 1, - maps:put(Topic, Times, maps:remove(MinTopic, IndexMap))) - end. - -start_publish(Logs, BeginTime, Cfg) -> - emqx_pool:async_submit({fun do_publish/3, [Logs, BeginTime, Cfg]}). +start_publish(Logs, TickTime, Cfg) -> + emqx_pool:async_submit({fun do_publish/3, [Logs, TickTime, Cfg]}). do_publish([], _, _) -> ok; -do_publish(Logs, BeginTime, Cfg) -> +do_publish(Logs, TickTime, Cfg) -> BatchSize = get_value(notice_batch_size, Cfg), - do_publish(Logs, BatchSize, BeginTime, Cfg, []). + do_publish(Logs, BatchSize, TickTime, Cfg, []). -do_publish([Log | T], Size, BeginTime, Cfg, Cache) when Size > 0 -> +do_publish([Log | T], Size, TickTime, Cfg, Cache) when Size > 0 -> Cache2 = [convert_to_notice(Log) | Cache], - do_publish(T, Size - 1, BeginTime, Cfg, Cache2); + do_publish(T, Size - 1, TickTime, Cfg, Cache2); -do_publish(Logs, Size, BeginTime, Cfg, Cache) when Size =:= 0 -> - publish(BeginTime, Cfg, Cache), - do_publish(Logs, BeginTime, Cfg); +do_publish(Logs, Size, TickTime, Cfg, Cache) when Size =:= 0 -> + publish(TickTime, Cfg, Cache), + do_publish(Logs, TickTime, Cfg); -do_publish([], _, BeginTime, Cfg, Cache) -> - publish(BeginTime, Cfg, Cache), +do_publish([], _, TickTime, Cfg, Cache) -> + publish(TickTime, Cfg, Cache), ok. convert_to_notice(#slow_log{topic = Topic, - times = Times, + count = Count, elapsed = Elapsed}) -> #{topic => Topic, - times => Times, - average => Elapsed / Times}. + count => Count, + average => Elapsed / Count}. -publish(BeginTime, Cfg, Notices) -> - WindowLog = #{begin_time => BeginTime, +publish(TickTime, Cfg, Notices) -> + WindowLog = #{last_tick_at => TickTime, logs => Notices}, Payload = emqx_json:encode(WindowLog), _ = emqx:publish(#message{ id = emqx_guid:gen() @@ -322,6 +321,7 @@ load(Threshold, Counter) -> unload() -> emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3). +-spec get_topic(proplists:proplist()) -> binary(). get_topic(Cfg) -> case get_value(notice_topic, Cfg) of Topic when is_binary(Topic) -> @@ -329,3 +329,24 @@ get_topic(Cfg) -> Topic -> erlang:list_to_binary(Topic) end. + +-spec get_topk_map(pos_integer()) -> top_k_map(). +get_topk_map(Period) -> + Size = ets:info(?TOPK_TAB, size), + get_topk_map(1, Size, Period, #{}). + +-spec get_topk_map(pos_integer(), + non_neg_integer(), pos_integer(), top_k_map()) -> top_k_map(). +get_topk_map(Index, Size, _, TopkMap) when Index > Size -> + TopkMap; +get_topk_map(Index, Size, Period, TopkMap) -> + [#top_k{topic = Topic, + average_count = AvgCount, + average_elapsed = AvgElapsed} = R] = ets:lookup(?TOPK_TAB, Index), + NewPeriod = Period + 1, + TotalTimes = AvgCount * Period, + AvgCount2 = TotalTimes / NewPeriod, + AvgElapsed2 = TotalTimes * AvgElapsed / NewPeriod, + TopkMap2 = TopkMap#{Topic => R#top_k{average_count = AvgCount2, + average_elapsed = AvgElapsed2}}, + get_topk_map(Index + 1, Size, Period, TopkMap2). diff --git a/apps/emqx_st_statistics/test/emqx_st_statistics_SUITE.erl b/apps/emqx_st_statistics/test/emqx_st_statistics_SUITE.erl index c08c8d986..e8bdd129f 100644 --- a/apps/emqx_st_statistics/test/emqx_st_statistics_SUITE.erl +++ b/apps/emqx_st_statistics/test/emqx_st_statistics_SUITE.erl @@ -61,7 +61,14 @@ t_log_and_pub(_) -> end, lists:seq(1, 10)), - ?assert(ets:info(?LOG_TAB, size) =:= 5), + timer:sleep(100), + + case ets:info(?LOG_TAB, size) of + 5 -> + ok; + _ -> + ?assert(ets:info(?TOPK_TAB, size) =/= 0) + end, timer:sleep(2400),