improve(emqx_st_statistics): optimize the parameters of on_publish_done (#6151)
* fix(emqx_st_statistics): optimize the parameters of on_publish_done
This commit is contained in:
parent
87a2667e35
commit
0357f7ad85
|
@ -24,7 +24,7 @@
|
||||||
|
|
||||||
-logger_header("[SLOW TOPICS]").
|
-logger_header("[SLOW TOPICS]").
|
||||||
|
|
||||||
-export([ start_link/1, on_publish_done/5, enable/0
|
-export([ start_link/1, on_publish_done/3, enable/0
|
||||||
, disable/0, clear_history/0
|
, disable/0, clear_history/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@
|
||||||
-type state() :: #{ config := proplist:proplist()
|
-type state() :: #{ config := proplist:proplist()
|
||||||
, period := pos_integer()
|
, period := pos_integer()
|
||||||
, last_tick_at := pos_integer()
|
, last_tick_at := pos_integer()
|
||||||
, counter := counters:counter_ref()
|
, counter := counters:counters_ref()
|
||||||
, enable := boolean()
|
, enable := boolean()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -70,6 +70,13 @@
|
||||||
-type slow_log() :: #slow_log{}.
|
-type slow_log() :: #slow_log{}.
|
||||||
-type top_k_map() :: #{emqx_types:topic() => top_k()}.
|
-type top_k_map() :: #{emqx_types:topic() => top_k()}.
|
||||||
|
|
||||||
|
-type publish_done_env() :: #{ ignore_before_create := boolean()
|
||||||
|
, threshold := pos_integer()
|
||||||
|
, counter := counters:counters_ref()
|
||||||
|
}.
|
||||||
|
|
||||||
|
-type publish_done_args() :: #{session_rebirth_time => pos_integer()}.
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-define(TOPK_ACCESS, public).
|
-define(TOPK_ACCESS, public).
|
||||||
-else.
|
-else.
|
||||||
|
@ -90,13 +97,16 @@
|
||||||
start_link(Env) ->
|
start_link(Env) ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
|
||||||
|
|
||||||
-spec on_publish_done(message(),
|
-spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok.
|
||||||
pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok.
|
on_publish_done(#message{timestamp = Timestamp},
|
||||||
on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _)
|
#{session_rebirth_time := Created},
|
||||||
|
#{ignore_before_create := IgnoreBeforeCreate})
|
||||||
when IgnoreBeforeCreate, Timestamp < Created ->
|
when IgnoreBeforeCreate, Timestamp < Created ->
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) ->
|
on_publish_done(#message{timestamp = Timestamp} = Msg,
|
||||||
|
_,
|
||||||
|
#{threshold := Threshold, counter := Counter}) ->
|
||||||
case ?NOW - Timestamp of
|
case ?NOW - Timestamp of
|
||||||
Elapsed when Elapsed > Threshold ->
|
Elapsed when Elapsed > Threshold ->
|
||||||
case get_log_quota(Counter) of
|
case get_log_quota(Counter) of
|
||||||
|
@ -202,7 +212,7 @@ init_topk_tab(_) ->
|
||||||
, {read_concurrency, true}
|
, {read_concurrency, true}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-spec get_log_quota(counter:counter_ref()) -> boolean().
|
-spec get_log_quota(counters:counters_ref()) -> boolean().
|
||||||
get_log_quota(Counter) ->
|
get_log_quota(Counter) ->
|
||||||
case counters:get(Counter, ?QUOTA_IDX) of
|
case counters:get(Counter, ?QUOTA_IDX) of
|
||||||
Quota when Quota > 0 ->
|
Quota when Quota > 0 ->
|
||||||
|
@ -212,7 +222,7 @@ get_log_quota(Counter) ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok.
|
-spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok.
|
||||||
set_log_quota(Cfg, Counter) ->
|
set_log_quota(Cfg, Counter) ->
|
||||||
MaxLogNum = get_value(max_log_num, Cfg),
|
MaxLogNum = get_value(max_log_num, Cfg),
|
||||||
counters:put(Counter, ?QUOTA_IDX, MaxLogNum).
|
counters:put(Counter, ?QUOTA_IDX, MaxLogNum).
|
||||||
|
@ -328,12 +338,15 @@ publish(TickTime, Cfg, Notices) ->
|
||||||
|
|
||||||
load(IgnoreBeforeCreate, Threshold, Counter) ->
|
load(IgnoreBeforeCreate, Threshold, Counter) ->
|
||||||
_ = emqx:hook('message.publish_done',
|
_ = emqx:hook('message.publish_done',
|
||||||
fun ?MODULE:on_publish_done/5,
|
fun ?MODULE:on_publish_done/3,
|
||||||
[IgnoreBeforeCreate, Threshold, Counter]),
|
[#{ignore_before_create => IgnoreBeforeCreate,
|
||||||
|
threshold => Threshold,
|
||||||
|
counter => Counter}
|
||||||
|
]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5).
|
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3).
|
||||||
|
|
||||||
-spec get_topic(proplists:proplist()) -> binary().
|
-spec get_topic(proplists:proplist()) -> binary().
|
||||||
get_topic(Cfg) ->
|
get_topic(Cfg) ->
|
||||||
|
|
|
@ -320,7 +320,8 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
|
||||||
puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
|
puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
||||||
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
|
emqx:run_hook('message.publish_done',
|
||||||
|
[Msg, #{session_rebirth_time => CreatedAt}]),
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
|
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
|
||||||
{value, {_Pubrel, _Ts}} ->
|
{value, {_Pubrel, _Ts}} ->
|
||||||
|
@ -346,7 +347,8 @@ pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
||||||
%% execute hook here, because message record will be replaced by pubrel
|
%% execute hook here, because message record will be replaced by pubrel
|
||||||
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
|
emqx:run_hook('message.publish_done',
|
||||||
|
[Msg, #{session_rebirth_time => CreatedAt}]),
|
||||||
Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
|
Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
|
||||||
{ok, Msg, Session#session{inflight = Inflight1}};
|
{ok, Msg, Session#session{inflight = Inflight1}};
|
||||||
{value, {pubrel, _Ts}} ->
|
{value, {pubrel, _Ts}} ->
|
||||||
|
@ -443,7 +445,8 @@ deliver([Msg | More], Acc, Session) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
|
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
|
||||||
emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]),
|
emqx:run_hook('message.publish_done',
|
||||||
|
[Msg, #{session_rebirth_time => Session#session.created_at}]),
|
||||||
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
||||||
|
|
||||||
deliver_msg(Msg = #message{qos = QoS}, Session =
|
deliver_msg(Msg = #message{qos = QoS}, Session =
|
||||||
|
|
Loading…
Reference in New Issue