From 3a6dbbdd058efdf596e16750304522ee9e43c8eb Mon Sep 17 00:00:00 2001
From: "Zaiming (Stone) Shi"
Date: Thu, 23 Feb 2023 16:25:38 +0100
Subject: [PATCH] refactor(buffer_worker): ensure flush message is never missed

---
 .../src/emqx_resource_buffer_worker.erl       | 80 +++++++++++--------
 .../test/emqx_connector_demo.erl              |  2 +
 .../test/emqx_resource_SUITE.erl              | 35 ++++----
 3 files changed, 67 insertions(+), 50 deletions(-)

diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
index 77494f4ba..e6fa1c537 100644
--- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
@@ -70,6 +70,18 @@
 -define(RETRY_IDX, 3).
 -define(WORKER_MREF_IDX, 4).
 
+-define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR),
+    (fun() ->
+        IsFullBefore = is_inflight_full(InflightTID),
+        case (EXPR) of
+            blocked ->
+                ok;
+            ok ->
+                maybe_flush_after_async_reply(IsFullBefore)
+        end
+    end)()
+).
+
 -type id() :: binary().
 -type index() :: pos_integer().
 -type expire_at() :: infinity | integer().
@@ -194,8 +206,8 @@ init({Id, Index, Opts}) ->
     ?tp(buffer_worker_init, #{id => Id, index => Index}),
     {ok, running, Data}.
 
-running(enter, _, Data) ->
-    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}),
+running(enter, _, #{tref := _Tref} = Data) ->
+    ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}),
     %% According to `gen_statem' laws, we mustn't call `maybe_flush'
     %% directly because it may decide to return `{next_state, blocked, _}',
     %% and that's an invalid response for a state enter call.
@@ -212,9 +224,8 @@ running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) ->
     handle_query_requests(Request0, Data);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     flush(St#{tref := undefined});
-running(internal, flush, St) ->
-    flush(St);
 running(info, {flush, _Ref}, _St) ->
+    ?tp(discarded_stale_flush, #{}),
     keep_state_and_data;
 running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
@@ -472,10 +483,15 @@ flush(Data0) ->
     Data1 = cancel_flush_timer(Data0),
     CurrentCount = queue_count(Q0),
     IsFull = is_inflight_full(InflightTID),
-    ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}),
+    InflightCount = inflight_num_batches(InflightTID),
+    ?tp(buffer_worker_flush, #{
+        queued => CurrentCount,
+        is_inflight_full => IsFull,
+        inflight => InflightCount
+    }),
     case {CurrentCount, IsFull} of
         {0, _} ->
-            ?tp(buffer_worker_queue_drained, #{inflight => inflight_num_batches(InflightTID)}),
+            ?tp(buffer_worker_queue_drained, #{inflight => InflightCount}),
             {keep_state, Data1};
         {_, true} ->
             ?tp(buffer_worker_flush_but_inflight_full, #{}),
@@ -714,18 +730,18 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
             end,
             Batch
         ),
-    {Action, PostFn1} = reply_caller_defer_metrics(Id, hd(Replies), QueryOpts),
-    PostFns =
+    {ShouldAck, PostFns} =
         lists:foldl(
-            fun(Reply, PostFns) ->
-                {_, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
-                [PostFn | PostFns]
+            fun(Reply, {_ShouldAck, PostFns}) ->
+                %% _ShouldAck should be the same as ShouldAck starting from the second reply
+                {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
+                {ShouldAck, [PostFn | PostFns]}
             end,
-            [PostFn1],
-            tl(Replies)
+            {ack, []},
+            Replies
         ),
     PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end,
-    {Action, PostFn}.
+    {ShouldAck, PostFn}.
 
 reply_caller(Id, Reply, QueryOpts) ->
     {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
@@ -978,7 +994,7 @@ handle_async_reply(
         discard ->
             ok;
         continue ->
-            handle_async_reply1(ReplyContext, Result)
+            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result))
     end.
 
 handle_async_reply1(
@@ -999,10 +1015,8 @@ handle_async_reply1(
     Now = now_(),
     case is_expired(ExpireAt, Now) of
         true ->
-            IsFullBefore = is_inflight_full(InflightTID),
             IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
-            IsFullBefore andalso ?MODULE:flush_worker(Pid),
             ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
         false ->
@@ -1034,16 +1048,15 @@ do_handle_async_reply(
         ref => Ref,
         result => Result
     }),
-    IsFullBefore = is_inflight_full(InflightTID),
     case Action of
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ?MODULE:block(Pid);
+            ok = ?MODULE:block(Pid),
+            blocked;
         ack ->
-            do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
-    end,
-    ok = maybe_flush_after_async_reply(IsFullBefore).
+            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
+    end.
 
 handle_async_batch_reply(
     #{
@@ -1056,7 +1069,7 @@ handle_async_batch_reply(
         discard ->
             ok;
         continue ->
-            handle_async_batch_reply1(ReplyContext, Result)
+            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result))
     end.
 
 handle_async_batch_reply1(
@@ -1072,21 +1085,19 @@ handle_async_batch_reply1(
         #{batch_or_query => Batch, ref => Ref, result => Result}
     ),
     Now = now_(),
-    IsFullBefore = is_inflight_full(InflightTID),
     case sieve_expired_requests(Batch, Now) of
         {_NotExpired, []} ->
             %% this is the critical code path,
             %% we try not to do ets:lookup in this case
             %% because the batch can be quite big
-            ok = do_handle_async_batch_reply(ReplyContext, Result);
+            do_handle_async_batch_reply(ReplyContext, Result);
         {_NotExpired, _Expired} ->
             %% at least one is expired
             %% the batch from reply context is minimized, so it cannot be used
             %% to update the inflight items, hence discard Batch and lookup the RealBatch
             ?tp(handle_async_reply_expired, #{expired => _Expired}),
-            ok = handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
-    end,
-    ok = maybe_flush_after_async_reply(IsFullBefore).
+            handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now)
+    end.
 
 handle_async_batch_reply2([], _, _, _) ->
     %% should have caused the unknown_async_reply_discarded
@@ -1124,9 +1135,8 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
             %% some queries are not expired, put them back to the inflight batch
             %% so it can be either acked now or retried later
             ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired),
-            ok = do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
-    end,
-    ok.
+            do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result)
+    end.
 
 do_handle_async_batch_reply(
     #{
@@ -1151,7 +1161,8 @@ do_handle_async_batch_reply(
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ok = ?MODULE:block(Pid);
+            ok = ?MODULE:block(Pid),
+            blocked;
         ack ->
             ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
     end.
@@ -1173,9 +1184,11 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
     %% after it is handled, the inflight table must be even smaller
     %% hance we can rely on the buffer worker's flush timer to trigger
     %% the next flush
+    ?tp(skip_flushing_worker, #{}),
     ok;
 maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
     %% the inflight table was full before handling aync reply
+    ?tp(do_flushing_worker, #{}),
     ok = ?MODULE:flush_worker(self()).
 
 %% check if the async reply is valid.
@@ -1189,7 +1202,6 @@
 maybe_handle_unknown_async_reply(InflightTID, Ref) ->
     try ets:member(InflightTID, Ref) of
         true ->
-            %% NOTE: this does not mean the
             continue;
         false ->
             ?tp(
@@ -1446,7 +1458,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
     ok.
 
 %% used to update a batch after dropping expired individual queries.
-update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) ->
+update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 ->
     _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
     ok = dec_inflight(InflightTID, NumExpired),
     ok.
diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl
index a6b7b2339..3b5f83d05 100644
--- a/apps/emqx_resource/test/emqx_connector_demo.erl
+++ b/apps/emqx_resource/test/emqx_connector_demo.erl
@@ -314,6 +314,8 @@ counter_loop(
             %% drain the buffer (and inflights table)
             ReplyCount = 1 + (RandNum rem 3),
             Results = random_replies(ReplyCount),
+            %% add a delay to trigger inflight full
+            timer:sleep(5),
             lists:foreach(
                 fun(Result) ->
                     apply_reply(ReplyFun, Result)
diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl
index 1362cd1cc..dfe64de24 100644
--- a/apps/emqx_resource/test/emqx_resource_SUITE.erl
+++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl
@@ -1510,8 +1510,9 @@ t_retry_async_inflight_full(_Config) ->
 %% this test case is to ensure the buffer worker will not go crazy even
 %% if the underlying connector is misbehaving: evaluate async callbacks multiple times
 t_async_reply_multi_eval(_Config) ->
-    ResumeInterval = 20,
-    AsyncInflightWindow = 5,
+    ResumeInterval = 5,
+    TotalTime = 5_000,
+    AsyncInflightWindow = 3,
     emqx_connector_demo:set_callback_mode(async_if_possible),
     {ok, _} = emqx_resource:create(
         ?ID,
@@ -1528,29 +1529,31 @@
         }
     ),
     ?check_trace(
-        #{timetrap => 15_000},
+        #{timetrap => 30_000},
         begin
             %% block
             ok = emqx_resource:simple_sync_query(?ID, block),
-            {ok, {ok, _}} =
-                ?wait_async_action(
-                    inc_counter_in_parallel(
-                        AsyncInflightWindow * 2,
-                        fun() ->
-                            Rand = rand:uniform(1000),
-                            {random_reply, Rand}
-                        end,
-                        #{}
-                    ),
-                    #{?snk_kind := buffer_worker_queue_drained, inflight := 0},
-                    ResumeInterval * 200
+            ?wait_async_action(
+                inc_counter_in_parallel(
+                    AsyncInflightWindow * 5,
+                    fun() ->
+                        Rand = rand:uniform(1000),
+                        {random_reply, Rand}
+                    end,
+                    #{}
                 ),
+                #{?snk_kind := buffer_worker_flush, inflight := 0, queued := 0},
+                TotalTime
+            ),
             ok
         end,
         [
             fun(Trace) ->
-                ?assertMatch([#{inflight := 0}], ?of_kind(buffer_worker_queue_drained, Trace))
+                ?assertMatch(
+                    [#{inflight := 0} | _],
+                    lists:reverse(?of_kind(buffer_worker_queue_drained, Trace))
+                )
             end
         ]
     ),