diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index d301ce03c..02edba2b9 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -466,7 +466,7 @@ flush(Data0) -> queue := Q0 } = Data0, Data1 = cancel_flush_timer(Data0), - case replayq:count(Q0) of + case queue_count(Q0) of 0 -> {keep_state, Data1}; _ -> @@ -530,7 +530,7 @@ do_flush( ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - case replayq:count(Q1) > 0 of + case queue_count(Q1) > 0 of true -> {keep_state, Data0, [{next_event, internal, flush}]}; false -> @@ -570,7 +570,7 @@ do_flush(Data0, #{ ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse inflight_drop(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - CurrentCount = replayq:count(Q1), + CurrentCount = queue_count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> {keep_state, Data0}; @@ -883,14 +883,14 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> Q1 end, Q3 = replayq:append(Q2, Queries), - emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q3)), ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}), Q3. -spec get_first_n_from_queue(replayq:q(), pos_integer()) -> empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}. get_first_n_from_queue(Q, N) -> - case replayq:count(Q) of + case queue_count(Q) of 0 -> empty; _ ->