From 44fe882f14393725ff0a445c70d504e00fa6034a Mon Sep 17 00:00:00 2001 From: lafirest Date: Fri, 7 Jan 2022 17:21:30 +0800 Subject: [PATCH] refactor(emqx_slow_subs): refactor slow subs --- .../include/emqx_slow_subs.hrl | 18 +- .../src/emqx_slow_subs/emqx_slow_subs.erl | 290 ++++++++---------- .../src/emqx_slow_subs/emqx_slow_subs_api.erl | 67 +++- etc/emqx.conf | 21 +- lib-ce/emqx_modules/src/emqx_mod_sup.erl | 2 +- .../test/emqx_slow_subs_SUITE.erl | 56 ++-- .../test/emqx_slow_subs_api_SUITE.erl | 23 +- priv/emqx.schema | 16 +- src/emqx_session.erl | 75 ++--- .../emqx_message_latency_stats.erl | 119 ------- test/emqx_session_SUITE.erl | 24 +- 11 files changed, 300 insertions(+), 411 deletions(-) delete mode 100644 src/emqx_slow_subs/emqx_message_latency_stats.erl diff --git a/apps/emqx_plugin_libs/include/emqx_slow_subs.hrl b/apps/emqx_plugin_libs/include/emqx_slow_subs.hrl index 0b5e3a035..2bb3f9b16 100644 --- a/apps/emqx_plugin_libs/include/emqx_slow_subs.hrl +++ b/apps/emqx_plugin_libs/include/emqx_slow_subs.hrl @@ -15,14 +15,24 @@ %%-------------------------------------------------------------------- -define(TOPK_TAB, emqx_slow_subs_topk). +-define(INDEX_TAB, emqx_slow_subs_index). --define(INDEX(Latency, ClientId), {Latency, ClientId}). +-define(ID(ClientId, Topic), {ClientId, Topic}). +-define(INDEX(TimeSpan, Id), {Id, TimeSpan}). +-define(TOPK_INDEX(TimeSpan, Id), {TimeSpan, Id}). --record(top_k, { index :: index() - , type :: emqx_message_latency_stats:latency_type() +-define(MAX_SIZE, 1000). + +-record(top_k, { index :: topk_index() , last_update_time :: pos_integer() , extra = [] }). +-record(index_tab, { index :: index()}). + -type top_k() :: #top_k{}. --type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()). +-type index_tab() :: #index_tab{}. + +-type id() :: {emqx_types:clientid(), emqx_types:topic()}. +-type index() :: ?INDEX(non_neg_integer(), id()). +-type topk_index() :: ?TOPK_INDEX(non_neg_integer(), id()). diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl index 11d25380e..254d902bd 100644 --- a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl @@ -24,8 +24,8 @@ -logger_header("[SLOW Subs]"). --export([ start_link/1, on_stats_update/2, enable/0 - , disable/0, clear_history/0, init_topk_tab/0 +-export([ start_link/1, on_publish_completed/3, enable/0 + , disable/0, clear_history/0, init_tab/0 ]). %% gen_server callbacks @@ -44,28 +44,19 @@ , last_tick_at := pos_integer() }. --type log() :: #{ rank := pos_integer() - , clientid := emqx_types:clientid() - , latency := non_neg_integer() - , type := emqx_message_latency_stats:latency_type() - }. - --type window_log() :: #{ last_tick_at := pos_integer() - , logs := [log()] - }. - -type message() :: #message{}. -import(proplists, [get_value/2]). --type stats_update_args() :: #{ clientid := emqx_types:clientid() - , latency := non_neg_integer() - , type := emqx_message_latency_stats:latency_type() - , last_insert_value := non_neg_integer() - , update_time := timer:time() - }. +-type stats_type() :: whole %% whole = internal + response + | internal %% timespan from message in to deliver + | response. %% timespan from delivery to client response --type stats_update_env() :: #{max_size := pos_integer()}. +-type stats_update_args() :: #{ clientid := emqx_types:clientid()}. + +-type stats_update_env() :: #{ threshold := non_neg_integer() + , stats_type := stats_type() + , max_size := pos_integer()}. -ifdef(TEST). -define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). @@ -74,7 +65,6 @@ -endif. -define(NOW, erlang:system_time(millisecond)). --define(NOTICE_TOPIC_NAME, "slow_subs"). -define(DEF_CALL_TIMEOUT, timer:seconds(10)). %% erlang term order @@ -91,36 +81,29 @@ start_link(Env) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). -%% XXX NOTE:pay attention to the performance here --spec on_stats_update(stats_update_args(), stats_update_env()) -> true. -on_stats_update(#{clientid := ClientId, - latency := Latency, - type := Type, - last_insert_value := LIV, - update_time := Ts}, - #{max_size := MaxSize}) -> +on_publish_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg) + when Ts =< BirthTime -> + ok; - LastIndex = ?INDEX(LIV, ClientId), - Index = ?INDEX(Latency, ClientId), +on_publish_completed(Msg, Env, Cfg) -> + on_publish_completed(Msg, Env, erlang:system_time(millisecond), Cfg). - %% check whether the client is in the table - case ets:lookup(?TOPK_TAB, LastIndex) of - [#top_k{index = Index}] -> - %% if last value == the new value, update the type and last_update_time - %% XXX for clients whose latency are stable for a long time, is it possible to reduce updates? - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}); - [_] -> - %% if Latency > minimum value, we should update it - %% if Latency < minimum value, maybe it can replace the minimum value - %% so alwyas update at here - %% do we need check if Latency == minimum ??? - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}), - ets:delete(?TOPK_TAB, LastIndex); - [] -> - %% try to insert - try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) +on_publish_completed(#message{topic = Topic} = Msg, + #{clientid := ClientId}, + Now, #{threshold := Threshold, + stats_type := StatsType, + max_size := MaxSize}) -> + TimeSpan = calc_timespan(StatsType, Msg, Now), + case TimeSpan =< Threshold of + true -> ok; + _ -> + Id = ?ID(ClientId, Topic), + LastUpdateValue = find_last_update_value(Id), + case TimeSpan =< LastUpdateValue of + true -> ok; + _ -> + try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) + end end. clear_history() -> @@ -132,26 +115,23 @@ enable() -> disable() -> gen_server:call(?MODULE, {enable, false}, ?DEF_CALL_TIMEOUT). -init_topk_tab() -> - case ets:whereis(?TOPK_TAB) of - undefined -> - ?TOPK_TAB = ets:new(?TOPK_TAB, - [ ordered_set, public, named_table - , {keypos, #top_k.index}, {write_concurrency, true} +init_tab() -> + safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table + , {keypos, #top_k.index}, {write_concurrency, true} + , {read_concurrency, true} + ]), + + safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table + , {keypos, #index_tab.index}, {write_concurrency, true} , {read_concurrency, true} - ]); - _ -> - ?TOPK_TAB - end. + ]). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([Conf]) -> - notice_tick(Conf), expire_tick(Conf), - update_threshold(Conf), load(Conf), {ok, #{config => Conf, last_tick_at => ?NOW, @@ -163,7 +143,6 @@ handle_call({enable, Enable}, _From, IsEnable -> State; true -> - update_threshold(Cfg), load(Cfg), State#{enable := true}; _ -> @@ -173,7 +152,7 @@ handle_call({enable, Enable}, _From, {reply, ok, State2}; handle_call(clear_history, _, State) -> - ets:delete_all_objects(?TOPK_TAB), + do_clear_history(), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -190,12 +169,6 @@ handle_info(expire_tick, #{config := Cfg} = State) -> do_clear(Cfg, Logs), {noreply, State}; -handle_info(notice_tick, #{config := Cfg} = State) -> - notice_tick(Cfg), - Logs = ets:tab2list(?TOPK_TAB), - do_notification(Logs, State), - {noreply, State#{last_tick_at := ?NOW}}; - handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. @@ -213,115 +186,116 @@ code_change(_OldVsn, State, _Extra) -> expire_tick(_) -> erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). -notice_tick(Cfg) -> - case get_value(notice_interval, Cfg) of - 0 -> ok; - Interval -> - erlang:send_after(Interval, self(), ?FUNCTION_NAME), - ok - end. - --spec do_notification(list(), state()) -> ok. -do_notification([], _) -> - ok; - -do_notification(Logs, #{last_tick_at := LastTickTime, config := Cfg}) -> - start_publish(Logs, LastTickTime, Cfg), - ok. - -start_publish(Logs, TickTime, Cfg) -> - emqx_pool:async_submit({fun do_publish/4, [Logs, erlang:length(Logs), TickTime, Cfg]}). - -do_publish([], _, _, _) -> - ok; - -do_publish(Logs, Rank, TickTime, Cfg) -> - BatchSize = get_value(notice_batch_size, Cfg), - do_publish(Logs, BatchSize, Rank, TickTime, Cfg, []). - -do_publish([Log | T], Size, Rank, TickTime, Cfg, Cache) when Size > 0 -> - Cache2 = [convert_to_notice(Rank, Log) | Cache], - do_publish(T, Size - 1, Rank - 1, TickTime, Cfg, Cache2); - -do_publish(Logs, Size, Rank, TickTime, Cfg, Cache) when Size =:= 0 -> - publish(TickTime, Cfg, Cache), - do_publish(Logs, Rank, TickTime, Cfg); - -do_publish([], _, _Rank, TickTime, Cfg, Cache) -> - publish(TickTime, Cfg, Cache), - ok. - -convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId), - type = Type, - last_update_time = Ts}) -> - #{rank => Rank, - clientid => ClientId, - latency => Latency, - type => Type, - timestamp => Ts}. - -publish(TickTime, Cfg, Notices) -> - WindowLog = #{last_tick_at => TickTime, - logs => lists:reverse(Notices)}, - Payload = emqx_json:encode(WindowLog), - Msg = #message{ id = emqx_guid:gen() - , qos = get_value(notice_qos, Cfg) - , from = ?MODULE - , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME) - , payload = Payload - , timestamp = ?NOW - }, - _ = emqx_broker:safe_publish(Msg), - ok. - load(Cfg) -> MaxSize = get_value(top_k_num, Cfg), - _ = emqx:hook('message.slow_subs_stats', - fun ?MODULE:on_stats_update/2, - [#{max_size => MaxSize}]), + StatsType = get_value(stats_type, Cfg), + Threshold = get_value(threshold, Cfg), + _ = emqx:hook('message.publish_completed', + fun ?MODULE:on_publish_completed/3, + [#{max_size => MaxSize, + stats_type => StatsType, + threshold => Threshold + }]), ok. unload() -> - emqx:unhook('message.slow_subs_stats', fun ?MODULE:on_stats_update/2). + emqx:unhook('message.publish_completed', fun ?MODULE:on_publish_completed/3 ), + do_clear_history(). do_clear(Cfg, Logs) -> Now = ?NOW, Interval = get_value(expire_interval, Cfg), - Each = fun(#top_k{index = Index, last_update_time = Ts}) -> + Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) -> case Now - Ts >= Interval of true -> - ets:delete(?TOPK_TAB, Index); + delete_with_index(TimeSpan, Id); _ -> true - end + end end, lists:foreach(Each, Logs). -try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) -> +-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer(). +calc_timespan(whole, #message{timestamp = Ts}, Now) -> + Now - Ts; + +calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) -> + End = emqx_message:get_header(deliver_begin_at, Msg, Now), + End - Ts; + +calc_timespan(response, Msg, Now) -> + Begin = emqx_message:get_header(deliver_begin_at, Msg, Now), + Now - Begin. + +%% update_topk is safe, because each process has a unique clientid +%% insert or delete are bind to this clientid, so there is no race condition +%% +%% but, the delete_with_index in L249 may have a race condition +%% because the data belong to other clientid will be deleted here (deleted the data written by other processes).%% so it may appear that: +%% when deleting a record, the other process is performing an update operation on this recrod +%% in order to solve this race condition problem, the index table also uses the ordered_set type, +%% so that even if the above situation occurs, it will only cause the old data to be deleted twice +%% and the correctness of the data will not be affected + +try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) -> case ets:info(?TOPK_TAB, size) of Size when Size < MaxSize -> - %% if the size is under limit, insert it directly - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}); + update_topk(Now, LastUpdateValue, TimeSpan, Id); _Size -> - %% find the minimum value - ?INDEX(Min, _) = First = - case ets:first(?TOPK_TAB) of - ?INDEX(_, _) = I -> I; - _ -> ?INDEX(Latency - 1, <<>>) - end, - - case Latency =< Min of - true -> true; - _ -> - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}), - - ets:delete(?TOPK_TAB, First) + case ets:first(?TOPK_TAB) of + '$end_of_table' -> + update_topk(Now, LastUpdateValue, TimeSpan, Id); + ?TOPK_INDEX(_, Id) -> + update_topk(Now, LastUpdateValue, TimeSpan, Id); + ?TOPK_INDEX(Min, MinId) -> + case TimeSpan =< Min of + true -> false; + _ -> + update_topk(Now, LastUpdateValue, TimeSpan, Id), + delete_with_index(Min, MinId) + end end end. -update_threshold(Conf) -> - Threshold = proplists:get_value(threshold, Conf), - _ = emqx_message_latency_stats:update_threshold(Threshold), - ok. +-spec find_last_update_value(id()) -> non_neg_integer(). +find_last_update_value(Id) -> + case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of + #index_tab{index = ?INDEX(LastUpdateValue, Id)} -> + LastUpdateValue; + _ -> + 0 + end. + +-spec update_topk(non_neg_integer(), non_neg_integer(), non_neg_integer(), integer()) -> true. +update_topk(Now, LastUpdateValue, TimeSpan, Id) -> + %% update record + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id), + last_update_time = Now, + extra = [] + }), + + %% update index + ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}), + + %% delete the old record & index + delete_with_index(LastUpdateValue, Id). + +-spec delete_with_index(non_neg_integer(), id()) -> true. +delete_with_index(0, _) -> + true; + +delete_with_index(TimeSpan, Id) -> + ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)), + ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)). + +safe_create_tab(Name, Opts) -> + case ets:whereis(Name) of + undefined -> + Name = ets:new(Name, Opts); + _ -> + Name + end. + +do_clear_history() -> + ets:delete_all_objects(?INDEX_TAB), + ets:delete_all_objects(?TOPK_TAB). diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl index a7c9fc050..23d39273d 100644 --- a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl @@ -30,9 +30,12 @@ -export([ clear_history/2 , get_history/2 + , get_history/0 ]). --include("include/emqx_slow_subs.hrl"). +-include_lib("emqx_plugin_libs/include/emqx_slow_subs.hrl"). + +-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)). -import(minirest, [return/1]). @@ -41,17 +44,55 @@ %%-------------------------------------------------------------------- clear_history(_Bindings, _Params) -> - ok = emqx_slow_subs:clear_history(), + Nodes = ekka_mnesia:running_nodes(), + _ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT) + || Node <- Nodes], return(ok). -get_history(_Bindings, Params) -> - RowFun = fun(#top_k{index = ?INDEX(Latency, ClientId), - type = Type, - last_update_time = Ts}) -> - [{clientid, ClientId}, - {latency, Latency}, - {type, Type}, - {last_update_time, Ts}] - end, - Return = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, Params, RowFun), - return({ok, Return}). +get_history(_Bindings, _Params) -> + Nodes = ekka_mnesia:running_nodes(), + Fun = fun(Node, Acc) -> + NodeRankL = rpc_call(Node, + ?MODULE, + ?FUNCTION_NAME, + [], + [], + ?DEFAULT_RPC_TIMEOUT), + NodeRankL ++ Acc + end, + + RankL = lists:foldl(Fun, [], Nodes), + + SortFun = fun(#{timespan := A}, #{timespan := B}) -> + A > B + end, + + SortedL = lists:sort(SortFun, RankL), + SortedL2 = lists:sublist(SortedL, ?MAX_SIZE), + + return({ok, SortedL2}). + +get_history() -> + Node = node(), + RankL = ets:tab2list(?TOPK_TAB), + ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)), + last_update_time = LastUpdateTime + }) -> + #{ clientid => ClientId + , node => Node + , topic => Topic + , timespan => TimeSpan + , last_update_time => LastUpdateTime + } + end, + + lists:map(ConvFun, RankL). + +rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() -> + erlang:apply(M, F, A); + +rpc_call(Node, M, F, A, ErrorR, T) -> + case rpc:call(Node, M, F, A, T) of + {badrpc, _} -> ErrorR; + Res -> Res + end. diff --git a/etc/emqx.conf b/etc/emqx.conf index 4e8aba756..19fdbc656 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2228,25 +2228,18 @@ module.presence.qos = 1 ## maximum number of Top-K record ## -## Value: 10 +## Defalut: 10 #module.slow_subs.top_k_num = 10 -## enable notification -## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval -## publish is disabled if set to 0s. +## Stats Type ## -## Defaut: 0s -#module.slow_subs.notice_interval = 0s +## Default: whole +#module.slow_subs.stats_type = whole -## QoS of notification message in notice topic +## Stats Threshold ## -## Defaut: 0 -#module.slow_subs.notice_qos = 0 - -## Maximum information number in one notification -## -## Default: 100 -#module.slow_subs.notice_batch_size = 100 +## Default: 500ms +#module.slow_subs.threshold = 500ms ## CONFIG_SECTION_END=modules ================================================== diff --git a/lib-ce/emqx_modules/src/emqx_mod_sup.erl b/lib-ce/emqx_modules/src/emqx_mod_sup.erl index 4c8f54679..28f62168c 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_sup.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_sup.erl @@ -69,7 +69,7 @@ stop_child(ChildId) -> init([]) -> ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]), - emqx_slow_subs:init_topk_tab(), + emqx_slow_subs:init_tab(), {ok, {{one_for_one, 10, 100}, []}}. %%-------------------------------------------------------------------- diff --git a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl index f2763ae6c..6cfbdea23 100644 --- a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl @@ -53,47 +53,39 @@ t_log_and_pub(_) -> %% Sub topic first Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Clients = start_client(Subs), - emqx:subscribe("$SYS/brokers/+/slow_subs"), - timer:sleep(1000), - Now = ?NOW, - %% publish - - lists:foreach(fun(I) -> - Topic = list_to_binary(io_lib:format("/test1/~p", [I])), - Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}) - end, - lists:seq(1, 10)), - - lists:foreach(fun(I) -> - Topic = list_to_binary(io_lib:format("/test2/~p", [I])), - Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}) - end, - lists:seq(1, 10)), - - timer:sleep(1000), - Size = ets:info(?TOPK_TAB, size), - %% some time record maybe delete due to it expired - ?assert(Size =< 6 andalso Size >= 4), - timer:sleep(1500), - Recs = try_receive([]), - RecSum = lists:sum(Recs), - ?assert(RecSum >= 5), - ?assert(lists:all(fun(E) -> E =< 3 end, Recs)), + Now = ?NOW, + + %% publish + lists:foreach(fun(I) -> + Topic = list_to_binary(io_lib:format("/test1/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}) + end, + lists:seq(1, 10)), + + lists:foreach(fun(I) -> + Topic = list_to_binary(io_lib:format("/test2/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}) + end, + lists:seq(1, 10)), timer:sleep(2000), + Size = ets:info(?TOPK_TAB, size), + %% some time record maybe delete due to it expired + ?assert(Size =< 6 andalso Size >= 4, + unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))), + + timer:sleep(3000), ?assert(ets:info(?TOPK_TAB, size) =:= 0), [Client ! stop || Client <- Clients], ok. base_conf() -> - [ {threshold, 500} + [ {threshold, 300} , {top_k_num, 5} , {expire_interval, timer:seconds(3)} - , {notice_interval, 1500} - , {notice_qos, 0} - , {notice_batch_size, 3} + , {stats_type, whole} ]. start_client(Subs) -> diff --git a/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl index c8f6b9cfc..36ed45f01 100644 --- a/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl @@ -68,31 +68,28 @@ base_conf() -> t_get_history(_) -> Now = ?NOW, Each = fun(I) -> - ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), - ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId), - type = average, - last_update_time = Now}) + ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), + Topic = erlang:list_to_binary(io_lib:format("topic/~p", [I])), + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(I, ?ID(ClientId, Topic)), + last_update_time = Now}) end, lists:foreach(Each, lists:seq(1, 5)), - {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", + {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "", auth_header_()), - #{meta := Meta, data := [First | _]} = decode(Data), - - RMeta = #{page => 1, limit => 10, count => 5}, - ?assertEqual(RMeta, Meta), + #{data := [First | _]} = decode(Data), RFirst = #{clientid => <<"test_5">>, - latency => 5, - type => <<"average">>, + topic => <<"topic/5">>, + timespan => 5, + node => erlang:atom_to_binary(node()), last_update_time => Now}, ?assertEqual(RFirst, First). t_clear(_) -> - ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>), - type = average, + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"test">>, <<"test">>)), last_update_time = ?NOW}), {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], diff --git a/priv/emqx.schema b/priv/emqx.schema index 0aa77865c..cc616f2e8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1008,6 +1008,7 @@ end}. ]}. %% @doc the number of smaples for calculate the average latency of delivery +%% @deprecated This is a obsoleted configuration, kept here only for compatibility {mapping, "zone.$name.latency_samples", "emqx.zones", [ {default, 10}, {datatype, integer} @@ -2230,26 +2231,35 @@ end}. ]}. {mapping, "module.slow_subs.expire_interval", "emqx.modules", [ - {default, "5m"}, + {default, "300s"}, {datatype, {duration, ms}} ]}. {mapping, "module.slow_subs.top_k_num", "emqx.modules", [ - {default, 500}, - {datatype, integer} + {default, 10}, + {datatype, integer}, + {validators, ["range:0-1000"]} ]}. +{mapping, "module.slow_subs.stats_type", "emqx.modules", [ + {default, whole}, + {datatype, {enum, [whole, internal, response]}} +]}. + +%% @deprecated This is a obsoleted configuration, kept here only for compatibility {mapping, "module.slow_subs.notice_interval", "emqx.modules", [ {default, "0s"}, {datatype, {duration, ms}} ]}. +%% @deprecated This is a obsoleted configuration, kept here only for compatibility {mapping, "module.slow_subs.notice_qos", "emqx.modules", [ {default, 0}, {datatype, integer}, {validators, ["range:0-1"]} ]}. +%% @deprecated This is a obsoleted configuration, kept here only for compatibility {mapping, "module.slow_subs.notice_batch_size", "emqx.modules", [ {default, 500}, {datatype, integer} diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 3ba286f54..e4487234c 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -124,14 +124,13 @@ await_rel_timeout :: timeout(), %% Created at created_at :: pos_integer(), - %% Message deliver latency stats - latency_stats :: emqx_message_latency_stats:stats() + + extras :: map() }). %% in the previous code, we will replace the message record with the pubrel atom %% in the pubrec function, this will lose the creation time of the message, -%% but now we need this time to calculate latency, so now pubrel atom is changed to this record --record(pubrel_await, {timestamp :: non_neg_integer()}). +-record(pubrel_await, {message :: emqx_types:message()}). -type(session() :: #session{}). @@ -157,8 +156,7 @@ mqueue_dropped, next_pkt_id, awaiting_rel_cnt, - awaiting_rel_max, - latency_stats + awaiting_rel_max ]). -define(DEFAULT_BATCH_N, 1000). @@ -187,7 +185,7 @@ init(#{zone := Zone} = CInfo, #{receive_maximum := MaxInflight}) -> max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)), created_at = erlang:system_time(millisecond), - latency_stats = emqx_message_latency_stats:new(Zone) + extras = #{} }. %% @private init mq @@ -244,9 +242,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout div 1000; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt; -info(latency_stats, #session{latency_stats = Stats}) -> - emqx_message_latency_stats:latency(Stats). + CreatedAt. %% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). @@ -339,10 +335,10 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> + on_publish_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - Session2 = update_latency(Msg, Session), - return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1})); - {value, {_Pubrel, _Ts}} -> + return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1})); + {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -364,10 +360,10 @@ return_with(Msg, {ok, Publishes, Session}) -> pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}), + Update = with_ts(#pubrel_await{message = Msg}), Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; - {value, {_PUBREL, _Ts}} -> + {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -396,10 +392,10 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> | {error, emqx_types:reason_code()}). pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) -> - Session2 = update_latency(Pubrel, Session), + {value, {Pubrel, Msg}} when is_record(Pubrel, pubrel_await) -> + on_publish_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(ClientInfo, Session2#session{inflight = Inflight1}); + dequeue(ClientInfo, Session#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -464,8 +460,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) -> end. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> - emqx:run_hook('message.publish_done', - [Msg, #{session_rebirth_time => Session#session.created_at}]), + on_publish_completed(Msg, Session), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = @@ -480,7 +475,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = {ok, Session1}; false -> Publish = {PacketId, maybe_ack(Msg)}, - Session1 = await(PacketId, Msg, Session), + Msg2 = mark_begin_deliver(Msg), + Session1 = await(PacketId, Msg2, Session), {ok, [Publish], next_pkt_id(Session1)} end. @@ -574,14 +570,13 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> -spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). -retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> +retry(ClientInfo, Session = #session{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> Now = erlang:system_time(millisecond), - Session2 = check_expire_latency(Now, RetryInterval, Session), retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), - [], Now, Session2, ClientInfo) + [], Now, Session, ClientInfo) end. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) -> @@ -717,31 +712,15 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %%-------------------------------------------------------------------- %% Message Latency Stats %%-------------------------------------------------------------------- -update_latency(Msg, - #session{clientid = ClientId, - latency_stats = Stats, - created_at = CreateAt} = S) -> - case get_birth_timestamp(Msg, CreateAt) of - 0 -> S; - Ts -> - Latency = erlang:system_time(millisecond) - Ts, - Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats), - S#session{latency_stats = Stats2} - end. +on_publish_completed(Msg, + #session{clientid = ClientId, created_at = CreateAt}) -> + emqx:run_hook('message.publish_completed', + [Msg, #{ session_birth_time => CreateAt + , clientid => ClientId + }]). -check_expire_latency(Now, Interval, - #session{clientid = ClientId, latency_stats = Stats} = S) -> - Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats), - S#session{latency_stats = Stats2}. - -get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> - Ts; - -get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> - Ts; - -get_birth_timestamp(_, _) -> - 0. +mark_begin_deliver(Msg) -> + emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg). %%-------------------------------------------------------------------- %% Helper functions diff --git a/src/emqx_slow_subs/emqx_message_latency_stats.erl b/src/emqx_slow_subs/emqx_message_latency_stats.erl deleted file mode 100644 index 1d6158d59..000000000 --- a/src/emqx_slow_subs/emqx_message_latency_stats.erl +++ /dev/null @@ -1,119 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_message_latency_stats). - -%% API --export([ new/1, update/3, check_expire/4, latency/1]). - --export([get_threshold/0, update_threshold/1]). - --define(NOW, erlang:system_time(millisecond)). --define(MINIMUM_INSERT_INTERVAL, 1000). --define(MINIMUM_THRESHOLD, 100). --define(DEFAULT_THRESHOLD, 500). --define(THRESHOLD_KEY, {?MODULE, threshold}). - --opaque stats() :: #{ ema := emqx_moving_average:ema() - , last_update_time := timestamp() - , last_access_time := timestamp() %% timestamp of last try to call hook - , last_insert_value := non_neg_integer() - }. - --type timestamp() :: non_neg_integer(). --type timespan() :: number(). - --type latency_type() :: average - | expire. - --import(emqx_zone, [get_env/3]). - --export_type([stats/0, latency_type/0]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - --spec new(non_neg_integer() | emqx_types:zone()) -> stats(). -new(SamplesT) when is_integer(SamplesT) -> - Samples = erlang:max(1, SamplesT), - #{ ema => emqx_moving_average:new(exponential, #{period => Samples}) - , last_update_time => 0 - , last_access_time => 0 - , last_insert_value => 0 - }; - -new(Zone) -> - Samples = get_env(Zone, latency_samples, 1), - new(Samples). - --spec update(emqx_types:clientid(), number(), stats()) -> stats(). -update(ClientId, Val, #{ema := EMA} = Stats) -> - Now = ?NOW, - #{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA), - Stats2 = call_hook(ClientId, Now, average, Latency, Stats), - Stats2#{ ema := EMA2 - , last_update_time := ?NOW}. - --spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats(). -check_expire(_, Now, Interval, #{last_update_time := LUT} = S) - when LUT >= Now - Interval -> - S; - -check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) -> - Latency = Now - LUT, - call_hook(ClientId, Now, expire, Latency, S). - --spec latency(stats()) -> number(). -latency(#{ema := #{average := Average}}) -> - Average. - --spec update_threshold(pos_integer()) -> pos_integer(). -update_threshold(Threshold) -> - Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD), - persistent_term:put(?THRESHOLD_KEY, Val), - Val. - -get_threshold() -> - persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- --spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats(). -call_hook(_, _, _, Latency, S) - when Latency =< ?MINIMUM_THRESHOLD -> - S; - -call_hook(_, Now, _, _, #{last_access_time := LIT} = S) - when Now =< LIT + ?MINIMUM_INSERT_INTERVAL -> - S; - -call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) -> - case Latency =< get_threshold() of - true -> - Stats#{last_access_time := Now}; - _ -> - ToInsert = erlang:floor(Latency), - Arg = #{clientid => ClientId, - latency => ToInsert, - type => Type, - last_insert_value => LIV, - update_time => Now}, - emqx:run_hook('message.slow_subs_stats', [Arg]), - Stats#{last_insert_value := ToInsert, - last_access_time := Now} - end. diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index a1d24402c..cfcf0f132 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -192,7 +192,8 @@ t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), - {ok, Msg, Session1} = emqx_session:pubrec(2, Session), + {ok, MsgWithTime, Session1} = emqx_session:pubrec(2, Session), + ?assertEqual(Msg, emqx_message:remove_header(deliver_begin_at, MsgWithTime)), ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> @@ -214,7 +215,7 @@ t_pubrel_error_packetid_not_found(_) -> t_pubcomp(_) -> Now = ts(millisecond), - Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, {{pubrel_await, undefined}, Now}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). @@ -271,9 +272,13 @@ t_deliver_qos1(_) -> ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), - {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1), + + {ok, Msg1WithTime, Session2} = emqx_session:puback(clientinfo(), 1, Session1), + ?assertEqual(Msg1, emqx_message:remove_header(deliver_begin_at, Msg1WithTime)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), - {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2), + + {ok, Msg2WithTime, Session3} = emqx_session:puback(clientinfo(), 2, Session2), + ?assertEqual(Msg2, emqx_message:remove_header(deliver_begin_at, Msg2WithTime)), ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). t_deliver_qos2(_) -> @@ -317,7 +322,11 @@ t_retry(_) -> {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ok = timer:sleep(200), Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], - {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), + {ok, Msgs1WithTime, 100, Session2} = emqx_session:retry(clientinfo(), Session1), + ?assertEqual(Msgs1, + lists:map(fun({Id, M}) -> + {Id, emqx_message:remove_header(deliver_begin_at, M)} + end, Msgs1WithTime)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- @@ -341,7 +350,10 @@ t_replay(_) -> Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), - ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), + ?assertEqual(Pubs1 ++ [{3, Msg}], + lists:map(fun({Id, M}) -> + {Id, emqx_message:remove_header(deliver_begin_at, M)} + end, ReplayPubs)), ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). t_expire_awaiting_rel(_) ->