diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 355fb276c..0ac627d29 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -960,13 +960,13 @@ handle_async_reply( resource_id := Id, worker_index := Index, buffer_worker := Pid, - query := ?QUERY(_, _, _, ExpireAt) = Query + query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result ) -> ?tp( - buffer_worker_reply_after_query_enter, - #{batch_or_query => [Query], ref => Ref} + handle_async_reply_enter, + #{batch_or_query => [_Query], ref => Ref} ), Now = now_(), case is_expired(ExpireAt, Now) of @@ -975,7 +975,7 @@ handle_async_reply( IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), IsFullBefore andalso ?MODULE:flush_worker(Pid), - ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}), + ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; false -> do_handle_async_reply(ReplyContext, Result) @@ -989,7 +989,7 @@ do_handle_async_reply( worker_index := Index, buffer_worker := Pid, inflight_tid := InflightTID, - query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = Query + query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, Result ) -> @@ -1000,9 +1000,9 @@ do_handle_async_reply( Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts ), - ?tp(buffer_worker_reply_after_query, #{ + ?tp(handle_async_reply, #{ action => Action, - batch_or_query => [Query], + batch_or_query => [_Query], ref => Ref, result => Result }), @@ -1028,7 +1028,7 @@ handle_async_batch_reply( Result ) -> ?tp( - buffer_worker_reply_after_query_enter, + handle_async_reply_enter, #{batch_or_query => Batch, ref => Ref} ), Now = now_(), @@ -1038,13 +1038,13 @@ handle_async_batch_reply( IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), IsFullBefore andalso ?MODULE:flush_worker(Pid), - ?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}), + ?tp(handle_async_reply_expired, #{expired => Batch}), ok; {NotExpired, Expired} -> NumExpired = length(Expired), emqx_resource_metrics:late_reply_inc(Id, NumExpired), NumExpired > 0 andalso - ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}), + ?tp(handle_async_reply_expired, #{expired => Expired}), do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) end. @@ -1060,15 +1060,8 @@ do_handle_async_batch_reply( }, Result ) -> - ?tp( - buffer_worker_reply_after_query_enter, - #{batch_or_query => Batch, ref => Ref} - ), - %% NOTE: 'inflight' is the count of messages that were sent async - %% but received no ACK, NOT the number of messages queued in the - %% inflight window. {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts), - ?tp(buffer_worker_reply_after_query, #{ + ?tp(handle_async_reply, #{ action => Action, batch_or_query => Batch, ref => Ref, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index fc201e048..86336a38f 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1718,7 +1718,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) -> async -> {ok, _} = ?block_until( #{ - ?snk_kind := buffer_worker_reply_after_query, + ?snk_kind := handle_async_reply, action := ack, batch_or_query := [{query, _, {inc_counter, 99}, _, _}] }, @@ -1849,7 +1849,7 @@ do_t_expiration_async_after_reply(IsBatch) -> ?force_ordering( #{?snk_kind := delay}, #{ - ?snk_kind := buffer_worker_reply_after_query_enter, + ?snk_kind := handle_async_reply_enter, batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] } ), @@ -1874,7 +1874,7 @@ do_t_expiration_async_after_reply(IsBatch) -> #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS ), {ok, _} = ?block_until( - #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS ), unlink(Pid0), @@ -1888,7 +1888,7 @@ do_t_expiration_async_after_reply(IsBatch) -> expired := [{query, _, {inc_counter, 199}, _, _}] } ], - ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ?of_kind(handle_async_reply_expired, Trace) ), wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), Metrics = tap_metrics(?LINE), @@ -1936,7 +1936,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?force_ordering( #{?snk_kind := delay}, #{ - ?snk_kind := buffer_worker_reply_after_query_enter, + ?snk_kind := handle_async_reply_enter, batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] } ), @@ -1955,7 +1955,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> end), {ok, _} = ?block_until( - #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS ), unlink(Pid0), @@ -1969,7 +1969,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> expired := [{query, _, {inc_counter, 199}, _, _}] } ], - ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ?of_kind(handle_async_reply_expired, Trace) ), Metrics = tap_metrics(?LINE), ?assertMatch( @@ -2335,7 +2335,7 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := buffer_worker_reply_after_query, action := nack}, + #{?snk_kind := handle_async_reply, action := nack}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace ) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 18e27b775..1ac619626 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -227,7 +227,7 @@ render_timestamp(Template, Message) -> %% Wolff producer never gives up retrying %% so there can only be 'ok' results. on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> - %% the ReplyFn is emqx_resource_worker:reply_after_query/8 + %% the ReplyFn is emqx_resource_worker:handle_async_reply/2 apply(ReplyFn, Args ++ [ok]); on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% wolff should bump the dropped_queue_full counter diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index cd7f848c2..bbde88cc7 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -920,7 +920,7 @@ t_write_failure(Config) -> async -> ?wait_async_action( ?assertEqual(ok, send_message(Config, SentData)), - #{?snk_kind := buffer_worker_reply_after_query}, + #{?snk_kind := handle_async_reply}, 1_000 ) end @@ -938,7 +938,7 @@ t_write_failure(Config) -> #{got => Result} ); async -> - Trace = ?of_kind(buffer_worker_reply_after_query, Trace0), + Trace = ?of_kind(handle_async_reply, Trace0), ?assertMatch([#{action := nack} | _], Trace), [#{result := Result} | _] = Trace, ?assert(