diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index fef3e4f19..162cff2e0 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -291,8 +291,9 @@ create_session(ClientInfo, ConnInfo) -> ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. -get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight, expiry_interval := EI}) -> - #{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), +get_session_confs(#{zone := Zone, clientid := ClientId}, #{receive_maximum := MaxInflight, expiry_interval := EI}) -> + #{clientid => ClientId, + max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), upgrade_qos => get_mqtt_conf(Zone, upgrade_qos), max_inflight => MaxInflight, retry_interval => get_mqtt_conf(Zone, retry_interval), @@ -301,7 +302,8 @@ get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight, expiry_inte %% TODO: Add conf for allowing/disallowing persistent sessions. %% Note that the connection info is already enriched to have %% default config values for session expiry. - is_persistent => EI > 0 + is_persistent => EI > 0, + latency_stats => emqx_config:get_zone_conf(Zone, [latency_stats]) }. mqueue_confs(Zone) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index c4f77b9a0..f8da72ad9 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -175,6 +175,9 @@ roots(low) -> , {"persistent_session_store", sc(ref("persistent_session_store"), #{})} + , {"latency_stats", + sc(ref("latency_stats"), + #{})} ]. fields("persistent_session_store") -> @@ -974,6 +977,11 @@ when deactivated, but after the retention time. """ }) } + ]; + +fields("latency_stats") -> + [ {"samples", sc(integer(), #{default => 10, + desc => "the number of smaples for calculate the average latency of delivery"})} ]. 
mqtt_listener() -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 610555819..bf79085af 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -98,7 +98,8 @@ ]). -record(session, { - %% sessionID, fresh for all new sessions unless it is a resumed persistent session + %% Client's id + clientid :: emqx_types:clientid(), id :: sessionID(), %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 is_persistent :: boolean(), @@ -128,9 +129,16 @@ %% Awaiting PUBREL Timeout (Unit: millsecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer() + created_at :: pos_integer(), + %% Message deliver latency stats + latency_stats :: emqx_message_latency_stats:stats() }). +%% in the previous code, we will replace the message record with the pubrel atom +%% in the pubrec function, this will lose the creation time of the message, +%% but now we need this time to calculate latency, so now pubrel atom is changed to this record +-record(pubrel_await, {timestamp :: non_neg_integer()}). + -type(session() :: #session{}). -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}). @@ -157,7 +165,8 @@ mqueue_dropped, next_pkt_id, awaiting_rel_cnt, - awaiting_rel_max + awaiting_rel_max, + latency_stats ]). -define(DEFAULT_BATCH_N, 1000). @@ -170,6 +179,8 @@ , max_inflight => integer() , mqueue => emqx_mqueue:options() , is_persistent => boolean() + , clientid => emqx_types:clientid() + , latency_stats => emqx_message_latency_stats:create_options() }. 
%%-------------------------------------------------------------------- @@ -185,6 +196,7 @@ init(Opts) -> }, maps:get(mqueue, Opts, #{})), #session{ id = emqx_guid:gen(), + clientid = maps:get(clientid, Opts, <<>>), is_persistent = maps:get(is_persistent, Opts, false), max_subscriptions = maps:get(max_subscriptions, Opts, infinity), subscriptions = #{}, @@ -196,7 +208,8 @@ init(Opts) -> awaiting_rel = #{}, max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000), - created_at = erlang:system_time(millisecond) + created_at = erlang:system_time(millisecond), + latency_stats = emqx_message_latency_stats:new(maps:get(latency_stats, Opts, #{})) }. %%-------------------------------------------------------------------- @@ -252,7 +265,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt. + CreatedAt; +info(latency_stats, #session{latency_stats = Stats}) -> + emqx_message_latency_stats:latency(Stats). %% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). 
@@ -365,7 +380,8 @@ puback(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), - return_with(Msg, dequeue(Session#session{inflight = Inflight1})); + Session2 = update_latency(Msg, Session), + return_with(Msg, dequeue(Session2#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -388,9 +404,10 @@ return_with(Msg, {ok, Publishes, Session}) -> pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), + Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}), + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; - {value, {pubrel, _Ts}} -> + {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -419,9 +436,10 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> | {error, emqx_types:reason_code()}). pubcomp(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {pubrel, _Ts}} -> + {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) -> + Session2 = update_latency(Pubrel, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(Session#session{inflight = Inflight1}); + dequeue(Session2#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -588,11 +606,16 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %%-------------------------------------------------------------------- -spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). 
-retry(Session = #session{inflight = Inflight}) -> +retry(Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; - false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), - [], erlang:system_time(millisecond), Session) + false -> + Now = erlang:system_time(millisecond), + Session2 = check_expire_latency(Now, RetryInterval, Session), + retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), + [], + Now, + Session2) end. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> @@ -619,8 +642,8 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) - {[{PacketId, Msg1}|Acc], Inflight1} end; -retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> - Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), +retry_delivery(PacketId, Pubrel, Now, Acc, Inflight) -> + Inflight1 = emqx_inflight:update(PacketId, {Pubrel, Now}, Inflight), {[{pubrel, PacketId}|Acc], Inflight1}. %%-------------------------------------------------------------------- @@ -664,7 +687,7 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec(replay(session()) -> {ok, replies(), session()}). replay(Session = #session{inflight = Inflight}) -> - Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) -> + Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) -> {pubrel, PacketId}; ({PacketId, {Msg, _Ts}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} @@ -715,6 +738,35 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. 
+%%-------------------------------------------------------------------- +%% Message Latency Stats +%%-------------------------------------------------------------------- +update_latency(Msg, + #session{clientid = ClientId, + latency_stats = Stats, + created_at = CreateAt} = S) -> + case get_birth_timestamp(Msg, CreateAt) of + 0 -> S; + Ts -> + Latency = erlang:system_time(millisecond) - Ts, + Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats), + S#session{latency_stats = Stats2} + end. + +check_expire_latency(Now, Interval, + #session{clientid = ClientId, latency_stats = Stats} = S) -> + Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats), + S#session{latency_stats = Stats2}. + +get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> + Ts; + +get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> + Ts; + +get_birth_timestamp(_, _) -> + 0. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl b/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl new file mode 100644 index 000000000..237e1b08d --- /dev/null +++ b/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl @@ -0,0 +1,120 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_message_latency_stats). + +%% API +-export([new/1, update/3, check_expire/4, latency/1]). + +-export([get_threshold/0, update_threshold/1]). + +-define(NOW, erlang:system_time(millisecond)). +-define(MINIMUM_INSERT_INTERVAL, 1000). +-define(MINIMUM_THRESHOLD, 100). +-define(DEFAULT_THRESHOLD, 500). +-define(DEFAULT_SAMPLES, 10). +-define(THRESHOLD_KEY, {?MODULE, threshold}). + +-opaque stats() :: #{ ema := emqx_moving_average:ema() + , last_update_time := timestamp() + , last_access_time := timestamp() %% timestamp of last access top-k + , last_insert_value := non_neg_integer() + }. + +-type timestamp() :: non_neg_integer(). +-type timespan() :: number(). + +-type latency_type() :: average + | expire. + +-type create_options() :: #{samples => pos_integer()}. + +-export_type([stats/0, latency_type/0, create_options/0]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec new(non_neg_integer() | create_options()) -> stats(). +new(SamplesT) when is_integer(SamplesT) -> + Samples = erlang:max(1, SamplesT), + #{ ema => emqx_moving_average:new(exponential, #{period => Samples}) + , last_update_time => 0 + , last_access_time => 0 + , last_insert_value => 0 + }; + +new(OptsT) -> + Opts = maps:merge(#{samples => ?DEFAULT_SAMPLES}, OptsT), + #{samples := Samples} = Opts, + new(Samples). + +-spec update(emqx_types:clientid(), number(), stats()) -> stats(). +update(ClientId, Val, #{ema := EMA} = Stats) -> + Now = ?NOW, + #{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA), + Stats2 = call_hook(ClientId, Now, average, Latency, Stats), + Stats2#{ ema := EMA2 + , last_update_time := ?NOW}. 
+ +-spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats(). +check_expire(_, Now, Interval, #{last_update_time := LUT} = S) + when LUT >= Now - Interval -> + S; + +check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) -> + Latency = Now - LUT, + call_hook(ClientId, Now, expire, Latency, S). + +-spec latency(stats()) -> number(). +latency(#{ema := #{average := Average}}) -> + Average. + +-spec update_threshold(pos_integer()) -> pos_integer(). +update_threshold(Threshold) -> + Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD), + persistent_term:put(?THRESHOLD_KEY, Val), + Val. + +get_threshold() -> + persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats(). +call_hook(_, _, _, Latency, S) + when Latency =< ?MINIMUM_THRESHOLD -> + S; + +call_hook(_, Now, _, _, #{last_access_time := LIT} = S) + when Now =< LIT + ?MINIMUM_INSERT_INTERVAL -> + S; + +call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) -> + case Latency =< get_threshold() of + true -> + Stats#{last_access_time := Now}; + _ -> + ToInsert = erlang:floor(Latency), + Arg = #{clientid => ClientId, + latency => ToInsert, + type => Type, + last_insert_value => LIV, + update_time => Now}, + emqx:run_hook('message.slow_subs_stats', [Arg]), + Stats#{last_insert_value := ToInsert, + last_access_time := Now} + end. 
diff --git a/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl b/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl new file mode 100644 index 000000000..64c73f987 --- /dev/null +++ b/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @see https://en.wikipedia.org/wiki/Moving_average + +-module(emqx_moving_average). + +%% API +-export([new/0, new/1, new/2, update/2]). + +-type type() :: cumulative + | exponential. + +-type ema() :: #{ type := exponential + , average := 0 | float() + , coefficient := float() + }. + +-type cma() :: #{ type := cumulative + , average := 0 | float() + , count := non_neg_integer() + }. + +-type moving_average() :: ema() + | cma(). + +-define(DEF_EMA_ARG, #{period => 10}). +-define(DEF_AVG_TYPE, exponential). + +-export_type([type/0, moving_average/0, ema/0, cma/0]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec new() -> moving_average(). +new() -> + new(?DEF_AVG_TYPE, #{}). + +-spec new(type()) -> moving_average(). +new(Type) -> + new(Type, #{}). + +-spec new(type(), Args :: map()) -> moving_average(). 
+new(cumulative, _) -> + #{ type => cumulative + , average => 0 + , count => 0 + }; + +new(exponential, Arg) -> + #{period := Period} = maps:merge(?DEF_EMA_ARG, Arg), + #{ type => exponential + , average => 0 + %% coefficient = 2/(N+1) is a common convention, see the wiki link for details + , coefficient => 2 / (Period + 1) + }. + +-spec update(number(), moving_average()) -> moving_average(). + +update(Val, #{average := 0} = Avg) -> + Avg#{average := Val}; + +update(Val, #{ type := cumulative + , average := Average + , count := Count} = CMA) -> + NewCount = Count + 1, + CMA#{average := (Count * Average + Val) / NewCount, + count := NewCount}; + +update(Val, #{ type := exponential + , average := Average + , coefficient := Coefficient} = EMA) -> + EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index 1d24f9481..9d8348c49 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -24,7 +24,7 @@ namespace() -> zone. %% roots are added only for document generation. roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown", "conn_congestion", "rate_limit", "quota", "force_gc", - "overload_protection" + "overload_protection", "latency_stats" ]. 
%% zone schemas are clones from the same name from root level diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 6462fffed..32aba9674 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -100,6 +100,7 @@ clientinfo() -> %% See emqx_session:session() type define sessioninfo() -> ?LET(Session, {session, + clientid(), sessionid(), % id boolean(), % is_persistent subscriptions(), % subscriptions @@ -112,7 +113,8 @@ sessioninfo() -> awaiting_rel(), % awaiting_rel non_neg_integer(), % max_awaiting_rel safty_timeout(), % await_rel_timeout - timestamp() % created_at + timestamp(), % created_at + latency_stats() }, emqx_session:info(Session)). @@ -336,6 +338,30 @@ normal_topic_filter() -> end end). +%% Type defined emqx_message_latency_stats.erl - stats() +latency_stats() -> + Keys = [{threshold, number()}, + {ema, exp_moving_average()}, + {last_update_time, non_neg_integer()}, + {last_access_time, non_neg_integer()}, + {last_insert_value, non_neg_integer()} + ], + ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())}, + begin + maps:merge(maps:from_list(Ks), M) + end). + +%% Type defined emqx_moving_average.erl - ema() +exp_moving_average() -> + Keys = [{type, exponential}, + {average, number()}, + {coefficient, float()} + ], + ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())}, + begin + maps:merge(maps:from_list(Ks), M) + end). + %%-------------------------------------------------------------------- %% Basic Types %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 8e29b8201..e4a4945ce 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -24,6 +24,9 @@ all() -> emqx_common_test_helpers:all(?MODULE). +-define(NOW, erlang:system_time(millisecond)). +-record(pubrel_await, {timestamp :: non_neg_integer()}). 
+ %%-------------------------------------------------------------------- %% CT callbacks %%-------------------------------------------------------------------- @@ -181,7 +184,7 @@ t_puback_with_dequeue(_) -> ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). t_puback_error_packet_id_in_use(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, session(#{inflight => Inflight})). @@ -193,10 +196,10 @@ t_pubrec(_) -> Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Msg, Session1} = emqx_session:pubrec(2, Session), - ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). + ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubrec(1, session(#{inflight => Inflight})). @@ -212,7 +215,7 @@ t_pubrel_error_packetid_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()). t_pubcomp(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Session1} = emqx_session:pubcomp(1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). 
@@ -261,7 +264,7 @@ t_deliver_qos0(_) -> t_deliver_qos1(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), {ok, Session} = emqx_session:subscribe( - clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), + clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session), ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), @@ -399,4 +402,3 @@ ts(second) -> erlang:system_time(second); ts(millisecond) -> erlang:system_time(millisecond). - diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 9f03d1e7f..5511a6dd0 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -58,6 +58,7 @@ , emqx_psk_schema , emqx_limiter_schema , emqx_connector_schema + , emqx_slow_subs_schema ]). namespace() -> undefined. diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 8fd0a29a1..a2450f988 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -61,10 +61,18 @@ do_paginate(Qh, Count, Params, {Module, FormatFun}) -> query_handle(Table) when is_atom(Table) -> qlc:q([R || R <- ets:table(Table)]); + +query_handle({Table, Opts}) when is_atom(Table) -> + qlc:q([R || R <- ets:table(Table, Opts)]); + query_handle([Table]) when is_atom(Table) -> qlc:q([R || R <- ets:table(Table)]); + +query_handle([{Table, Opts}]) when is_atom(Table) -> + qlc:q([R || R <- ets:table(Table, Opts)]); + query_handle(Tables) -> - qlc:append([qlc:q([E || E <- ets:table(T)]) || T <- Tables]). + qlc:append([query_handle(T) || T <- Tables]). 
% query_handle(Table, MatchSpec) when is_atom(Table) -> Options = {traverse, {select, MatchSpec}}, @@ -78,8 +86,16 @@ query_handle(Tables, MatchSpec) -> count(Table) when is_atom(Table) -> ets:info(Table, size); + +count({Table, _}) when is_atom(Table) -> + ets:info(Table, size); + count([Table]) when is_atom(Table) -> ets:info(Table, size); + +count([{Table, _}]) when is_atom(Table) -> + ets:info(Table, size); + count(Tables) -> lists:sum([count(T) || T <- Tables]). diff --git a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf new file mode 100644 index 000000000..3cf8189d8 --- /dev/null +++ b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf @@ -0,0 +1,40 @@ +##-------------------------------------------------------------------- +## EMQ X Slow Subscribers Statistics +##-------------------------------------------------------------------- + +emqx_slow_subs { + enable = false + + threshold = 500ms + ## The latency threshold for statistics, the minimum value is 100ms + ## + ## Default: 500ms + + ## The eviction time of the record, which in the statistics record table + ## + ## Default: 5m + expire_interval = 5m + + ## The maximum number of records in the slow subscription statistics record table + ## + ## Value: 10 + top_k_num = 10 + + ## The interval for pushing statistics table records to the system topic. When set to 0, push is disabled + ## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval + ## publish is disabled if set to 0s. 
+ ## + ## Value: 0s + notice_interval = 0s + + ## QoS of notification message + ## + ## Default: 0 + notice_qos = 0 + + ## Maximum information number in one notification + ## + ## Default: 100 + notice_batch_size = 100 + +} diff --git a/apps/emqx_slow_subs/include/emqx_slow_subs.hrl b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl new file mode 100644 index 000000000..0b5e3a035 --- /dev/null +++ b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl @@ -0,0 +1,28 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(TOPK_TAB, emqx_slow_subs_topk). + +-define(INDEX(Latency, ClientId), {Latency, ClientId}). + +-record(top_k, { index :: index() + , type :: emqx_message_latency_stats:latency_type() + , last_update_time :: pos_integer() + , extra = [] + }). + +-type top_k() :: #top_k{}. +-type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()). diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.app.src b/apps/emqx_slow_subs/src/emqx_slow_subs.app.src new file mode 100644 index 000000000..593170c37 --- /dev/null +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.app.src @@ -0,0 +1,12 @@ +{application, emqx_slow_subs, + [{description, "EMQ X Slow Subscribers Statistics"}, + {vsn, "1.0.0"}, % strict semver, bump manually! 
+ {modules, []}, + {registered, [emqx_slow_subs_sup]}, + {applications, [kernel,stdlib]}, + {mod, {emqx_slow_subs_app,[]}}, + {env, []}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, []} + ]}. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl new file mode 100644 index 000000000..acb4ea441 --- /dev/null +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -0,0 +1,318 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs). + +-behaviour(gen_server). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). + +-export([ start_link/0, on_stats_update/2, update_settings/1 + , clear_history/0, init_topk_tab/0 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-compile(nowarn_unused_type). + +-type state() :: #{ enable := boolean() + , last_tick_at := pos_integer() + }. + +-type log() :: #{ rank := pos_integer() + , clientid := emqx_types:clientid() + , latency := non_neg_integer() + , type := emqx_message_latency_stats:latency_type() + }. 
+ +-type window_log() :: #{ last_tick_at := pos_integer() + , logs := [log()] + }. + +-type message() :: #message{}. + +-type stats_update_args() :: #{ clientid := emqx_types:clientid() + , latency := non_neg_integer() + , type := emqx_message_latency_stats:latency_type() + , last_insert_value := non_neg_integer() + , update_time := timer:time() + }. + +-type stats_update_env() :: #{max_size := pos_integer()}. + +-ifdef(TEST). +-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). +-else. +-define(EXPIRE_CHECK_INTERVAL, timer:seconds(10)). +-endif. + +-define(NOW, erlang:system_time(millisecond)). +-define(NOTICE_TOPIC_NAME, "slow_subs"). +-define(DEF_CALL_TIMEOUT, timer:seconds(10)). + +%% erlang term order +%% number < atom < reference < fun < port < pid < tuple < list < bit string + +%% ets ordered_set is ascending by term order + +%%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +%% @doc Start the st_statistics +-spec(start_link() -> emqx_types:startlink_ret()). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%% XXX NOTE:pay attention to the performance here +-spec on_stats_update(stats_update_args(), stats_update_env()) -> true. +on_stats_update(#{clientid := ClientId, + latency := Latency, + type := Type, + last_insert_value := LIV, + update_time := Ts}, + #{max_size := MaxSize}) -> + + LastIndex = ?INDEX(LIV, ClientId), + Index = ?INDEX(Latency, ClientId), + + %% check whether the client is in the table + case ets:lookup(?TOPK_TAB, LastIndex) of + [#top_k{index = Index}] -> + %% if last value == the new value, update the type and last_update_time + %% XXX for clients whose latency are stable for a long time, is it possible to reduce updates? 
+ ets:insert(?TOPK_TAB, + #top_k{index = Index, type = Type, last_update_time = Ts}); + [_] -> + %% if Latency > minimum value, we should update it + %% if Latency < minimum value, maybe it can replace the minimum value + %% so always update here + %% do we need check if Latency == minimum ??? + ets:insert(?TOPK_TAB, + #top_k{index = Index, type = Type, last_update_time = Ts}), + ets:delete(?TOPK_TAB, LastIndex); + [] -> + %% try to insert + try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) + end. + +clear_history() -> + gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT). + +update_settings(Enable) -> + gen_server:call(?MODULE, {?FUNCTION_NAME, Enable}, ?DEF_CALL_TIMEOUT). + +init_topk_tab() -> + case ets:whereis(?TOPK_TAB) of + undefined -> + ?TOPK_TAB = ets:new(?TOPK_TAB, + [ ordered_set, public, named_table + , {keypos, #top_k.index}, {write_concurrency, true} + , {read_concurrency, true} + ]); + _ -> + ?TOPK_TAB + end. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + Enable = emqx:get_config([emqx_slow_subs, enable]), + {ok, check_enable(Enable, #{enable => false})}. + +handle_call({update_settings, Enable}, _From, State) -> + State2 = check_enable(Enable, State), + {reply, ok, State2}; + +handle_call(clear_history, _, State) -> + ets:delete_all_objects(?TOPK_TAB), + {reply, ok, State}; + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. 
+ +handle_info(expire_tick, State) -> + expire_tick(), + Logs = ets:tab2list(?TOPK_TAB), + do_clear(Logs), + {noreply, State}; + +handle_info(notice_tick, State) -> + notice_tick(), + Logs = ets:tab2list(?TOPK_TAB), + do_notification(Logs, State), + {noreply, State#{last_tick_at := ?NOW}}; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _) -> + unload(), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +expire_tick() -> + erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). + +notice_tick() -> + case emqx:get_config([emqx_slow_subs, notice_interval]) of + 0 -> ok; + Interval -> + erlang:send_after(Interval, self(), ?FUNCTION_NAME), + ok + end. + +-spec do_notification(list(), state()) -> ok. +do_notification([], _) -> + ok; + +do_notification(Logs, #{last_tick_at := LastTickTime}) -> + start_publish(Logs, LastTickTime), + ok. + +start_publish(Logs, TickTime) -> + emqx_pool:async_submit({fun do_publish/3, [Logs, erlang:length(Logs), TickTime]}). + +do_publish([], _, _) -> + ok; + +do_publish(Logs, Rank, TickTime) -> + BatchSize = emqx:get_config([emqx_slow_subs, notice_batch_size]), + do_publish(Logs, BatchSize, Rank, TickTime, []). + +do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 -> + Cache2 = [convert_to_notice(Rank, Log) | Cache], + do_publish(T, Size - 1, Rank - 1, TickTime, Cache2); + +do_publish(Logs, Size, Rank, TickTime, Cache) when Size =:= 0 -> + publish(TickTime, Cache), + do_publish(Logs, Rank, TickTime); + +do_publish([], _, _Rank, TickTime, Cache) -> + publish(TickTime, Cache), + ok. 

%% Turn one top-k record into the map that is sent in a notification,
%% adding the rank assigned by the caller.
convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId)} = Record) ->
    #top_k{type = Type, last_update_time = UpdatedAt} = Record,
    #{rank => Rank,
      clientid => ClientId,
      latency => Latency,
      type => Type,
      timestamp => UpdatedAt}.

