feat: port slow subs from v4.4
This commit is contained in:
parent
50859bbd83
commit
0c48bd92db
|
@ -134,10 +134,12 @@
|
||||||
latency_stats :: emqx_message_latency_stats:stats()
|
latency_stats :: emqx_message_latency_stats:stats()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% Phase of an entry kept in the inflight window:
%%   wait_ack  - initial phase set by with_ts/1; the phase puback/3 and
%%               pubrec/3 accept
%%   wait_comp - set by pubrec/3; the phase pubcomp/3 accepts
-type inflight_data_phase() :: wait_ack | wait_comp.

%% Value stored in the inflight window for each packet id. The message is kept
%% through both phases (pubrec/3 only flips `phase`); `timestamp` is a
%% millisecond system time that is refreshed whenever the entry is retried.
-record(inflight_data, {
    phase :: inflight_data_phase(),
    message :: emqx_types:message(),
    timestamp :: non_neg_integer()
}).
|
||||||
|
|
||||||
-type(session() :: #session{}).
|
-type(session() :: #session{}).
|
||||||
|
|
||||||
|
@ -168,7 +170,6 @@
|
||||||
, next_pkt_id
|
, next_pkt_id
|
||||||
, awaiting_rel_cnt
|
, awaiting_rel_cnt
|
||||||
, awaiting_rel_max
|
, awaiting_rel_max
|
||||||
, latency_stats
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(DEFAULT_BATCH_N, 1000).
|
-define(DEFAULT_BATCH_N, 1000).
|
||||||
|
@ -380,11 +381,11 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
|
||||||
| {error, emqx_types:reason_code()}).
|
| {error, emqx_types:reason_code()}).
|
||||||
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, #inflight_data{phase = wait_ack, message = Msg}} ->
|
||||||
|
on_delivery_completed(Msg, Session),
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
Session2 = update_latency(Msg, Session),
|
return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
|
||||||
return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1}));
|
{value, _} ->
|
||||||
{value, {_Pubrel, _Ts}} ->
|
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
||||||
none ->
|
none ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
|
@ -404,11 +405,11 @@ return_with(Msg, {ok, Publishes, Session}) ->
|
||||||
| {error, emqx_types:reason_code()}).
|
| {error, emqx_types:reason_code()}).
|
||||||
pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, #inflight_data{phase = wait_ack, message = Msg} = Data} ->
|
||||||
Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}),
|
Update = Data#inflight_data{phase = wait_comp},
|
||||||
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
|
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
|
||||||
{ok, Msg, Session#session{inflight = Inflight1}};
|
{ok, Msg, Session#session{inflight = Inflight1}};
|
||||||
{value, {_Pubrel, _Ts}} ->
|
{value, _} ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
||||||
none ->
|
none ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
||||||
|
@ -437,10 +438,10 @@ pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
|
||||||
| {error, emqx_types:reason_code()}).
|
| {error, emqx_types:reason_code()}).
|
||||||
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) ->
|
{value, #inflight_data{phase = wait_comp, message = Msg}} ->
|
||||||
Session2 = update_latency(Pubrel, Session),
|
on_delivery_completed(Msg, Session),
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
dequeue(ClientInfo, Session2#session{inflight = Inflight1});
|
dequeue(ClientInfo, Session#session{inflight = Inflight1});
|
||||||
{value, _Other} ->
|
{value, _Other} ->
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
||||||
none ->
|
none ->
|
||||||
|
@ -504,6 +505,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
|
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
|
||||||
|
on_delivery_completed(Msg, Session), %
|
||||||
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
||||||
|
|
||||||
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
||||||
|
@ -518,7 +520,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
||||||
{ok, Session1};
|
{ok, Session1};
|
||||||
false ->
|
false ->
|
||||||
Publish = {PacketId, maybe_ack(Msg)},
|
Publish = {PacketId, maybe_ack(Msg)},
|
||||||
Session1 = await(PacketId, Msg, Session),
|
Msg2 = mark_begin_deliver(Msg),
|
||||||
|
Session1 = await(PacketId, Msg2, Session),
|
||||||
{ok, [Publish], next_pkt_id(Session1)}
|
{ok, [Publish], next_pkt_id(Session1)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -621,29 +624,29 @@ retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = Retry
|
||||||
true -> {ok, Session};
|
true -> {ok, Session};
|
||||||
false ->
|
false ->
|
||||||
Now = erlang:system_time(millisecond),
|
Now = erlang:system_time(millisecond),
|
||||||
Session2 = check_expire_latency(Now, RetryInterval, Session),
|
retry_delivery(emqx_inflight:to_list(fun sort_fun/2, Inflight),
|
||||||
retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], Now,
|
[], Now, Session, ClientInfo)
|
||||||
Session2, ClientInfo)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Walk the inflight entries (the caller passes them sorted by timestamp) and
%% retry every entry whose age has reached the session's retry interval.
%% Stops at the first entry that is still younger than the interval.
%% Returns {ok, Replies, Timeout, Session}: Timeout is the full interval when
%% the list was exhausted, otherwise the interval minus the age of the entry
%% that stopped the walk.
retry_delivery([], Replies, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
    {ok, lists:reverse(Replies), Interval, Session};
retry_delivery(
    [{PacketId, #inflight_data{timestamp = SentAt} = Entry} | Rest],
    Replies,
    Now,
    Session = #session{retry_interval = Interval, inflight = Inflight},
    ClientInfo
) ->
    Elapsed = age(Now, SentAt),
    if
        Elapsed >= Interval ->
            {Replies1, Inflight1} =
                do_retry_delivery(PacketId, Entry, Now, Replies, Inflight, ClientInfo),
            retry_delivery(Rest, Replies1, Now, Session#session{inflight = Inflight1}, ClientInfo);
        true ->
            %% max/2 keeps a negative age (timestamp ahead of Now) from
            %% stretching the timeout beyond one interval.
            {ok, lists:reverse(Replies), Interval - max(0, Elapsed), Session}
    end.
|
||||||
|
|
||||||
%% Retry for an entry that is waiting for PUBCOMP (phase = wait_comp):
%% refresh its timestamp and queue a {pubrel, PacketId} reply.
%% The head must match the phase explicitly. A bare `Data` variable would also
%% capture wait_ack entries and make the message-resending clause below
%% unreachable, because clauses are tried in order and the first match wins.
do_retry_delivery(PacketId, #inflight_data{phase = wait_comp} = Data, Now, Acc, Inflight, _) ->
    Update = Data#inflight_data{timestamp = Now},
    Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
    {[{pubrel, PacketId} | Acc], Inflight1};
|
||||||
|
|
||||||
do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) ->
|
do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight, ClientInfo) ->
|
||||||
case emqx_message:is_expired(Msg) of
|
case emqx_message:is_expired(Msg) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
||||||
|
@ -651,7 +654,8 @@ do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) ->
|
||||||
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
||||||
false ->
|
false ->
|
||||||
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
||||||
Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight),
|
Update = Data#inflight_data{message = Msg1, timestamp = Now},
|
||||||
|
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
|
||||||
{[{PacketId, Msg1} | Acc], Inflight1}
|
{[{PacketId, Msg1} | Acc], Inflight1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -697,9 +701,9 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
|
||||||
|
|
||||||
-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
|
-spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}).
|
||||||
replay(ClientInfo, Session = #session{inflight = Inflight}) ->
|
replay(ClientInfo, Session = #session{inflight = Inflight}) ->
|
||||||
Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) ->
|
Pubs = lists:map(fun({PacketId, #inflight_data{phase = wait_comp}}) ->
|
||||||
{pubrel, PacketId};
|
{pubrel, PacketId};
|
||||||
({PacketId, {Msg, _Ts}}) ->
|
({PacketId, #inflight_data{message = Msg}}) ->
|
||||||
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
||||||
end, emqx_inflight:to_list(Inflight)),
|
end, emqx_inflight:to_list(Inflight)),
|
||||||
case dequeue(ClientInfo, Session) of
|
case dequeue(ClientInfo, Session) of
|
||||||
|
@ -755,37 +759,23 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Message Latency Stats
|
%% Message Latency Stats
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Run the 'delivery.completed' hook for `Msg`, handing subscribers the
%% session's creation time and client id in a map.
on_delivery_completed(Msg, #session{created_at = CreateAt, clientid = ClientId}) ->
    Env = #{session_birth_time => CreateAt, clientid => ClientId},
    emqx:run_hook('delivery.completed', [Msg, Env]).
|
|
||||||
|
|
||||||
%% Stamp the message with the time its delivery starts, stored in the
%% `deliver_begin_at` header.
%% The value is taken in milliseconds so it is in the same unit as the other
%% timestamps this module records (see with_ts/1 and retry/2); a value in
%% seconds cannot be meaningfully subtracted from or compared with those.
mark_begin_deliver(Msg) ->
    emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg).
|
|
||||||
|
|
||||||
get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
|
|
||||||
Ts;
|
|
||||||
|
|
||||||
get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
|
|
||||||
Ts;
|
|
||||||
|
|
||||||
get_birth_timestamp(_, _) ->
|
|
||||||
0.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
sort_fun() ->
|
|
||||||
fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end.
|
-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}).
|
||||||
|
|
||||||
|
%% Ordering predicate handed to emqx_inflight:to_list/2: oldest entry first.
%% The elements being compared are {PacketId, #inflight_data{}} pairs (the
%% shape retry_delivery/5 matches on), so the record has to be taken out of
%% the pair before its timestamp can be read.
sort_fun({_, A}, {_, B}) ->
    A#inflight_data.timestamp =< B#inflight_data.timestamp.
|
||||||
|
|
||||||
batch_n(Inflight) ->
|
batch_n(Inflight) ->
|
||||||
case emqx_inflight:max_size(Inflight) of
|
case emqx_inflight:max_size(Inflight) of
|
||||||
|
@ -794,7 +784,9 @@ batch_n(Inflight) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Wrap a message into a fresh inflight entry: phase wait_ack, stamped with
%% the current system time in milliseconds.
with_ts(Msg) ->
    Now = erlang:system_time(millisecond),
    #inflight_data{timestamp = Now, message = Msg, phase = wait_ack}.
|
||||||
|
|
||||||
%% Difference between `Now` and an earlier timestamp; negative when the
%% timestamp lies ahead of `Now`.
age(Now, Since) ->
    Now - Since.
|
||||||
|
|
||||||
|
|
|
@ -1,120 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_message_latency_stats).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([new/1, update/3, check_expire/4, latency/1]).
|
|
||||||
|
|
||||||
-export([get_threshold/0, update_threshold/1]).
|
|
||||||
|
|
||||||
-define(NOW, erlang:system_time(millisecond)).
|
|
||||||
-define(MINIMUM_INSERT_INTERVAL, 1000).
|
|
||||||
-define(MINIMUM_THRESHOLD, 100).
|
|
||||||
-define(DEFAULT_THRESHOLD, 500).
|
|
||||||
-define(DEFAULT_SAMPLES, 10).
|
|
||||||
-define(THRESHOLD_KEY, {?MODULE, threshold}).
|
|
||||||
|
|
||||||
-opaque stats() :: #{ ema := emqx_moving_average:ema()
|
|
||||||
, last_update_time := timestamp()
|
|
||||||
, last_access_time := timestamp() %% timestamp of last access top-k
|
|
||||||
, last_insert_value := non_neg_integer()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type timestamp() :: non_neg_integer().
|
|
||||||
-type timespan() :: number().
|
|
||||||
|
|
||||||
-type latency_type() :: average
|
|
||||||
| expire.
|
|
||||||
|
|
||||||
-type create_options() :: #{samples => pos_integer()}.
|
|
||||||
|
|
||||||
-export_type([stats/0, latency_type/0, create_options/0]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-spec new(non_neg_integer() | create_options()) -> stats().
|
|
||||||
new(SamplesT) when is_integer(SamplesT) ->
|
|
||||||
Samples = erlang:max(1, SamplesT),
|
|
||||||
#{ ema => emqx_moving_average:new(exponential, #{period => Samples})
|
|
||||||
, last_update_time => 0
|
|
||||||
, last_access_time => 0
|
|
||||||
, last_insert_value => 0
|
|
||||||
};
|
|
||||||
|
|
||||||
new(OptsT) ->
|
|
||||||
Opts = maps:merge(#{samples => ?DEFAULT_SAMPLES}, OptsT),
|
|
||||||
#{samples := Samples} = Opts,
|
|
||||||
new(Samples).
|
|
||||||
|
|
||||||
-spec update(emqx_types:clientid(), number(), stats()) -> stats().
|
|
||||||
update(ClientId, Val, #{ema := EMA} = Stats) ->
|
|
||||||
Now = ?NOW,
|
|
||||||
#{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA),
|
|
||||||
Stats2 = call_hook(ClientId, Now, average, Latency, Stats),
|
|
||||||
Stats2#{ ema := EMA2
|
|
||||||
, last_update_time := ?NOW}.
|
|
||||||
|
|
||||||
-spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats().
|
|
||||||
check_expire(_, Now, Interval, #{last_update_time := LUT} = S)
|
|
||||||
when LUT >= Now - Interval ->
|
|
||||||
S;
|
|
||||||
|
|
||||||
check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) ->
|
|
||||||
Latency = Now - LUT,
|
|
||||||
call_hook(ClientId, Now, expire, Latency, S).
|
|
||||||
|
|
||||||
-spec latency(stats()) -> number().
|
|
||||||
latency(#{ema := #{average := Average}}) ->
|
|
||||||
Average.
|
|
||||||
|
|
||||||
-spec update_threshold(pos_integer()) -> pos_integer().
|
|
||||||
update_threshold(Threshold) ->
|
|
||||||
Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD),
|
|
||||||
persistent_term:put(?THRESHOLD_KEY, Val),
|
|
||||||
Val.
|
|
||||||
|
|
||||||
get_threshold() ->
|
|
||||||
persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
|
|
||||||
call_hook(_, _, _, Latency, S)
|
|
||||||
when Latency =< ?MINIMUM_THRESHOLD ->
|
|
||||||
S;
|
|
||||||
|
|
||||||
call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
|
|
||||||
when Now =< LIT + ?MINIMUM_INSERT_INTERVAL ->
|
|
||||||
S;
|
|
||||||
|
|
||||||
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
|
|
||||||
case Latency =< get_threshold() of
|
|
||||||
true ->
|
|
||||||
Stats#{last_access_time := Now};
|
|
||||||
_ ->
|
|
||||||
ToInsert = erlang:floor(Latency),
|
|
||||||
Arg = #{clientid => ClientId,
|
|
||||||
latency => ToInsert,
|
|
||||||
type => Type,
|
|
||||||
last_insert_value => LIV,
|
|
||||||
update_time => Now},
|
|
||||||
emqx:run_hook('message.slow_subs_stats', [Arg]),
|
|
||||||
Stats#{last_insert_value := ToInsert,
|
|
||||||
last_access_time := Now}
|
|
||||||
end.
|
|
|
@ -1,90 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @see https://en.wikipedia.org/wiki/Moving_average
|
|
||||||
|
|
||||||
-module(emqx_moving_average).
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([new/0, new/1, new/2, update/2]).
|
|
||||||
|
|
||||||
-type type() :: cumulative
|
|
||||||
| exponential.
|
|
||||||
|
|
||||||
-type ema() :: #{ type := exponential
|
|
||||||
, average := 0 | float()
|
|
||||||
, coefficient := float()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type cma() :: #{ type := cumulative
|
|
||||||
, average := 0 | float()
|
|
||||||
, count := non_neg_integer()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type moving_average() :: ema()
|
|
||||||
| cma().
|
|
||||||
|
|
||||||
-define(DEF_EMA_ARG, #{period => 10}).
|
|
||||||
-define(DEF_AVG_TYPE, exponential).
|
|
||||||
|
|
||||||
-export_type([type/0, moving_average/0, ema/0, cma/0]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
-spec new() -> moving_average().
|
|
||||||
new() ->
|
|
||||||
new(?DEF_AVG_TYPE, #{}).
|
|
||||||
|
|
||||||
-spec new(type()) -> moving_average().
|
|
||||||
new(Type) ->
|
|
||||||
new(Type, #{}).
|
|
||||||
|
|
||||||
-spec new(type(), Args :: map()) -> moving_average().
|
|
||||||
new(cumulative, _) ->
|
|
||||||
#{ type => cumulative
|
|
||||||
, average => 0
|
|
||||||
, count => 0
|
|
||||||
};
|
|
||||||
|
|
||||||
new(exponential, Arg) ->
|
|
||||||
#{period := Period} = maps:merge(?DEF_EMA_ARG, Arg),
|
|
||||||
#{ type => exponential
|
|
||||||
, average => 0
|
|
||||||
%% coefficient = 2/(N+1) is a common convention, see the wiki link for details
|
|
||||||
, coefficient => 2 / (Period + 1)
|
|
||||||
}.
|
|
||||||
|
|
||||||
-spec update(number(), moving_average()) -> moving_average().
|
|
||||||
|
|
||||||
update(Val, #{average := 0} = Avg) ->
|
|
||||||
Avg#{average := Val};
|
|
||||||
|
|
||||||
update(Val, #{ type := cumulative
|
|
||||||
, average := Average
|
|
||||||
, count := Count} = CMA) ->
|
|
||||||
NewCount = Count + 1,
|
|
||||||
CMA#{average := (Count * Average + Val) / NewCount,
|
|
||||||
count := NewCount};
|
|
||||||
|
|
||||||
update(Val, #{ type := exponential
|
|
||||||
, average := Average
|
|
||||||
, coefficient := Coefficient} = EMA) ->
|
|
||||||
EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
|
@ -20,21 +20,16 @@ slow_subs {
|
||||||
## Value: 10
|
## Value: 10
|
||||||
top_k_num = 10
|
top_k_num = 10
|
||||||
|
|
||||||
## The interval for pushing statistics table records to the system topic. When set to 0, push is disabled
|
## The ways to calculate the latency are as follows:
|
||||||
## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval
|
|
||||||
## publish is disabled if set to 0s.
|
|
||||||
##
|
##
|
||||||
## Value: 0s
|
## 1. whole
|
||||||
notice_interval = 0s
|
## From the time the message arrives at EMQX until the message completes transmission
|
||||||
|
|
||||||
## QoS of notification message
|
|
||||||
##
|
##
|
||||||
## Default: 0
|
## 2. internal
|
||||||
notice_qos = 0
|
## From when the message arrives at EMQX until when EMQX starts delivering the message
|
||||||
|
|
||||||
## Maximum information number in one notification
|
|
||||||
##
|
##
|
||||||
## Default: 100
|
## 3. response
|
||||||
notice_batch_size = 100
|
## From the time EMQX starts delivering the message, until the message completes transmission
|
||||||
|
## Default: whole
|
||||||
|
stats_type = whole
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,16 +15,24 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-define(TOPK_TAB, emqx_slow_subs_topk).
|
-define(TOPK_TAB, emqx_slow_subs_topk).
|
||||||
|
-define(INDEX_TAB, emqx_slow_subs_index).
|
||||||
|
|
||||||
-define(INDEX(Latency, ClientId), {Latency, ClientId}).
|
-define(ID(ClientId, Topic), {ClientId, Topic}).
|
||||||
|
-define(INDEX(TimeSpan, Id), {Id, TimeSpan}).
|
||||||
|
-define(TOPK_INDEX(TimeSpan, Id), {TimeSpan, Id}).
|
||||||
|
|
||||||
-define(MAX_TAB_SIZE, 1000).
|
-define(MAX_SIZE, 1000).
|
||||||
|
|
||||||
-record(top_k, { index :: index()
|
-record(top_k, { index :: topk_index()
|
||||||
, type :: emqx_message_latency_stats:latency_type()
|
|
||||||
, last_update_time :: pos_integer()
|
, last_update_time :: pos_integer()
|
||||||
, extra = []
|
, extra = []
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-record(index_tab, { index :: index()}).
|
||||||
|
|
||||||
-type top_k() :: #top_k{}.
|
-type top_k() :: #top_k{}.
|
||||||
-type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()).
|
-type index_tab() :: #index_tab{}.
|
||||||
|
|
||||||
|
-type id() :: {emqx_types:clientid(), emqx_types:topic()}.
|
||||||
|
-type index() :: ?INDEX(non_neg_integer(), id()).
|
||||||
|
-type topk_index() :: ?TOPK_INDEX(non_neg_integer(), id()).
|
||||||
|
|
|
@ -22,8 +22,8 @@
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
|
||||||
|
|
||||||
-export([ start_link/0, on_stats_update/2, update_settings/1
|
-export([ start_link/0, on_delivery_completed/4, update_settings/1
|
||||||
, clear_history/0, init_topk_tab/0, post_config_update/5
|
, clear_history/0, init_tab/0, post_config_update/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -40,29 +40,19 @@
|
||||||
-type state() :: #{ enable := boolean()
|
-type state() :: #{ enable := boolean()
|
||||||
, last_tick_at := pos_integer()
|
, last_tick_at := pos_integer()
|
||||||
, expire_timer := undefined | reference()
|
, expire_timer := undefined | reference()
|
||||||
, notice_timer := undefined | reference()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type log() :: #{ rank := pos_integer()
|
|
||||||
, clientid := emqx_types:clientid()
|
|
||||||
, latency := non_neg_integer()
|
|
||||||
, type := emqx_message_latency_stats:latency_type()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type window_log() :: #{ last_tick_at := pos_integer()
|
|
||||||
, logs := [log()]
|
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-type message() :: #message{}.
|
-type message() :: #message{}.
|
||||||
|
|
||||||
-type stats_update_args() :: #{ clientid := emqx_types:clientid()
|
-type stats_type() :: whole %% whole = internal + response
|
||||||
, latency := non_neg_integer()
|
| internal %% timespan from message in to deliver
|
||||||
, type := emqx_message_latency_stats:latency_type()
|
| response. %% timespan from delivery to client response
|
||||||
, last_insert_value := non_neg_integer()
|
|
||||||
, update_time := timer:time()
|
|
||||||
}.
|
|
||||||
|
|
||||||
-type stats_update_env() :: #{max_size := pos_integer()}.
|
-type stats_update_args() :: #{session_birth_time := pos_integer()}.
|
||||||
|
|
||||||
|
-type stats_update_env() :: #{ threshold := non_neg_integer()
|
||||||
|
, stats_type := stats_type()
|
||||||
|
, max_size := pos_integer()}.
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
|
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
|
||||||
|
@ -71,7 +61,6 @@
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(NOW, erlang:system_time(millisecond)).
|
-define(NOW, erlang:system_time(millisecond)).
|
||||||
-define(NOTICE_TOPIC_NAME, "slow_subs").
|
|
||||||
-define(DEF_CALL_TIMEOUT, timer:seconds(10)).
|
-define(DEF_CALL_TIMEOUT, timer:seconds(10)).
|
||||||
|
|
||||||
%% erlang term order
|
%% erlang term order
|
||||||
|
@ -88,37 +77,32 @@
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
%% 'delivery.completed' callback.
%% A message whose timestamp is not later than the session's birth time is
%% ignored; any other message is measured against the current system time.
%% NOTE(review): this callback expects ClientInfo as its first argument and
%% reads the client id from it - confirm that every place running the
%% 'delivery.completed' hook supplies ClientInfo in that position.
on_delivery_completed(_ClientInfo, #message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg)
        when Ts =< BirthTime ->
    ok;
on_delivery_completed(ClientInfo, Msg, Env, Cfg) ->
    Now = erlang:system_time(millisecond),
    on_delivery_completed(ClientInfo, Msg, Env, Now, Cfg).

%% Same as on_delivery_completed/4 with the completion time passed in.
%% Returns ok without attempting an insert unless the computed time span
%% exceeds both the configured threshold and the value last recorded for this
%% {ClientId, Topic} id.
on_delivery_completed(
    #{clientid := ClientId},
    #message{topic = Topic} = Msg,
    _Env,
    Now,
    #{threshold := Threshold, stats_type := StatsType, max_size := MaxSize}
) ->
    TimeSpan = calc_timespan(StatsType, Msg, Now),
    if
        TimeSpan =< Threshold ->
            ok;
        true ->
            Id = ?ID(ClientId, Topic),
            LastUpdateValue = find_last_update_value(Id),
            if
                TimeSpan =< LastUpdateValue ->
                    ok;
                true ->
                    try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
            end
    end.
|
||||||
|
|
||||||
clear_history() ->
|
clear_history() ->
|
||||||
|
@ -127,21 +111,20 @@ clear_history() ->
|
||||||
update_settings(Conf) ->
|
update_settings(Conf) ->
|
||||||
emqx_conf:update([slow_subs], Conf, #{override_to => cluster}).
|
emqx_conf:update([slow_subs], Conf, #{override_to => cluster}).
|
||||||
|
|
||||||
init_topk_tab() ->
|
|
||||||
case ets:whereis(?TOPK_TAB) of
|
|
||||||
undefined ->
|
|
||||||
?TOPK_TAB = ets:new(?TOPK_TAB,
|
|
||||||
[ ordered_set, public, named_table
|
|
||||||
, {keypos, #top_k.index}, {write_concurrency, true}
|
|
||||||
, {read_concurrency, true}
|
|
||||||
]);
|
|
||||||
_ ->
|
|
||||||
?TOPK_TAB
|
|
||||||
end.
|
|
||||||
|
|
||||||
post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
|
post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
|
||||||
gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).
|
gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).
|
||||||
|
|
||||||
|
%% Create the top-k table and the index table through safe_create_tab/2.
%% Both get the same options (public, named, ordered_set, read and write
%% concurrency enabled) and differ only in the record field used as key.
init_tab() ->
    TabOpts = fun(KeyPos) ->
        [ ordered_set, public, named_table
        , {keypos, KeyPos}, {write_concurrency, true}
        , {read_concurrency, true}
        ]
    end,
    safe_create_tab(?TOPK_TAB, TabOpts(#top_k.index)),
    safe_create_tab(?INDEX_TAB, TabOpts(#index_tab.index)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -151,8 +134,7 @@ init([]) ->
|
||||||
|
|
||||||
InitState = #{enable => false,
|
InitState = #{enable => false,
|
||||||
last_tick_at => 0,
|
last_tick_at => 0,
|
||||||
expire_timer => undefined,
|
expire_timer => undefined
|
||||||
notice_timer => undefined
|
|
||||||
},
|
},
|
||||||
|
|
||||||
Enable = emqx:get_config([slow_subs, enable]),
|
Enable = emqx:get_config([slow_subs, enable]),
|
||||||
|
@ -164,7 +146,7 @@ handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
|
||||||
{reply, ok, State2};
|
{reply, ok, State2};
|
||||||
|
|
||||||
handle_call(clear_history, _, State) ->
|
handle_call(clear_history, _, State) ->
|
||||||
ets:delete_all_objects(?TOPK_TAB),
|
do_clear_history(),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -181,12 +163,6 @@ handle_info(expire_tick, State) ->
|
||||||
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
||||||
{noreply, State1};
|
{noreply, State1};
|
||||||
|
|
||||||
handle_info(notice_tick, State) ->
|
|
||||||
Logs = ets:tab2list(?TOPK_TAB),
|
|
||||||
do_notification(Logs, State),
|
|
||||||
State1 = start_timer(notice_timer, fun notice_tick/0, State),
|
|
||||||
{noreply, State1#{last_tick_at := ?NOW}};
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -204,123 +180,127 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
expire_tick() ->
|
expire_tick() ->
|
||||||
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
|
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
|
||||||
|
|
||||||
notice_tick() ->
|
|
||||||
case emqx:get_config([slow_subs, notice_interval]) of
|
|
||||||
0 -> undefined;
|
|
||||||
Interval ->
|
|
||||||
erlang:send_after(Interval, self(), ?FUNCTION_NAME)
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec do_notification(list(), state()) -> ok.
|
|
||||||
do_notification([], _) ->
|
|
||||||
ok;
|
|
||||||
|
|
||||||
do_notification(Logs, #{last_tick_at := LastTickTime}) ->
|
|
||||||
start_publish(Logs, LastTickTime),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
start_publish(Logs, TickTime) ->
|
|
||||||
emqx_pool:async_submit({fun do_publish/3, [Logs, erlang:length(Logs), TickTime]}).
|
|
||||||
|
|
||||||
do_publish([], _, _) ->
|
|
||||||
ok;
|
|
||||||
|
|
||||||
do_publish(Logs, Rank, TickTime) ->
|
|
||||||
BatchSize = emqx:get_config([slow_subs, notice_batch_size]),
|
|
||||||
do_publish(Logs, BatchSize, Rank, TickTime, []).
|
|
||||||
|
|
||||||
do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 ->
|
|
||||||
Cache2 = [convert_to_notice(Rank, Log) | Cache],
|
|
||||||
do_publish(T, Size - 1, Rank - 1, TickTime, Cache2);
|
|
||||||
|
|
||||||
do_publish(Logs, Size, Rank, TickTime, Cache) when Size =:= 0 ->
|
|
||||||
publish(TickTime, Cache),
|
|
||||||
do_publish(Logs, Rank, TickTime);
|
|
||||||
|
|
||||||
do_publish([], _, _Rank, TickTime, Cache) ->
|
|
||||||
publish(TickTime, Cache),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId),
|
|
||||||
type = Type,
|
|
||||||
last_update_time = Ts}) ->
|
|
||||||
#{rank => Rank,
|
|
||||||
clientid => ClientId,
|
|
||||||
latency => Latency,
|
|
||||||
type => Type,
|
|
||||||
timestamp => Ts}.
|
|
||||||
|
|
||||||
publish(TickTime, Notices) ->
|
|
||||||
WindowLog = #{last_tick_at => TickTime,
|
|
||||||
logs => lists:reverse(Notices)},
|
|
||||||
Payload = emqx_json:encode(WindowLog),
|
|
||||||
Msg = #message{ id = emqx_guid:gen()
|
|
||||||
, qos = emqx:get_config([slow_subs, notice_qos])
|
|
||||||
, from = ?MODULE
|
|
||||||
, topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
|
|
||||||
, payload = Payload
|
|
||||||
, timestamp = ?NOW
|
|
||||||
},
|
|
||||||
_ = emqx_broker:safe_publish(Msg),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
load(State) ->
|
load(State) ->
|
||||||
MaxSizeT = emqx:get_config([slow_subs, top_k_num]),
|
#{top_k_num := MaxSizeT,
|
||||||
MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE),
|
stats_type := StatsType,
|
||||||
_ = emqx:hook('message.slow_subs_stats',
|
threshold := Threshold} = emqx:get_config([slow_subs]),
|
||||||
{?MODULE, on_stats_update, [#{max_size => MaxSize}]}
|
MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE),
|
||||||
),
|
_ = emqx:hook('delivery.completed',
|
||||||
|
{?MODULE, on_delivery_completed,
|
||||||
|
[#{max_size => MaxSize,
|
||||||
|
stats_type => StatsType,
|
||||||
|
threshold => Threshold
|
||||||
|
}]}),
|
||||||
|
|
||||||
State1 = start_timer(notice_timer, fun notice_tick/0, State),
|
State1 = start_timer(expire_timer, fun expire_tick/0, State),
|
||||||
State2 = start_timer(expire_timer, fun expire_tick/0, State1),
|
State1#{enable := true, last_tick_at => ?NOW}.
|
||||||
State2#{enable := true, last_tick_at => ?NOW}.
|
|
||||||
|
|
||||||
|
unload(#{expire_timer := ExpireTimer} = State) ->
|
||||||
unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) ->
|
emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}),
|
||||||
emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}),
|
State#{expire_timer := cancel_timer(ExpireTimer)}.
|
||||||
State#{notice_timer := cancel_timer(NoticeTimer),
|
|
||||||
expire_timer := cancel_timer(ExpireTimer)
|
|
||||||
}.
|
|
||||||
|
|
||||||
do_clear(Logs) ->
|
do_clear(Logs) ->
|
||||||
Now = ?NOW,
|
Now = ?NOW,
|
||||||
Interval = emqx:get_config([slow_subs, expire_interval]),
|
Interval = emqx:get_config([slow_subs, expire_interval]),
|
||||||
Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
|
Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) ->
|
||||||
case Now - Ts >= Interval of
|
case Now - Ts >= Interval of
|
||||||
true ->
|
true ->
|
||||||
ets:delete(?TOPK_TAB, Index);
|
delete_with_index(TimeSpan, Id);
|
||||||
_ ->
|
_ ->
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
lists:foreach(Each, Logs).
|
lists:foreach(Each, Logs).
|
||||||
|
|
||||||
try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) ->
|
-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer().
|
||||||
|
calc_timespan(whole, #message{timestamp = Ts}, Now) ->
|
||||||
|
Now - Ts;
|
||||||
|
|
||||||
|
calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) ->
|
||||||
|
End = emqx_message:get_header(deliver_begin_at, Msg, Now),
|
||||||
|
End - Ts;
|
||||||
|
|
||||||
|
calc_timespan(response, Msg, Now) ->
|
||||||
|
Begin = emqx_message:get_header(deliver_begin_at, Msg, Now),
|
||||||
|
Now - Begin.
|
||||||
|
|
||||||
|
%% update_topk is safe, because each process has a unique clientid
|
||||||
|
%% inserts and deletes are bound to this clientid, so there is no race condition
|
||||||
|
%%
|
||||||
|
%% but, the delete_with_index in L249 may have a race condition
|
||||||
|
%% because data belonging to another clientid will be deleted here
|
||||||
|
%% (i.e. data written by other processes gets deleted).
|
||||||
|
%% so it may appear that:
|
||||||
|
%% when deleting a record, another process is performing an update operation on this record
|
||||||
|
%% in order to solve this race condition problem, the index table also uses the ordered_set type,
|
||||||
|
%% so that even if the above situation occurs, it will only cause the old data to be deleted twice
|
||||||
|
%% and the correctness of the data will not be affected
|
||||||
|
|
||||||
|
try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
|
||||||
case ets:info(?TOPK_TAB, size) of
|
case ets:info(?TOPK_TAB, size) of
|
||||||
Size when Size < MaxSize ->
|
Size when Size < MaxSize ->
|
||||||
%% if the size is under limit, insert it directly
|
update_topk(Now, LastUpdateValue, TimeSpan, Id);
|
||||||
ets:insert(?TOPK_TAB,
|
|
||||||
#top_k{index = Index, type = Type, last_update_time = Ts});
|
|
||||||
_Size ->
|
_Size ->
|
||||||
%% find the minimum value
|
|
||||||
?INDEX(Min, _) = First =
|
|
||||||
case ets:first(?TOPK_TAB) of
|
case ets:first(?TOPK_TAB) of
|
||||||
?INDEX(_, _) = I -> I;
|
'$end_of_table' ->
|
||||||
_ -> ?INDEX(Latency - 1, <<>>)
|
update_topk(Now, LastUpdateValue, TimeSpan, Id);
|
||||||
end,
|
?TOPK_INDEX(_, Id) ->
|
||||||
|
update_topk(Now, LastUpdateValue, TimeSpan, Id);
|
||||||
case Latency =< Min of
|
?TOPK_INDEX(Min, MinId) ->
|
||||||
true -> true;
|
case TimeSpan =< Min of
|
||||||
|
true -> false;
|
||||||
_ ->
|
_ ->
|
||||||
ets:insert(?TOPK_TAB,
|
update_topk(Now, LastUpdateValue, TimeSpan, Id),
|
||||||
#top_k{index = Index, type = Type, last_update_time = Ts}),
|
delete_with_index(Min, MinId)
|
||||||
|
end
|
||||||
ets:delete(?TOPK_TAB, First)
|
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
-spec find_last_update_value(id()) -> non_neg_integer().
|
||||||
|
find_last_update_value(Id) ->
|
||||||
|
case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of
|
||||||
|
?INDEX(LastUpdateValue, Id) ->
|
||||||
|
LastUpdateValue;
|
||||||
|
_ ->
|
||||||
|
0
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true.
|
||||||
|
update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
|
||||||
|
%% update record
|
||||||
|
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id),
|
||||||
|
last_update_time = Now,
|
||||||
|
extra = []
|
||||||
|
}),
|
||||||
|
|
||||||
|
%% update index
|
||||||
|
ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}),
|
||||||
|
|
||||||
|
%% delete the old record & index
|
||||||
|
delete_with_index(LastUpdateValue, Id).
|
||||||
|
|
||||||
|
-spec delete_with_index(non_neg_integer(), id()) -> true.
|
||||||
|
delete_with_index(0, _) ->
|
||||||
|
true;
|
||||||
|
|
||||||
|
delete_with_index(TimeSpan, Id) ->
|
||||||
|
ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)),
|
||||||
|
ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).
|
||||||
|
|
||||||
|
safe_create_tab(Name, Opts) ->
|
||||||
|
case ets:whereis(Name) of
|
||||||
|
undefined ->
|
||||||
|
Name = ets:new(Name, Opts);
|
||||||
|
_ ->
|
||||||
|
Name
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_clear_history() ->
|
||||||
|
ets:delete_all_objects(?INDEX_TAB),
|
||||||
|
ets:delete_all_objects(?TOPK_TAB).
|
||||||
|
|
||||||
check_enable(Enable, #{enable := IsEnable} = State) ->
|
check_enable(Enable, #{enable := IsEnable} = State) ->
|
||||||
update_threshold(),
|
|
||||||
case Enable of
|
case Enable of
|
||||||
IsEnable ->
|
IsEnable ->
|
||||||
State;
|
State;
|
||||||
|
@ -330,11 +310,6 @@ check_enable(Enable, #{enable := IsEnable} = State) ->
|
||||||
unload(State)
|
unload(State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_threshold() ->
|
|
||||||
Threshold = emqx:get_config([slow_subs, threshold]),
|
|
||||||
emqx_message_latency_stats:update_threshold(Threshold),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
start_timer(Name, Fun, State) ->
|
start_timer(Name, Fun, State) ->
|
||||||
_ = cancel_timer(maps:get(Name, State)),
|
_ = cancel_timer(maps:get(Name, State)),
|
||||||
State#{Name := Fun()}.
|
State#{Name := Fun()}.
|
||||||
|
|
|
@ -23,14 +23,14 @@
|
||||||
|
|
||||||
-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
|
-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
|
||||||
|
|
||||||
-export([slow_subs/2, encode_record/1, settings/2]).
|
-export([slow_subs/2, get_history/0, settings/2]).
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, ref/1]).
|
-import(hoconsc, [mk/2, ref/1]).
|
||||||
-import(emqx_mgmt_util, [bad_request/0]).
|
-import(emqx_mgmt_util, [bad_request/0]).
|
||||||
|
|
||||||
-define(FORMAT_FUN, {?MODULE, encode_record}).
|
|
||||||
-define(APP, emqx_slow_subs).
|
-define(APP, emqx_slow_subs).
|
||||||
-define(APP_NAME, <<"emqx_slow_subs">>).
|
-define(APP_NAME, <<"emqx_slow_subs">>).
|
||||||
|
-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)).
|
||||||
|
|
||||||
namespace() -> "slow_subscribers_statistics".
|
namespace() -> "slow_subscribers_statistics".
|
||||||
|
|
||||||
|
@ -74,12 +74,13 @@ schema("/slow_subscriptions/settings") ->
|
||||||
fields(record) ->
|
fields(record) ->
|
||||||
[ {clientid,
|
[ {clientid,
|
||||||
mk(string(), #{desc => <<"the clientid">>})},
|
mk(string(), #{desc => <<"the clientid">>})},
|
||||||
{latency,
|
{node,
|
||||||
|
mk(string(), #{desc => <<"the node">>})},
|
||||||
|
{topic,
|
||||||
|
mk(string(), #{desc => <<"the topic">>})},
|
||||||
|
{timespan,
|
||||||
mk(integer(),
|
mk(integer(),
|
||||||
#{desc => <<"average time for message delivery or time for message expire">>})},
|
#{desc => <<"timespan for message transmission">>})},
|
||||||
{type,
|
|
||||||
mk(string(),
|
|
||||||
#{desc => <<"type of the latency, could be average or expire">>})},
|
|
||||||
{last_update_time,
|
{last_update_time,
|
||||||
mk(integer(), #{desc => <<"the timestamp of last update">>})}
|
mk(integer(), #{desc => <<"the timestamp of last update">>})}
|
||||||
].
|
].
|
||||||
|
@ -89,24 +90,49 @@ conf_schema() ->
|
||||||
hoconsc:mk(Ref, #{}).
|
hoconsc:mk(Ref, #{}).
|
||||||
|
|
||||||
slow_subs(delete, _) ->
|
slow_subs(delete, _) ->
|
||||||
ok = emqx_slow_subs:clear_history(),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
|
_ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT)
|
||||||
|
|| Node <- Nodes],
|
||||||
{204};
|
{204};
|
||||||
|
|
||||||
slow_subs(get, #{query_string := QST}) ->
|
slow_subs(get, _) ->
|
||||||
LimitT = maps:get(<<"limit">>, QST, ?MAX_TAB_SIZE),
|
Nodes = mria_mnesia:running_nodes(),
|
||||||
Limit = erlang:min(?MAX_TAB_SIZE, emqx_mgmt_api:b2i(LimitT)),
|
Fun = fun(Node, Acc) ->
|
||||||
Page = maps:get(<<"page">>, QST, 1),
|
NodeRankL = rpc_call(Node,
|
||||||
QS = QST#{<<"limit">> => Limit, <<"page">> => Page},
|
?MODULE,
|
||||||
Data = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, QS, ?FORMAT_FUN),
|
get_history,
|
||||||
{200, Data}.
|
[],
|
||||||
|
[],
|
||||||
|
?DEFAULT_RPC_TIMEOUT),
|
||||||
|
NodeRankL ++ Acc
|
||||||
|
end,
|
||||||
|
|
||||||
encode_record(#top_k{index = ?INDEX(Latency, ClientId),
|
RankL = lists:foldl(Fun, [], Nodes),
|
||||||
type = Type,
|
|
||||||
last_update_time = Ts}) ->
|
SortFun = fun(#{timespan := A}, #{timespan := B}) ->
|
||||||
#{clientid => ClientId,
|
A > B
|
||||||
latency => Latency,
|
end,
|
||||||
type => Type,
|
|
||||||
last_update_time => Ts}.
|
SortedL = lists:sort(SortFun, RankL),
|
||||||
|
SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
|
||||||
|
|
||||||
|
{200, SortedL2}.
|
||||||
|
|
||||||
|
get_history() ->
|
||||||
|
Node = node(),
|
||||||
|
RankL = ets:tab2list(?TOPK_TAB),
|
||||||
|
ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
|
||||||
|
last_update_time = LastUpdateTime
|
||||||
|
}) ->
|
||||||
|
#{ clientid => ClientId
|
||||||
|
, node => Node
|
||||||
|
, topic => Topic
|
||||||
|
, timespan => TimeSpan
|
||||||
|
, last_update_time => LastUpdateTime
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
|
||||||
|
lists:map(ConvFun, RankL).
|
||||||
|
|
||||||
settings(get, _) ->
|
settings(get, _) ->
|
||||||
{200, emqx:get_raw_config([slow_subs], #{})};
|
{200, emqx:get_raw_config([slow_subs], #{})};
|
||||||
|
@ -114,3 +140,12 @@ settings(get, _) ->
|
||||||
settings(put, #{body := Body}) ->
|
settings(put, #{body := Body}) ->
|
||||||
_ = emqx_slow_subs:update_settings(Body),
|
_ = emqx_slow_subs:update_settings(Body),
|
||||||
{200, emqx:get_raw_config([slow_subs], #{})}.
|
{200, emqx:get_raw_config([slow_subs], #{})}.
|
||||||
|
|
||||||
|
rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() ->
|
||||||
|
erlang:apply(M, F, A);
|
||||||
|
|
||||||
|
rpc_call(Node, M, F, A, ErrorR, T) ->
|
||||||
|
case rpc:call(Node, M, F, A, T) of
|
||||||
|
{badrpc, _} -> ErrorR;
|
||||||
|
Res -> Res
|
||||||
|
end.
|
||||||
|
|
|
@ -22,21 +22,10 @@ fields("slow_subs") ->
|
||||||
sc(integer(),
|
sc(integer(),
|
||||||
10,
|
10,
|
||||||
"The maximum number of records in the slow subscription statistics record table")}
|
"The maximum number of records in the slow subscription statistics record table")}
|
||||||
, {notice_interval,
|
, {stats_type,
|
||||||
sc(emqx_schema:duration_ms(),
|
sc(hoconsc:union([whole, internal, response]),
|
||||||
"0s",
|
whole,
|
||||||
"The interval for pushing statistics table records to the system topic. "
|
"The method to calculate the latency")}
|
||||||
"publish top-k list to $SYS/brokers/${node}/slow_subs per notice_interval. "
|
|
||||||
"publish is disabled if set to 0s."
|
|
||||||
)}
|
|
||||||
, {notice_qos,
|
|
||||||
sc(range(0, 2),
|
|
||||||
0,
|
|
||||||
"QoS of notification message in notice topic")}
|
|
||||||
, {notice_batch_size,
|
|
||||||
sc(integer(),
|
|
||||||
100,
|
|
||||||
"Maximum information number in one notification")}
|
|
||||||
].
|
].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue