diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 35761822d..602551c33 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1256,6 +1256,13 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> %% some queries are not expired, put them back to the inflight batch %% so it can be either acked now or retried later ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), + ?tp_ignore_side_effects_in_prod( + handle_async_reply_partially_expired, + #{ + inflight_count => inflight_count(InflightTID), + num_inflight_messages => inflight_num_msgs(InflightTID) + } + ), do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result) end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index b960b0526..56d878859 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2248,6 +2248,15 @@ do_t_expiration_async_after_reply(IsBatch) -> } ], ?of_kind(handle_async_reply_expired, Trace) + ), + ?assertMatch( + [ + #{ + inflight_count := 1, + num_inflight_messages := 1 + } + ], + ?of_kind(handle_async_reply_partially_expired, Trace) ); single -> ?assertMatch(