refactor: call local function queue_count everywhere
This commit is contained in:
parent
249c4c1c79
commit
618b97870b
|
@ -466,7 +466,7 @@ flush(Data0) ->
|
||||||
queue := Q0
|
queue := Q0
|
||||||
} = Data0,
|
} = Data0,
|
||||||
Data1 = cancel_flush_timer(Data0),
|
Data1 = cancel_flush_timer(Data0),
|
||||||
case replayq:count(Q0) of
|
case queue_count(Q0) of
|
||||||
0 ->
|
0 ->
|
||||||
{keep_state, Data1};
|
{keep_state, Data1};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -530,7 +530,7 @@ do_flush(
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
ok = replayq:ack(Q1, QAckRef),
|
||||||
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index),
|
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index),
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
||||||
case replayq:count(Q1) > 0 of
|
case queue_count(Q1) > 0 of
|
||||||
true ->
|
true ->
|
||||||
{keep_state, Data0, [{next_event, internal, flush}]};
|
{keep_state, Data0, [{next_event, internal, flush}]};
|
||||||
false ->
|
false ->
|
||||||
|
@ -570,7 +570,7 @@ do_flush(Data0, #{
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
ok = replayq:ack(Q1, QAckRef),
|
||||||
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index),
|
is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index),
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
|
||||||
CurrentCount = replayq:count(Q1),
|
CurrentCount = queue_count(Q1),
|
||||||
case {CurrentCount > 0, CurrentCount >= BatchSize} of
|
case {CurrentCount > 0, CurrentCount >= BatchSize} of
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
{keep_state, Data0};
|
{keep_state, Data0};
|
||||||
|
@ -883,14 +883,14 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
|
||||||
Q1
|
Q1
|
||||||
end,
|
end,
|
||||||
Q3 = replayq:append(Q2, Queries),
|
Q3 = replayq:append(Q2, Queries),
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q3)),
|
||||||
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
|
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
|
||||||
Q3.
|
Q3.
|
||||||
|
|
||||||
-spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
|
-spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
|
||||||
empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}.
|
empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}.
|
||||||
get_first_n_from_queue(Q, N) ->
|
get_first_n_from_queue(Q, N) ->
|
||||||
case replayq:count(Q) of
|
case queue_count(Q) of
|
||||||
0 ->
|
0 ->
|
||||||
empty;
|
empty;
|
||||||
_ ->
|
_ ->
|
||||||
|
|
Loading…
Reference in New Issue