refactor: move the the per-message overflow log from error to info level
This commit is contained in:
parent
bb26632c8a
commit
25b4821adc
|
@ -437,7 +437,7 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
|
||||||
end,
|
end,
|
||||||
Requests
|
Requests
|
||||||
),
|
),
|
||||||
NewQ = append_queue(Id, Index, Q, Queries),
|
{_Overflow, NewQ} = append_queue(Id, Index, Q, Queries),
|
||||||
Data = Data0#{queue := NewQ},
|
Data = Data0#{queue := NewQ},
|
||||||
{Queries, Data}.
|
{Queries, Data}.
|
||||||
|
|
||||||
|
@ -1089,18 +1089,22 @@ append_queue(Id, Index, Q, Queries) ->
|
||||||
%% because the marshaller will get lost.
|
%% because the marshaller will get lost.
|
||||||
false = is_binary(hd(Queries)),
|
false = is_binary(hd(Queries)),
|
||||||
Q0 = replayq:append(Q, Queries),
|
Q0 = replayq:append(Q, Queries),
|
||||||
Q2 =
|
{Overflow, Q2} =
|
||||||
case replayq:overflow(Q0) of
|
case replayq:overflow(Q0) of
|
||||||
Overflow when Overflow =< 0 ->
|
OverflowBytes when OverflowBytes =< 0 ->
|
||||||
Q0;
|
{[], Q0};
|
||||||
Overflow ->
|
OverflowBytes ->
|
||||||
PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
|
PopOpts = #{bytes_limit => OverflowBytes, count_limit => 999999999},
|
||||||
{Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
|
{Q1, QAckRef, Items2} = replayq:pop(Q0, PopOpts),
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
ok = replayq:ack(Q1, QAckRef),
|
||||||
Dropped = length(Items2),
|
Dropped = length(Items2),
|
||||||
emqx_resource_metrics:dropped_queue_full_inc(Id),
|
emqx_resource_metrics:dropped_queue_full_inc(Id),
|
||||||
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
|
?SLOG(info, #{
|
||||||
Q1
|
msg => buffer_worker_overflow,
|
||||||
|
worker_id => Id,
|
||||||
|
dropped => Dropped
|
||||||
|
}),
|
||||||
|
{Items2, Q1}
|
||||||
end,
|
end,
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
|
||||||
?tp(
|
?tp(
|
||||||
|
@ -1108,10 +1112,11 @@ append_queue(Id, Index, Q, Queries) ->
|
||||||
#{
|
#{
|
||||||
id => Id,
|
id => Id,
|
||||||
items => Queries,
|
items => Queries,
|
||||||
queue_count => queue_count(Q2)
|
queue_count => queue_count(Q2),
|
||||||
|
overflow => length(Overflow)
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
Q2.
|
{Overflow, Q2}.
|
||||||
|
|
||||||
%%==============================================================================
|
%%==============================================================================
|
||||||
%% the inflight queue for async query
|
%% the inflight queue for async query
|
||||||
|
|
Loading…
Reference in New Issue