From bb13d0708f387e05af18479ba88ab101befb2233 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 13:20:58 +0100 Subject: [PATCH] fix(bridge): fix dropped counter and inflight gauge Prior to this fix there were two metrics issues: 1. if all requests in a batch had already expired when a reply was received, 'late_reply' was only bumped by 1 instead of by the batch size 2. when a batch was partially delivered (or expired), the dropped requests were not subtracted from the inflight size gauge --- .../emqx_resource/src/emqx_resource_buffer_worker.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index bb4eee57d..2f83a347a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -336,8 +336,8 @@ resume_from_blocked(Data) -> %% async, they will be appended to the end of inflight window again. 
retry_inflight_sync(Ref, Query, Data); {batch, Ref, NotExpired, Expired} -> - update_inflight_item(InflightTID, Ref, NotExpired), NumExpired = length(Expired), + update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}), %% We retry msgs in inflight window sync, as if we send them @@ -1050,7 +1050,7 @@ handle_async_batch_reply( all_expired -> IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), + IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)), IsFullBefore andalso ?MODULE:flush_worker(Pid), ?tp(handle_async_reply_expired, #{expired => Batch}), ok; @@ -1317,10 +1317,10 @@ ack_inflight(InflightTID, Ref, Id, Index) -> 1; [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> length(Batch); - _ -> + [] -> 0 end, - IsAcked = Count > 0, + IsAcked = (Count > 0), IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), IsAcked. @@ -1341,8 +1341,9 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch) -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}), ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}), ok.