From fc38ea9571337c9ba7bf3e72c423f4f7bd1e1c6f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 27 Jan 2023 17:12:55 +0100 Subject: [PATCH] refactor(buffer_worker): do not keep request body in reply context the request body can be potentially very large the reply context is sent to the async call handler and kept in its memory until the async reply is received from bridge target service. this commit tries to minimize the size of the reply context by replacing the request body with `[]`. --- .../src/emqx_resource_buffer_worker.erl | 161 +++++++++++------- .../test/emqx_resource_SUITE.erl | 2 +- 2 files changed, 102 insertions(+), 61 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 026effcf8..355fb276c 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -52,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([handle_async_reply/8, handle_async_batch_reply/8]). +-export([handle_async_reply/2, handle_async_batch_reply/2]). -export([clear_disk_queue_dir/2]). @@ -124,7 +124,7 @@ simple_sync_query(Id, Request) -> Index = undefined, QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), - Ref = make_message_ref(), + Ref = make_request_ref(), Result = call_query(sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -135,7 +135,7 @@ simple_async_query(Id, Request) -> Index = undefined, QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), - Ref = make_message_ref(), + Ref = make_request_ref(), Result = call_query(async, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -511,7 +511,7 @@ flush(Data0) -> buffer_worker_flush_potentially_partial, #{expired => Expired, not_expired => NotExpired} ), - Ref = make_message_ref(), + Ref = make_request_ref(), do_flush(Data2, #{ new_queue => Q1, is_batch => IsBatch, @@ -897,13 +897,21 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re ?APPLY_RESOURCE( call_query_async, begin - ReplyFun = fun ?MODULE:handle_async_reply/8, - Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts], + ReplyFun = fun ?MODULE:handle_async_reply/2, + ReplyContext = #{ + buffer_worker => self(), + resource_id => Id, + worker_index => Index, + inflight_tid => InflightTID, + request_ref => Ref, + query_opts => QueryOpts, + query => minimize(Query) + }, IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), - Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), + Result = Mod:on_query_async(Id, Request, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} end, Request @@ -922,8 +930,16 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re ?APPLY_RESOURCE( call_batch_query_async, begin - ReplyFun = fun ?MODULE:handle_async_batch_reply/8, - ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, + ReplyFun = fun ?MODULE:handle_async_batch_reply/2, + ReplyContext = #{ + buffer_worker => self(), + resource_id => Id, + worker_index => Index, + inflight_tid => InflightTID, + request_ref => Ref, + query_opts => QueryOpts, + batch => minimize(Batch) + }, Requests = lists:map( fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch ), @@ -931,20 +947,21 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), ok = inflight_append(InflightTID, InflightItem, Id, Index), - Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), + Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [ReplyContext]}, ResSt), {async_return, Result} end, Batch ). handle_async_reply( - Pid, - Id, - Index, - InflightTID, - Ref, - ?QUERY(_ReplyTo, _Request, _HasBeenSent, ExpireAt) = Query, - QueryOpts, + #{ + request_ref := Ref, + inflight_tid := InflightTID, + resource_id := Id, + worker_index := Index, + buffer_worker := Pid, + query := ?QUERY(_, _, _, ExpireAt) = Query + } = ReplyContext, Result ) -> ?tp( @@ -961,47 +978,55 @@ handle_async_reply( ?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}), ok; false -> - do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result) + do_handle_async_reply(ReplyContext, Result) end. -do_reply_after_query( - Pid, - Id, - Index, - InflightTID, - Ref, - ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), - QueryOpts, +do_handle_async_reply( + #{ + query_opts := QueryOpts, + resource_id := Id, + request_ref := Ref, + worker_index := Index, + buffer_worker := Pid, + inflight_tid := InflightTID, + query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = Query + }, Result ) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = reply_caller_defer_metrics( - Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts + Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts ), + + ?tp(buffer_worker_reply_after_query, #{ + action => Action, + batch_or_query => [Query], + ref => Ref, + result => Result + }), + case Action of nack -> %% Keep retrying. - ?tp(buffer_worker_reply_after_query, #{ - action => Action, - batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), - ref => Ref, - result => Result - }), mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(buffer_worker_reply_after_query, #{ - action => Action, - batch_or_query => ?QUERY(ReplyTo, _Request, HasBeenSent, _ExpireAt), - ref => Ref, - result => Result - }), do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) end. -handle_async_batch_reply(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> +handle_async_batch_reply( + #{ + buffer_worker := Pid, + resource_id := Id, + worker_index := Index, + inflight_tid := InflightTID, + request_ref := Ref, + batch := Batch + } = ReplyContext, + Result +) -> ?tp( buffer_worker_reply_after_query_enter, #{batch_or_query => Batch, ref => Ref} @@ -1020,12 +1045,21 @@ handle_async_batch_reply(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Res emqx_resource_metrics:late_reply_inc(Id, NumExpired), NumExpired > 0 andalso ?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}), - do_batch_reply_after_query( - Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result - ) + do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) end. -do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> +do_handle_async_batch_reply( + #{ + buffer_worker := Pid, + resource_id := Id, + worker_index := Index, + inflight_tid := InflightTID, + request_ref := Ref, + batch := Batch, + query_opts := QueryOpts + }, + Result +) -> ?tp( buffer_worker_reply_after_query_enter, #{batch_or_query => Batch, ref => Ref} @@ -1034,24 +1068,18 @@ do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, R %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts), + ?tp(buffer_worker_reply_after_query, #{ + action => Action, + batch_or_query => Batch, + ref => Ref, + result => Result + }), case Action of nack -> %% Keep retrying. - ?tp(buffer_worker_reply_after_query, #{ - action => nack, - batch_or_query => Batch, - ref => Ref, - result => Result - }), mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - ?tp(buffer_worker_reply_after_query, #{ - action => ack, - batch_or_query => Batch, - ref => Ref, - result => Result - }), do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) end. @@ -1098,7 +1126,8 @@ append_queue(Id, Index, Q, Queries) -> emqx_resource_metrics:dropped_queue_full_inc(Id), ?SLOG(info, #{ msg => buffer_worker_overflow, - worker_id => Id, + resource_id => Id, + worker_index => Index, dropped => Dropped }), {Items2, Q1} @@ -1133,7 +1162,7 @@ inflight_new(InfltWinSZ, Id, Index) -> inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index), inflight_append( - TableId, {?INITIAL_MONOTONIC_TIME_REF, make_message_ref()}, Id, Index + TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index ), TableId. @@ -1372,8 +1401,8 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> _ = erlang:cancel_timer(TRef), St#{tref => undefined}. --spec make_message_ref() -> inflight_key(). -make_message_ref() -> +-spec make_request_ref() -> inflight_key(). +make_request_ref() -> now_(). collect_requests(Acc, Limit) -> @@ -1459,3 +1488,15 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) -> TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond), ExpireAt = now_() + TimeoutNS, Opts#{expire_at => ExpireAt}. + +%% no need to keep the request for async reply handler +minimize(?QUERY(_, _, _, _) = Q) -> + do_minimize(Q); +minimize(L) when is_list(L) -> + lists:map(fun do_minimize/1, L). + +-ifdef(TEST). +do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query. +-else. +do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt). +-endif. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 227b6fedc..fc201e048 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2335,7 +2335,7 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( - #{?snk_kind := buffer_worker_reply_after_query, action := nack, ref := _Ref}, + #{?snk_kind := buffer_worker_reply_after_query, action := nack}, #{?snk_kind := buffer_worker_retry_inflight_failed, ref := _Ref}, Trace )