fix(bridge): fix dropped counter and inflight gauge

Prior to this fix there were two metrics issues
1. if a batch is all requests expired when receiving a reply
   it only bumped 1 instead of the batch size for 'late_reply'
2. when a batch is partially delivered (or expired), the
   dropped requests were not decremented from the inflight size gauge
This commit is contained in:
Zaiming (Stone) Shi 2023-02-22 13:20:58 +01:00
parent 4cc92c0368
commit bb13d0708f
1 changed files with 6 additions and 5 deletions

View File

@ -336,8 +336,8 @@ resume_from_blocked(Data) ->
%% async, they will be appended to the end of inflight window again.
retry_inflight_sync(Ref, Query, Data);
{batch, Ref, NotExpired, Expired} ->
update_inflight_item(InflightTID, Ref, NotExpired),
NumExpired = length(Expired),
update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
%% We retry msgs in inflight window sync, as if we send them
@ -1050,7 +1050,7 @@ handle_async_batch_reply(
all_expired ->
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
?tp(handle_async_reply_expired, #{expired => Batch}),
ok;
@ -1317,10 +1317,10 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
1;
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
length(Batch);
_ ->
[] ->
0
end,
IsAcked = Count > 0,
IsAcked = (Count > 0),
IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
IsAcked.
@ -1341,8 +1341,9 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
ok.
%% used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch) ->
update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}),
?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}),
ok.