%% Encode a batch of notices (accumulated newest-first, hence the reverse)
%% as JSON and publish it on the slow-subs system topic.
publish(TickTime, Notices) ->
    Payload = emqx_json:encode(#{last_tick_at => TickTime,
                                 logs => lists:reverse(Notices)}),
    Notice = #message{ id = emqx_guid:gen()
                     , qos = emqx:get_config([emqx_slow_subs, notice_qos])
                     , from = ?MODULE
                     , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
                     , payload = Payload
                     , timestamp = ?NOW
                     },
    _ = emqx_broker:safe_publish(Notice),
    ok.

%% Register the stats hook; the table size limit is read from the config
%% once, at registration time, and passed to the callback as an extra argument.
load() ->
    HookArgs = [#{max_size => emqx:get_config([emqx_slow_subs, top_k_num])}],
    _ = emqx:hook('message.slow_subs_stats', {?MODULE, on_stats_update, HookArgs}),
    ok.

%% Remove the stats hook registered by load/0.
unload() ->
    emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}).

%% Delete every given record whose last update is at least `expire_interval'
%% old; younger records are left alone.
do_clear(Logs) ->
    Now = ?NOW,
    Interval = emqx:get_config([emqx_slow_subs, expire_interval]),
    DropIfExpired =
        fun(#top_k{index = Index, last_update_time = Ts}) when Now - Ts >= Interval ->
                ets:delete(?TOPK_TAB, Index);
           (#top_k{}) ->
                true
        end,
    lists:foreach(DropIfExpired, Logs).

%% Insert a new record into the top-k table.
%% While the table is below MaxSize the record is simply added. Otherwise it
%% replaces the current first (smallest) key, but only when its latency is
%% strictly greater than that key's latency.
try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) ->
    Record = #top_k{index = Index, type = Type, last_update_time = Ts},
    case ets:info(?TOPK_TAB, size) < MaxSize of
        true ->
            %% still room: no eviction needed
            ets:insert(?TOPK_TAB, Record);
        false ->
            %% table is full: locate the smallest key; if there is none,
            %% fall back to a key that the new latency is guaranteed to beat
            Lowest = case ets:first(?TOPK_TAB) of
                         ?INDEX(_, _) = Key -> Key;
                         _ -> ?INDEX(Latency - 1, <<>>)
                     end,
            ?INDEX(MinLatency, _) = Lowest,
            if
                Latency > MinLatency ->
                    ets:insert(?TOPK_TAB, Record),
                    ets:delete(?TOPK_TAB, Lowest);
                true ->
                    true
            end
    end.
+ +check_enable(Enable, #{enable := IsEnable} = State) -> + update_threshold(), + case Enable of + IsEnable -> + State; + true -> + notice_tick(), + expire_tick(), + load(), + State#{enable := true, last_tick_at => ?NOW}; + _ -> + unload(), + State#{enable := false} + end. + +update_threshold() -> + Threshold = emqx:get_config([emqx_slow_subs, threshold]), + emqx_message_latency_stats:update_threshold(Threshold), + ok. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl new file mode 100644 index 000000000..8af4f14ea --- /dev/null +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -0,0 +1,108 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_api). + +-behaviour(minirest_api). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). + +-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]). + +-export([slow_subs/2, encode_record/1, settings/2]). + +-import(hoconsc, [mk/2, ref/1]). +-import(emqx_mgmt_util, [bad_request/0]). + +-define(FORMAT_FUN, {?MODULE, encode_record}). +-define(APP, emqx_slow_subs). +-define(APP_NAME, <<"emqx_slow_subs">>). + +namespace() -> "slow_subscribers_statistics". 

%% Swagger spec for this module, generated from paths/0 and schema/1.
api_spec() ->
    emqx_dashboard_swagger:spec(?MODULE).

%% REST paths served by this module.
paths() ->
    [ "/slow_subscriptions"
    , "/slow_subscriptions/settings"
    ].

%% Operation descriptions per path.
%% "/slow_subscriptions": DELETE clears the table, GET returns a page of records.
schema("/slow_subscriptions") ->
    Tags = [<<"slow subs">>],
    Paging = [ {page, mk(integer(), #{in => query})}
             , {limit, mk(integer(), #{in => query})}
             ],
    Delete = #{tags => Tags,
               description => <<"Clear current data and re count slow topic">>,
               parameters => [],
               'requestBody' => [],
               responses => #{204 => <<"No Content">>}},
    Get = #{tags => Tags,
            description => <<"Get slow topics statistics record data">>,
            parameters => Paging,
            'requestBody' => [],
            responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]}},
    #{'operationId' => slow_subs,
      delete => Delete,
      get => Get};

%% "/slow_subscriptions/settings": GET reads and PUT replaces the settings.
schema("/slow_subscriptions/settings") ->
    Tags = [<<"slow subs">>],
    Conf = conf_schema(),
    #{'operationId' => settings,
      get => #{tags => Tags,
               description => <<"Get slow subs settings">>,
               responses => #{200 => Conf}},
      put => #{tags => Tags,
               description => <<"Update slow subs settings">>,
               'requestBody' => Conf,
               responses => #{200 => Conf}}}.

%% Field schema of one record as returned by GET /slow_subscriptions.
fields(record) ->
    Specs = [ {clientid, string(), <<"the clientid">>}
            , {latency, integer(),
               <<"average time for message delivery or time for message expire">>}
            , {type, string(), <<"type of the latency, could be average or expire">>}
            , {last_update_time, integer(), <<"the timestamp of last update">>}
            ],
    [{Name, mk(Type, #{desc => Desc})} || {Name, Type, Desc} <- Specs].

%% Schema of the settings body: a reference to the app's config schema.
conf_schema() ->
    hoconsc:mk(hoconsc:ref(emqx_slow_subs_schema, "emqx_slow_subs"), #{}).

%% Handler for /slow_subscriptions.
%% DELETE wipes the table and answers 204.
slow_subs(delete, _) ->
    ok = emqx_slow_subs:clear_history(),
    {204};

%% GET pages through the table, traversed with `last_prev', and formats
%% every row with encode_record/1.
slow_subs(get, #{query_string := QS}) ->
    Source = {?TOPK_TAB, [{traverse, last_prev}]},
    {200, emqx_mgmt_api:paginate(Source, QS, ?FORMAT_FUN)}.

%% Convert one table record into the map returned by the API.
encode_record(#top_k{index = ?INDEX(Latency, ClientId)} = Record) ->
    #top_k{type = Type, last_update_time = UpdatedAt} = Record,
    #{clientid => ClientId,
      latency => Latency,
      type => Type,
      last_update_time => UpdatedAt}.
+ +settings(get, _) -> + {200, emqx:get_raw_config([?APP_NAME], #{})}; + +settings(put, #{body := Body}) -> + {ok, #{config := #{enable := Enable}}} = emqx:update_config([?APP], Body), + _ = emqx_slow_subs:update_settings(Enable), + {200, emqx:get_raw_config([?APP_NAME], #{})}. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_app.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_app.erl new file mode 100644 index 000000000..d171b0a4f --- /dev/null +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_app.erl @@ -0,0 +1,30 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_app). + +-behaviour(application). + +-export([ start/2 + , stop/1 + ]). + +start(_Type, _Args) -> + {ok, Sup} = emqx_slow_subs_sup:start_link(), + {ok, Sup}. + +stop(_State) -> + ok. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl new file mode 100644 index 000000000..7bdcb16f3 --- /dev/null +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -0,0 +1,44 @@ +-module(emqx_slow_subs_schema). + +-include_lib("typerefl/include/types.hrl"). + +-export([roots/0, fields/1]). + +roots() -> ["emqx_slow_subs"]. 
+ +fields("emqx_slow_subs") -> + [ {enable, sc(boolean(), false, "switch of this function")} + , {threshold, + sc(emqx_schema:duration_ms(), + "500ms", + "The latency threshold for statistics, the minimum value is 100ms")} + , {expire_interval, + sc(emqx_schema:duration_ms(), + "5m", + "The eviction time of the record, which in the statistics record table")} + , {top_k_num, + sc(integer(), + 10, + "The maximum number of records in the slow subscription statistics record table")} + , {notice_interval, + sc(emqx_schema:duration_ms(), + "0s", + "The interval for pushing statistics table records to the system topic. When set to 0, push is disabled" + "publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval" + "publish is disabled if set to 0s." + )} + , {notice_qos, + sc(range(0, 2), + 0, + "QoS of notification message in notice topic")} + , {notice_batch_size, + sc(integer(), + 0, + "Maximum information number in one notification")} + ]. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +sc(Type, Default, Desc) -> + hoconsc:mk(Type, #{default => Default, desc => Desc}). diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl new file mode 100644 index 000000000..a6ad72c74 --- /dev/null +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + emqx_slow_subs:init_topk_tab(), + {ok, {{one_for_one, 10, 3600}, + [#{id => st_statistics, + start => {emqx_slow_subs, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_slow_subs]}]}}. diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl new file mode 100644 index 000000000..f66122775 --- /dev/null +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl @@ -0,0 +1,124 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. 
+%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx.hrl"). + +-define(TOPK_TAB, emqx_slow_subs_topk). +-define(NOW, erlang:system_time(millisecond)). + +-define(BASE_CONF, <<""" +emqx_slow_subs { + enable = true + top_k_num = 5, + expire_interval = 3000 + notice_interval = 1500 + notice_qos = 0 + notice_batch_size = 3 +}""">>). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_config:init_load(emqx_slow_subs_schema, ?BASE_CONF), + emqx_common_test_helpers:start_apps([emqx_slow_subs]), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([emqx_slow_subs]). + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(_, _) -> + ok. + +%%-------------------------------------------------------------------- +%% Test Cases +%%-------------------------------------------------------------------- +t_log_and_pub(_) -> + %% Sub topic first + Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], + Clients = start_client(Subs), + emqx:subscribe("$SYS/brokers/+/slow_subs"), + timer:sleep(1000), + Now = ?NOW, + %% publish + + lists:foreach(fun(I) -> + Topic = list_to_binary(io_lib:format("/test1/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}) + end, + lists:seq(1, 10)), + + lists:foreach(fun(I) -> + Topic = list_to_binary(io_lib:format("/test2/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}) + end, + lists:seq(1, 10)), + + timer:sleep(1000), + Size = ets:info(?TOPK_TAB, size), + %% some time record maybe delete due to it expired + ?assert(Size =< 6 andalso Size >= 4), + + timer:sleep(1500), + Recs = try_receive([]), + 
RecSum = lists:sum(Recs), + ?assert(RecSum >= 5), + ?assert(lists:all(fun(E) -> E =< 3 end, Recs)), + + timer:sleep(2000), + ?assert(ets:info(?TOPK_TAB, size) =:= 0), + [Client ! stop || Client <- Clients], + ok. + +start_client(Subs) -> + [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. + +client(I, Subs) -> + {ok, C} = emqtt:start_link([{host, "localhost"}, + {clientid, io_lib:format("slow_subs_~p", [I])}, + {username, <<"plain">>}, + {password, <<"plain">>}]), + {ok, _} = emqtt:connect(C), + + Len = erlang:length(Subs), + Sub = lists:nth(I rem Len + 1, Subs), + _ = emqtt:subscribe(C, Sub), + + receive + stop -> + ok + end. + +try_receive(Acc) -> + receive + {deliver, _, #message{payload = Payload}} -> + #{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]), + try_receive([length(Logs) | Acc]) + after 500 -> + Acc + end. diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl new file mode 100644 index 000000000..009feda01 --- /dev/null +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -0,0 +1,174 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). 
+ +-include_lib("eunit/include/eunit.hrl"). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_management/include/emqx_mgmt.hrl"). +-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). + +-define(HOST, "http://127.0.0.1:18083/"). + +-define(API_VERSION, "v5"). + +-define(BASE_PATH, "api"). +-define(NOW, erlang:system_time(millisecond)). + +-define(CONF_DEFAULT, <<""" +emqx_slow_subs +{ + enable = true + top_k_num = 5, + expire_interval = 60000 + notice_interval = 0 + notice_qos = 0 + notice_batch_size = 3 +}""">>). + + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_config:init_load(emqx_slow_subs_schema, ?CONF_DEFAULT), + emqx_mgmt_api_test_util:init_suite([emqx_slow_subs]), + {ok, _} = application:ensure_all_started(emqx_authn), + Config. + +end_per_suite(Config) -> + application:stop(emqx_authn), + emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]), + Config. + +init_per_testcase(_, Config) -> + application:ensure_all_started(emqx_slow_subs), + timer:sleep(500), + Config. + +end_per_testcase(_, Config) -> + application:stop(emqx_slow_subs), + Config. + +t_get_history(_) -> + Now = ?NOW, + Each = fun(I) -> + ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), + ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId), + type = average, + last_update_time = Now}) + end, + + lists:foreach(Each, lists:seq(1, 5)), + + {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", + auth_header_()), + #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]), + + RFirst = #{<<"clientid">> => <<"test_5">>, + <<"latency">> => 5, + <<"type">> => <<"average">>, + <<"last_update_time">> => Now}, + + ?assertEqual(RFirst, First). 

%% DELETE /slow_subscriptions must leave the top-k table empty.
t_clear(_) ->
    Record = #top_k{index = ?INDEX(1, <<"test">>),
                    type = average,
                    last_update_time = ?NOW},
    ets:insert(?TOPK_TAB, Record),

    {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [],
                          auth_header_()),

    ?assertEqual(0, ets:info(?TOPK_TAB, size)).

%% PUT then GET /slow_subscriptions/settings: both must echo the new settings,
%% and the new threshold must be visible in emqx_message_latency_stats.
t_settting(_) ->
    SettingsUrl = api_path(["slow_subscriptions", "settings"]),
    Expected = maps:put(threshold, 1000, emqx:get_config([emqx_slow_subs])),

    {ok, PutResp} = request_api(put, SettingsUrl, [], auth_header_(), Expected),
    ?assertEqual(Expected, decode_json(PutResp)),

    {ok, GetResp} = request_api(get, SettingsUrl, [], auth_header_()),
    ?assertEqual(Expected, decode_json(GetResp)),

    ?assertEqual(1000,
                 emqx_message_latency_stats:get_threshold()).

%% Decode a JSON body into a map with atom keys.
decode_json(Data) ->
    emqx_map_lib:unsafe_atom_key_map(emqx_json:decode(Data, [return_maps])).

%% request_api/3,4: convenience wrappers without query string and/or body.
request_api(Method, Url, Auth) ->
    request_api(Method, Url, [], Auth, []).

request_api(Method, Url, QueryParams, Auth) ->
    request_api(Method, Url, QueryParams, Auth, []).

%% Build the httpc request tuple and send it. An empty query string leaves
%% the URL untouched; an empty body produces a request without content type.
request_api(Method, Url, QueryParams, Auth, Body) ->
    FullUrl = case QueryParams of
                  "" -> Url;
                  _ -> Url ++ "?" ++ QueryParams
              end,
    Request = case Body of
                  [] ->
                      {FullUrl, [Auth]};
                  _ ->
                      {FullUrl, [Auth], "application/json", emqx_json:encode(Body)}
              end,
    do_request_api(Method, Request).

%% Send the request; 200 and 204 answers yield {ok, Body}, any other answer
%% yields {error, StatusLine}.
do_request_api(Method, Request)->
    ct:pal("Method: ~p, Request: ~p", [Method, Request]),
    Response = httpc:request(Method, Request, [], [{body_format, binary}]),
    case Response of
        {error, socket_closed_remotely} = Closed ->
            Closed;
        {ok, {{"HTTP/1.1", 200, _}, _, Return}} ->
            {ok, Return};
        {ok, {{"HTTP/1.1", 204, _}, _, Return}} ->
            {ok, Return};
        {ok, {Reason, _, _}} ->
            {error, Reason}
    end.
+ +auth_header_() -> + AppId = <<"admin">>, + AppSecret = <<"public">>, + auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)). + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User,":",Pass])), + {"Authorization","Basic " ++ Encoded}. + +api_path(Parts)-> + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts). diff --git a/rebar.config.erl b/rebar.config.erl index 2beabd147..a97d77333 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -305,6 +305,7 @@ relx_apps(ReleaseType, Edition) -> , emqx_statsd , emqx_prometheus , emqx_psk + , emqx_slow_subs ] ++ [quicer || is_quicer_supported()] %++ [emqx_license || is_enterprise(Edition)]