refactor(emqx_st_statistics): optimize the implementation of topk

This commit is contained in:
lafirest 2021-11-05 16:11:48 +08:00 committed by firest
parent 77c82cf189
commit 354b0bc08e
3 changed files with 118 additions and 99 deletions

View File

@ -1,9 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{<<".*">>, []}
],
[
{<<".*">>, []}
]
}.

View File

@ -40,28 +40,30 @@
-compile(nowarn_unused_type).
-type state() :: #{ config := proplist:proplist()
, index := index_map()
, begin_time := pos_integer()
, period := pos_integer()
, last_tick_at := pos_integer()
, counter := counters:counter_ref()
, enable := boolean()
}.
-type log() :: #{ topic := emqx_types:topic()
, times := pos_integer()
, count := pos_integer()
, average := float()
}.
-type window_log() :: #{ begin_time := pos_integer()
-type window_log() :: #{ last_tick_at := pos_integer()
, logs := [log()]
}.
-record(slow_log, { topic :: emqx_types:topic()
, times :: non_neg_integer()
, elapsed :: non_neg_integer()
, count :: pos_integer()
, elapsed :: pos_integer()
}).
-record(top_k, { key :: any()
, average :: float()}).
-record(top_k, { rank :: pos_integer()
, topic :: emqx_types:topic()
, average_count :: number()
, average_elapsed :: number()}).
-type message() :: #message{}.
@ -70,11 +72,11 @@
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-define(NOW, erlang:system_time(millisecond)).
-define(TOP_KEY(Times, Topic), {Times, Topic}).
-define(QUOTA_IDX, 1).
-type top_key() :: ?TOP_KEY(pos_integer(), emqx_types:topic()).
-type index_map() :: #{emqx_types:topic() => pos_integer()}.
-type slow_log() :: #slow_log{}.
-type top_k() :: #top_k{}.
-type top_k_map() :: #{emqx_types:topic() => top_k()}.
%% erlang term order
%% number < atom < reference < fun < port < pid < tuple < list < bit string
@ -124,8 +126,8 @@ init([Env]) ->
Threshold = get_value(threshold_time, Env),
load(Threshold, Counter),
{ok, #{config => Env,
index => #{},
begin_time => ?NOW,
period => 1,
last_tick_at => ?NOW,
counter => Counter,
enable => true}}.
@ -152,11 +154,11 @@ handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(notification_tick, #{config := Cfg} = State) ->
handle_info(notification_tick, #{config := Cfg, period := Period} = State) ->
notification_tick(Cfg),
Index2 = do_notification(State),
{noreply, State#{index := Index2,
begin_time := ?NOW}};
do_notification(State),
{noreply, State#{last_tick_at := ?NOW,
period := Period + 1}};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
@ -183,9 +185,9 @@ init_log_tab(_) ->
]).
init_topk_tab(_) ->
?TOPK_TAB = ets:new(?TOPK_TAB, [ ordered_set, protected, named_table
, {keypos, #top_k.key}, {write_concurrency, true}
, {read_concurrency, false}
?TOPK_TAB = ets:new(?TOPK_TAB, [ set, protected, named_table
, {keypos, #top_k.rank}, {write_concurrency, false}
, {read_concurrency, true}
]).
-spec get_log_quota(counter:counter_ref()) -> boolean().
@ -203,107 +205,104 @@ set_log_quota(Cfg, Counter) ->
MaxLogNum = get_value(max_log_num, Cfg),
counters:put(Counter, ?QUOTA_IDX, MaxLogNum).
-spec update_log(message(), non_neg_integer()) -> ok.
-spec update_log(message(), pos_integer()) -> ok.
update_log(#message{topic = Topic}, Elapsed) ->
_ = ets:update_counter(?LOG_TAB,
Topic,
[{#slow_log.times, 1}, {#slow_log.elapsed, Elapsed}],
[{#slow_log.count, 1}, {#slow_log.elapsed, Elapsed}],
#slow_log{topic = Topic,
times = 1,
elapsed = Elapsed}),
count = 1,
elapsed = Elapsed}),
ok.
-spec do_notification(state()) -> index_map().
do_notification(#{begin_time := BeginTime,
-spec do_notification(state()) -> true.
do_notification(#{last_tick_at := TickTime,
config := Cfg,
index := IndexMap,
period := Period,
counter := Counter}) ->
Logs = ets:tab2list(?LOG_TAB),
ets:delete_all_objects(?LOG_TAB),
start_publish(Logs, BeginTime, Cfg),
start_publish(Logs, TickTime, Cfg),
set_log_quota(Cfg, Counter),
MaxRecord = get_value(top_k_num, Cfg),
Size = ets:info(?TOPK_TAB, size),
update_top_k(Logs, erlang:max(0, MaxRecord - Size), IndexMap).
update_topk(Logs, MaxRecord, Period).
-spec update_top_k(list(#slow_log{}), non_neg_integer(), index_map()) -> index_map().
update_top_k([#slow_log{topic = Topic,
times = NewTimes,
elapsed = Elapsed} = Log | T],
Left,
IndexMap) ->
case maps:get(Topic, IndexMap, 0) of
0 ->
try_insert_new(Log, Left, T, IndexMap);
Times ->
[#top_k{key = Key, average = Average}] = ets:lookup(?TOPK_TAB, ?TOP_KEY(Times, Topic)),
Times2 = Times + NewTimes,
Total = Times * Average + Elapsed,
Average2 = Total / Times2,
ets:delete(?TOPK_TAB, Key),
ets:insert(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times2, Topic), average = Average2}),
update_top_k(T, Left, IndexMap#{Topic := Times2})
-spec update_topk(list(slow_log()), pos_integer(), pos_integer()) -> true.
update_topk(Logs, MaxRecord, Period) ->
TopkMap = get_topk_map(Period),
TopkMap2 = update_topk_map(Logs, Period, TopkMap),
SortFun = fun(A, B) ->
A#top_k.average_count > B#top_k.average_count
end,
TopkL = lists:sort(SortFun, maps:values(TopkMap2)),
TopkL2 = lists:sublist(TopkL, 1, MaxRecord),
update_topk_tab(TopkL2).
-spec update_topk_map(list(slow_log()), pos_integer(), top_k_map()) -> top_k_map().
update_topk_map([#slow_log{topic = Topic,
count = LogTimes,
elapsed = LogElapsed} | T], Period, TopkMap) ->
case maps:get(Topic, TopkMap, undefined) of
undefined ->
Record = #top_k{rank = 1,
topic = Topic,
average_count = LogTimes,
average_elapsed = LogElapsed},
TopkMap2 = TopkMap#{Topic => Record},
update_topk_map(T, Period, TopkMap2);
#top_k{average_count = AvgCount,
average_elapsed = AvgElapsed} = Record ->
NewPeriod = Period + 1,
%% (a + b) / c = a / c + b / c
%% average_count(elapsed) dived NewPeriod in function get_topk_maps
AvgCount2 = AvgCount + LogTimes / NewPeriod,
AvgElapsed2 = AvgElapsed + LogElapsed / NewPeriod,
Record2 = Record#top_k{average_count = AvgCount2,
average_elapsed = AvgElapsed2},
update_topk_map(T, Period, TopkMap#{Topic := Record2})
end;
update_top_k([], _, IndexMap) ->
IndexMap.
update_topk_map([], _, TopkMap) ->
TopkMap.
-spec try_insert_new(#slow_log{},
non_neg_integer(), list(#slow_log{}), index_map()) -> index_map().
try_insert_new(#slow_log{topic = Topic,
times = Times,
elapsed = Elapsed}, Left, Logs, IndexMap) when Left > 0 ->
Average = Elapsed / Times,
ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}),
update_top_k(Logs, Left - 1, IndexMap#{Topic => Times});
-spec update_topk_tab(list(top_k())) -> true.
update_topk_tab(Records) ->
Zip = fun(Rank, Item) -> Item#top_k{rank = Rank} end,
Len = erlang:length(Records),
RankedTopics = lists:zipwith(Zip, lists:seq(1, Len), Records),
ets:insert(?TOPK_TAB, RankedTopics).
try_insert_new(#slow_log{topic = Topic,
times = Times,
elapsed = Elapsed}, Left, Logs, IndexMap) ->
?TOP_KEY(MinTimes, MinTopic) = MinKey = ets:first(?TOPK_TAB),
case MinTimes > Times of
true ->
update_top_k(Logs, Left, IndexMap);
_ ->
Average = Elapsed / Times,
ets:delete(?TOPK_TAB, MinKey),
ets:insert_new(?TOPK_TAB, #top_k{key = ?TOP_KEY(Times, Topic), average = Average}),
update_top_k(Logs,
Left - 1,
maps:put(Topic, Times, maps:remove(MinTopic, IndexMap)))
end.
start_publish(Logs, BeginTime, Cfg) ->
emqx_pool:async_submit({fun do_publish/3, [Logs, BeginTime, Cfg]}).
start_publish(Logs, TickTime, Cfg) ->
emqx_pool:async_submit({fun do_publish/3, [Logs, TickTime, Cfg]}).
do_publish([], _, _) ->
ok;
do_publish(Logs, BeginTime, Cfg) ->
do_publish(Logs, TickTime, Cfg) ->
BatchSize = get_value(notice_batch_size, Cfg),
do_publish(Logs, BatchSize, BeginTime, Cfg, []).
do_publish(Logs, BatchSize, TickTime, Cfg, []).
do_publish([Log | T], Size, BeginTime, Cfg, Cache) when Size > 0 ->
do_publish([Log | T], Size, TickTime, Cfg, Cache) when Size > 0 ->
Cache2 = [convert_to_notice(Log) | Cache],
do_publish(T, Size - 1, BeginTime, Cfg, Cache2);
do_publish(T, Size - 1, TickTime, Cfg, Cache2);
do_publish(Logs, Size, BeginTime, Cfg, Cache) when Size =:= 0 ->
publish(BeginTime, Cfg, Cache),
do_publish(Logs, BeginTime, Cfg);
do_publish(Logs, Size, TickTime, Cfg, Cache) when Size =:= 0 ->
publish(TickTime, Cfg, Cache),
do_publish(Logs, TickTime, Cfg);
do_publish([], _, BeginTime, Cfg, Cache) ->
publish(BeginTime, Cfg, Cache),
do_publish([], _, TickTime, Cfg, Cache) ->
publish(TickTime, Cfg, Cache),
ok.
convert_to_notice(#slow_log{topic = Topic,
times = Times,
count = Count,
elapsed = Elapsed}) ->
#{topic => Topic,
times => Times,
average => Elapsed / Times}.
count => Count,
average => Elapsed / Count}.
publish(BeginTime, Cfg, Notices) ->
WindowLog = #{begin_time => BeginTime,
publish(TickTime, Cfg, Notices) ->
WindowLog = #{last_tick_at => TickTime,
logs => Notices},
Payload = emqx_json:encode(WindowLog),
_ = emqx:publish(#message{ id = emqx_guid:gen()
@ -322,6 +321,7 @@ load(Threshold, Counter) ->
unload() ->
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3).
-spec get_topic(proplists:proplist()) -> binary().
get_topic(Cfg) ->
case get_value(notice_topic, Cfg) of
Topic when is_binary(Topic) ->
@ -329,3 +329,24 @@ get_topic(Cfg) ->
Topic ->
erlang:list_to_binary(Topic)
end.
-spec get_topk_map(pos_integer()) -> top_k_map().
get_topk_map(Period) ->
Size = ets:info(?TOPK_TAB, size),
get_topk_map(1, Size, Period, #{}).
-spec get_topk_map(pos_integer(),
non_neg_integer(), pos_integer(), top_k_map()) -> top_k_map().
get_topk_map(Index, Size, _, TopkMap) when Index > Size ->
TopkMap;
get_topk_map(Index, Size, Period, TopkMap) ->
[#top_k{topic = Topic,
average_count = AvgCount,
average_elapsed = AvgElapsed} = R] = ets:lookup(?TOPK_TAB, Index),
NewPeriod = Period + 1,
TotalTimes = AvgCount * Period,
AvgCount2 = TotalTimes / NewPeriod,
AvgElapsed2 = TotalTimes * AvgElapsed / NewPeriod,
TopkMap2 = TopkMap#{Topic => R#top_k{average_count = AvgCount2,
average_elapsed = AvgElapsed2}},
get_topk_map(Index + 1, Size, Period, TopkMap2).

View File

@ -61,7 +61,14 @@ t_log_and_pub(_) ->
end,
lists:seq(1, 10)),
?assert(ets:info(?LOG_TAB, size) =:= 5),
timer:sleep(100),
case ets:info(?LOG_TAB, size) of
5 ->
ok;
_ ->
?assert(ets:info(?TOPK_TAB, size) =/= 0)
end,
timer:sleep(2400),