fix(eqmx_st_statistics): add ignore_before_create in config (#6140)

1. allows not to process the message before the session is created
to solve the problem caused by clean session = false
2. fix some elvis errors
This commit is contained in:
lafirest 2021-11-12 14:00:48 +08:00 committed by GitHub
parent 7193cd4275
commit a4a7cac647
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 30 deletions

View File

@ -20,11 +20,11 @@
-include_lib("include/emqx.hrl").
-include_lib("include/logger.hrl").
-include("include/emqx_st_statistics.hrl").
-include_lib("emqx_plugin_libs/include/emqx_st_statistics.hrl").
-logger_header("[SLOW TOPICS]").
-export([ start_link/1, on_publish_done/3, enable/0
-export([ start_link/1, on_publish_done/5, enable/0
, disable/0, clear_history/0
]).
@ -90,8 +90,13 @@
start_link(Env) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
-spec on_publish_done(message(), pos_integer(), counters:counters_ref()) -> ok.
on_publish_done(#message{timestamp = Timestamp} = Msg, Threshold, Counter) ->
-spec on_publish_done(message(),
pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok.
on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _)
when IgnoreBeforeCreate, Timestamp < Created ->
ok;
on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) ->
case ?NOW - Timestamp of
Elapsed when Elapsed > Threshold ->
case get_log_quota(Counter) of
@ -125,7 +130,8 @@ init([Env]) ->
Counter = counters:new(1, [write_concurrency]),
set_log_quota(Env, Counter),
Threshold = get_value(threshold_time, Env),
load(Threshold, Counter),
IgnoreBeforeCreate = get_value(ignore_before_create, Env),
load(IgnoreBeforeCreate, Threshold, Counter),
{ok, #{config => Env,
period => 1,
last_tick_at => ?NOW,
@ -139,7 +145,8 @@ handle_call({enable, Enable}, _From,
State;
true ->
Threshold = get_value(threshold_time, Cfg),
load(Threshold, Counter),
IgnoreBeforeCreate = get_value(ignore_before_create, Cfg),
load(IgnoreBeforeCreate, Threshold, Counter),
State#{enable := true};
_ ->
unload(),
@ -319,12 +326,14 @@ publish(TickTime, Cfg, Notices) ->
}),
ok.
load(Threshold, Counter) ->
_ = emqx:hook('message.publish_done', fun ?MODULE:on_publish_done/3, [Threshold, Counter]),
load(IgnoreBeforeCreate, Threshold, Counter) ->
_ = emqx:hook('message.publish_done',
fun ?MODULE:on_publish_done/5,
[IgnoreBeforeCreate, Threshold, Counter]),
ok.
unload() ->
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3).
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5).
-spec get_topic(proplists:proplist()) -> binary().
get_topic(Cfg) ->

View File

@ -2219,6 +2219,11 @@ module.presence.qos = 1
## Default: 10 seconds
#module.st_statistics.threshold_time = 10S
## ignore the messages that before than session created
##
## Default: true
#module.st_statistics.ignore_before_create = true
## Time window of slow topics statistics
##
## Value: 5 minutes

View File

@ -2193,6 +2193,11 @@ end}.
{datatype, {duration, ms}}
]}.
{mapping, "module.st_statistics.ignore_before_create", "emqx.modules", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "module.st_statistics.time_window", "emqx.modules", [
{default, "5M"},
{datatype, {duration, ms}}

View File

@ -269,7 +269,8 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions
case maps:find(TopicFilter, Subs) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(TopicFilter),
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
ok = emqx_hooks:run('session.unsubscribed',
[ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
{ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
error ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED}
@ -316,10 +317,10 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
-> {ok, emqx_types:message(), session()}
| {ok, emqx_types:message(), replies(), session()}
| {error, emqx_types:reason_code()}).
puback(PacketId, Session = #session{inflight = Inflight}) ->
puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
emqx:run_hook('message.publish_done', [Msg]),
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
{value, {_Pubrel, _Ts}} ->
@ -341,11 +342,11 @@ return_with(Msg, {ok, Publishes, Session}) ->
-spec(pubrec(emqx_types:packet_id(), session())
-> {ok, emqx_types:message(), session()}
| {error, emqx_types:reason_code()}).
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
%% execute hook here, because message record will be replaced by pubrel
emqx:run_hook('message.publish_done', [Msg]),
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{value, {pubrel, _Ts}} ->
@ -408,7 +409,7 @@ dequeue(Cnt, Msgs, Q) ->
case emqx_message:is_expired(Msg) of
true -> ok = inc_expired_cnt(delivery),
dequeue(Cnt, Msgs, Q1);
false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1)
false -> dequeue(acc_cnt(Msg, Cnt), [Msg | Msgs], Q1)
end
end.
@ -438,11 +439,11 @@ deliver([Msg | More], Acc, Session) ->
{ok, Session1} ->
deliver(More, Acc, Session1);
{ok, [Publish], Session1} ->
deliver(More, [Publish|Acc], Session1)
deliver(More, [Publish | Acc], Session1)
end.
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
emqx:run_hook('message.publish_done', [Msg]),
emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]),
{ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(Msg = #message{qos = QoS}, Session =
@ -461,7 +462,7 @@ deliver_msg(Msg = #message{qos = QoS}, Session =
{ok, [Publish], next_pkt_id(Session1)}
end.
-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(),
-spec(enqueue(list(emqx_types:deliver()) | emqx_types:message(),
session()) -> session()).
enqueue([Deliver], Session) -> %% Optimize
Enrich = enrich_fun(Session),
@ -512,23 +513,23 @@ get_subopts(Topic, SubMap) ->
end.
enrich_subopts([], Msg, _Session) -> Msg;
enrich_subopts([{nl, 1}|Opts], Msg, Session) ->
enrich_subopts([{nl, 1} | Opts], Msg, Session) ->
enrich_subopts(Opts, emqx_message:set_flag(nl, Msg), Session);
enrich_subopts([{nl, 0}|Opts], Msg, Session) ->
enrich_subopts([{nl, 0} | Opts], Msg, Session) ->
enrich_subopts(Opts, Msg, Session);
enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
enrich_subopts([{qos, SubQoS} | Opts], Msg = #message{qos = PubQoS},
Session = #session{upgrade_qos = true}) ->
enrich_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
enrich_subopts([{qos, SubQoS} | Opts], Msg = #message{qos = PubQoS},
Session = #session{upgrade_qos = false}) ->
enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
enrich_subopts([{rap, 1}|Opts], Msg, Session) ->
enrich_subopts([{rap, 1} | Opts], Msg, Session) ->
enrich_subopts(Opts, Msg, Session);
enrich_subopts([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
enrich_subopts([{rap, 0} | Opts], Msg = #message{headers = #{retained := true}}, Session) ->
enrich_subopts(Opts, Msg, Session);
enrich_subopts([{rap, 0}|Opts], Msg, Session) ->
enrich_subopts([{rap, 0} | Opts], Msg, Session) ->
enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session);
enrich_subopts([{subid, SubId}|Opts], Msg, Session) ->
enrich_subopts([{subid, SubId} | Opts], Msg, Session) ->
Props = emqx_message:get_header(properties, Msg, #{}),
Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
enrich_subopts(Opts, Msg1, Session).
@ -556,8 +557,8 @@ retry(Session = #session{inflight = Inflight}) ->
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
{ok, lists:reverse(Acc), Interval, Session};
retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
#session{retry_interval = Interval, inflight = Inflight}) ->
retry_delivery([{PacketId, {Msg, Ts}} | More], Acc, Now, Session =
#session{retry_interval = Interval, inflight = Inflight}) ->
case (Age = age(Now, Ts)) >= Interval of
true ->
{Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
@ -574,12 +575,12 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -
false ->
Msg1 = emqx_message:set_flag(dup, true, Msg),
Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight),
{[{PacketId, Msg1}|Acc], Inflight1}
{[{PacketId, Msg1} | Acc], Inflight1}
end;
retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
{[{pubrel, PacketId}|Acc], Inflight1}.
{[{pubrel, PacketId} | Acc], Inflight1}.
%%--------------------------------------------------------------------
%% Expire Awaiting Rel