From 262c3a286954ca20923ff0db97b64c68f06cf7ba Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 27 Jan 2023 15:03:18 +0100 Subject: [PATCH 1/5] refactor(buffer_worker): rename function from reply_after_query to handle_async_reply --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 50534df4f..58c835533 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -52,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([reply_after_query/8, batch_reply_after_query/8]). +-export([handle_async_reply/8, batch_reply_after_query/8]). -export([clear_disk_queue_dir/2]). @@ -898,7 +898,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re ?APPLY_RESOURCE( call_query_async, begin - ReplyFun = fun ?MODULE:reply_after_query/8, + ReplyFun = fun ?MODULE:handle_async_reply/8, Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts], IsRetriable = false, WorkerMRef = undefined, @@ -936,7 +936,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re Batch ). -reply_after_query( +handle_async_reply( Pid, Id, Index, From f793807bc1fd9e0216ee09f61bd863e5c8306086 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 27 Jan 2023 15:04:28 +0100 Subject: [PATCH 2/5] refactor(buffer_worker): rename function batch_reply_after_query to handle_async_batch_reply --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 58c835533..1b837880e 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -52,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([handle_async_reply/8, batch_reply_after_query/8]). +-export([handle_async_reply/8, handle_async_batch_reply/8]). -export([clear_disk_queue_dir/2]). @@ -923,7 +923,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re ?APPLY_RESOURCE( call_batch_query_async, begin - ReplyFun = fun ?MODULE:batch_reply_after_query/8, + ReplyFun = fun ?MODULE:handle_async_batch_reply/8, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch], IsRetriable = false, @@ -1000,7 +1000,7 @@ do_reply_after_query( do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) end. -batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> +handle_async_batch_reply(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> ?tp( buffer_worker_reply_after_query_enter, #{batch_or_query => Batch, ref => Ref} From 578271ea3d8a3b3b3230cd0bbbe2a4b7294232b6 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 27 Jan 2023 15:15:46 +0100 Subject: [PATCH 3/5] refactor: use lists:map instead of lc for safty --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 1b837880e..026effcf8 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -73,9 +73,8 @@ -type id() :: binary(). -type index() :: pos_integer(). -type expire_at() :: infinity | integer(). --type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()). +-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). --type from() :: pid() | reply_fun() | request_from(). -type request_from() :: undefined | gen_statem:from(). -type state() :: blocked | running. -type inflight_key() :: integer(). @@ -913,7 +912,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch], + Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch), ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{ @@ -925,7 +924,9 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re begin ReplyFun = fun ?MODULE:handle_async_batch_reply/8, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, - Requests = [Request || ?QUERY(_ReplyTo, Request, _, _ExpireAt) <- Batch], + Requests = lists:map( + fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch + ), IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), From fc38ea9571337c9ba7bf3e72c423f4f7bd1e1c6f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 27 Jan 2023 17:12:55 +0100 Subject: [PATCH 4/5] refactor(buffer_worker): do not keep request body in reply context the request body can be potentially very large the reply context is sent to the async call handler and kept in its memory until the async reply is received from bridge target service. this commit tries to minimize the size of the reply context by replacing the request body with `[]`. --- .../src/emqx_resource_buffer_worker.erl | 161 +++++++++++------- .../test/emqx_resource_SUITE.erl | 2 +- 2 files changed, 102 insertions(+), 61 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 026effcf8..355fb276c 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -52,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([handle_async_reply/8, handle_async_batch_reply/8]). +-export([handle_async_reply/2, handle_async_batch_reply/2]). -export([clear_disk_queue_dir/2]). @@ -124,7 +124,7 @@ simple_sync_query(Id, Request) -> Index = undefined, QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), - Ref = make_message_ref(), + Ref = make_request_ref(), Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -135,7 +135,7 @@ simple_async_query(Id, Request) -> Index = undefined, QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), - Ref = make_message_ref(), + Ref = make_request_ref(), Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -511,7 +511,7 @@ flush(Data0) -> buffer_worker_flush_potentially_partial, #{expired => Expired, not_expired => NotExpired} ), - Ref = make_message_ref(), + Ref = make_request_ref(), do_flush(Data2, #{ new_queue => Q1, is_batch => IsBatch, @@ -897,13 +897,21 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re ?APPLY_RESOURCE( call_query_async, begin - ReplyFun = fun ?MODULE:handle_async_reply/8, - Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts], + ReplyFun = fun ?MODULE:handle_async_reply/2, + ReplyContext = #{ + buffer_worker => self(), + resource_id => Id, + worker_index => Index, + inflight_tid => InflightTID, + request_ref => Ref, + query_opts => QueryOpts, + query => minimize(Query) + }, IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), - Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), + Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} end, Request @@ -922,8 +930,16 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re ?APPLY_RESOURCE( call_batch_query_async, begin - ReplyFun = fun ?MODULE:handle_async_batch_reply/8, - ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, + ReplyFun = fun ?MODULE:handle_async_batch_reply/2, + ReplyContext = #{ + buffer_worker => self(), + resource_id => Id, + worker_index => Index, + inflight_tid => InflightTID, + request_ref => Ref, + query_opts => QueryOpts, + batch => minimize(Batch) + }, Requests = lists:map( fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch ), @@ -931,20 +947,21 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), - Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), + Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} end, Batch ). handle_async_reply( - Pid, - Id, - Index, - InflightTID, - Ref, - ?QUERY(_ReplyTo, _Request, _HasBeenSent, ExpireAt) = Query, - QueryOpts, + #{ + request_ref := Ref, + inflight_tid := InflightTID, + resource_id := Id, + worker_index := Index, + buffer_worker := Pid, + query := ?QUERY(_, _, _, ExpireAt) = Query + } = ReplyContext, Result ) -> ?tp( @@ -961,47 +978,55 @@ handle_async_reply( ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}), ok; false -> - do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result) + do_handle_async_reply(ReplyContext, Result) end. -do_reply_after_query( - Pid, - Id, - Index, - InflightTID, - Ref, - ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), - QueryOpts, +do_handle_async_reply( + #{ + query_opts := QueryOpts, + resource_id := Id, + request_ref := Ref, + worker_index := Index, + buffer_worker := Pid, + inflight_tid := InflightTID, + query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = Query + }, Result ) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = reply_caller_defer_metrics( - Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts + Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts ), + + ?tp(buffer_worker_reply_after_query, #{ + action => Action, + batch_or_query => [Query], + ref => Ref, + result => Result + }), + case Action of nack -> %% Keep retrying. - ?tp(buffer_worker_reply_after_query, #{ - action => Action, - batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), - ref => Ref, - result => Result - }), mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(buffer_worker_reply_after_query, #{ - action => Action, - batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), - ref => Ref, - result => Result - }), do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) end. -handle_async_batch_reply(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> +handle_async_batch_reply( + #{ + buffer_worker := Pid, + resource_id := Id, + worker_index := Index, + inflight_tid := InflightTID, + request_ref := Ref, + batch := Batch + } = ReplyContext, + Result +) -> ?tp( buffer_worker_reply_after_query_enter, #{batch_or_query => Batch, ref => Ref} @@ -1020,12 +1045,21 @@ handle_async_batch_reply(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Res emqx_resource_metrics:late_reply_inc(Id, NumExpired), NumExpired > 0 andalso ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}), - do_batch_reply_after_query( - Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result - ) + do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) end. -do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> +do_handle_async_batch_reply( + #{ + buffer_worker := Pid, + resource_id := Id, + worker_index := Index, + inflight_tid := InflightTID, + request_ref := Ref, + batch := Batch, + query_opts := QueryOpts + }, + Result +) -> ?tp( buffer_worker_reply_after_query_enter, #{batch_or_query => Batch, ref => Ref} @@ -1034,24 +1068,18 @@ do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, R %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts), + ?tp(buffer_worker_reply_after_query, #{ + action => Action, + batch_or_query => Batch, + ref => Ref, + result => Result + }), case Action of nack -> %% Keep retrying. - ?tp(buffer_worker_reply_after_query, #{ - action => nack, - batch_or_query => Batch, - ref => Ref, - result => Result - }), mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(buffer_worker_reply_after_query, #{ - action => ack, - batch_or_query => Batch, - ref => Ref, - result => Result - }), do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) end. @@ -1098,7 +1126,8 @@ append_queue(Id, Index, Q, Queries) -> emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(info, #{ msg => buffer_worker_overflow, - worker_id => Id, + resource_id => Id, + worker_index => Index, dropped => Dropped }), {Items2, Q1} @@ -1133,7 +1162,7 @@ inflight_new(InfltWinSZ, Id, Index) -> inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index), inflight_append( - TableId, {?INITIAL_MONOTONIC_TIME_REF, make_message_ref()}, Id, Index + TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index ), TableId. @@ -1372,8 +1401,8 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> _ = erlang:cancel_timer(TRef), St#{tref => undefined}. --spec make_message_ref() -> inflight_key(). -make_message_ref() -> +-spec make_request_ref() -> inflight_key(). +make_request_ref() -> now_(). collect_requests(Acc, Limit) -> @@ -1459,3 +1488,15 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) -> TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond), ExpireAt = now_() + TimeoutNS, Opts#{expire_at => ExpireAt}. + +%% no need to keep the request for async reply handler +minimize(?QUERY(_, _, _, _) = Q) -> + do_minimize(Q); +minimize(L) when is_list(L) -> + lists:map(fun do_minimize/1, L). + +-ifdef(TEST). +do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query. +-else. +do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt). +-endif. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 227b6fedc..fc201e048 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2335,7 +2335,7 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref}, + #{?snk_kind := buffer_worker_reply_after_query, action := nack}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace ) From d47941601d1d78cc4fe345299cf9a9df60c2fd83 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 27 Jan 2023 17:27:27 +0100 Subject: [PATCH 5/5] refactor(buffer_worker): rename trace points --- .../src/emqx_resource_buffer_worker.erl | 29 +++++++------------ .../test/emqx_resource_SUITE.erl | 16 +++++----- .../kafka/emqx_bridge_impl_kafka_producer.erl | 2 +- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 4 +-- 4 files changed, 22 insertions(+), 29 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 355fb276c..0ac627d29 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -960,13 +960,13 @@ handle_async_reply( resource_id := Id, worker_index := Index, buffer_worker := Pid, - query := ?QUERY(_, _, _, ExpireAt) = Query + query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result ) -> ?tp( - buffer_worker_reply_after_query_enter, - #{batch_or_query => [Query], ref => Ref} + handle_async_reply_enter, + #{batch_or_query => [_Query], ref => Ref} ), Now = now_(), case is_expired(ExpireAt, Now) of @@ -975,7 +975,7 @@ handle_async_reply( IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), IsFullBefore andalso ?MODULE:flush_worker(Pid), - ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}), + ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; false -> do_handle_async_reply(ReplyContext, Result) @@ -989,7 +989,7 @@ do_handle_async_reply( worker_index := Index, buffer_worker := Pid, inflight_tid := InflightTID, - query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = Query + query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, Result ) -> @@ -1000,9 +1000,9 @@ do_handle_async_reply( Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts ), - ?tp(buffer_worker_reply_after_query, #{ + ?tp(handle_async_reply, #{ action => Action, - batch_or_query => [Query], + batch_or_query => [_Query], ref => Ref, result => Result }), @@ -1028,7 +1028,7 @@ handle_async_batch_reply( Result ) -> ?tp( - buffer_worker_reply_after_query_enter, + handle_async_reply_enter, #{batch_or_query => Batch, ref => Ref} ), Now = now_(), @@ -1038,13 +1038,13 @@ handle_async_batch_reply( IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), IsFullBefore andalso ?MODULE:flush_worker(Pid), - ?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}), + ?tp(handle_async_reply_expired, #{expired => Batch}), ok; {NotExpired, Expired} -> NumExpired = length(Expired), emqx_resource_metrics:late_reply_inc(Id, NumExpired), NumExpired > 0 andalso - ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}), + ?tp(handle_async_reply_expired, #{expired => Expired}), do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) end. @@ -1060,15 +1060,8 @@ do_handle_async_batch_reply( }, Result ) -> - ?tp( - buffer_worker_reply_after_query_enter, - #{batch_or_query => Batch, ref => Ref} - ), - %% NOTE: 'inflight' is the count of messages that were sent async - %% but received no ACK, NOT the number of messages queued in the - %% inflight window. {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts), - ?tp(buffer_worker_reply_after_query, #{ + ?tp(handle_async_reply, #{ action => Action, batch_or_query => Batch, ref => Ref, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index fc201e048..86336a38f 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1718,7 +1718,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) -> async -> {ok, _} = ?block_until( #{ - ?snk_kind := buffer_worker_reply_after_query, + ?snk_kind := handle_async_reply, action := ack, batch_or_query := [{query, _, {inc_counter, 99}, _, _}] }, @@ -1849,7 +1849,7 @@ do_t_expiration_async_after_reply(IsBatch) -> ?force_ordering( #{?snk_kind := delay}, #{ - ?snk_kind := buffer_worker_reply_after_query_enter, + ?snk_kind := handle_async_reply_enter, batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] } ), @@ -1874,7 +1874,7 @@ do_t_expiration_async_after_reply(IsBatch) -> #{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS ), {ok, _} = ?block_until( - #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS ), unlink(Pid0), @@ -1888,7 +1888,7 @@ do_t_expiration_async_after_reply(IsBatch) -> expired := [{query, _, {inc_counter, 199}, _, _}] } ], - ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ?of_kind(handle_async_reply_expired, Trace) ), wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), Metrics = tap_metrics(?LINE), @@ -1936,7 +1936,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?force_ordering( #{?snk_kind := delay}, #{ - ?snk_kind := buffer_worker_reply_after_query_enter, + ?snk_kind := handle_async_reply_enter, batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _] } ), @@ -1955,7 +1955,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> end), {ok, _} = ?block_until( - #{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS + #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS ), unlink(Pid0), @@ -1969,7 +1969,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> expired := [{query, _, {inc_counter, 199}, _, _}] } ], - ?of_kind(buffer_worker_reply_after_query_expired, Trace) + ?of_kind(handle_async_reply_expired, Trace) ), Metrics = tap_metrics(?LINE), ?assertMatch( @@ -2335,7 +2335,7 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := buffer_worker_reply_after_query, action := nack}, + #{?snk_kind := handle_async_reply, action := nack}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace ) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 18e27b775..1ac619626 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -227,7 +227,7 @@ render_timestamp(Template, Message) -> %% Wolff producer never gives up retrying %% so there can only be 'ok' results. on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> - %% the ReplyFn is emqx_resource_worker:reply_after_query/8 + %% the ReplyFn is emqx_resource_worker:handle_async_reply/2 apply(ReplyFn, Args ++ [ok]); on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% wolff should bump the dropped_queue_full counter diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index cd7f848c2..bbde88cc7 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -920,7 +920,7 @@ t_write_failure(Config) -> async -> ?wait_async_action( ?assertEqual(ok, send_message(Config, SentData)), - #{?snk_kind := buffer_worker_reply_after_query}, + #{?snk_kind := handle_async_reply}, 1_000 ) end @@ -938,7 +938,7 @@ t_write_failure(Config) -> #{got => Result} ); async -> - Trace = ?of_kind(buffer_worker_reply_after_query, Trace0), + Trace = ?of_kind(handle_async_reply, Trace0), ?assertMatch([#{action := nack} | _], Trace), [#{result := Result} | _] = Trace, ?assert(