From fc614e16e55d0f3e484ab0ad53a3c03932a9d797 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 20:07:35 +0100 Subject: [PATCH] fix(bridge): update inflight items after partial expiry --- .../src/emqx_resource_buffer_worker.erl | 98 ++++++++++++------- .../test/emqx_resource_SUITE.erl | 2 +- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2f83a347a..6aa13092a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -335,11 +335,13 @@ resume_from_blocked(Data) -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, Query, Data); + {batch, Ref, NotExpired, []} -> + retry_inflight_sync(Ref, NotExpired, Data); {batch, Ref, NotExpired, Expired} -> NumExpired = length(Expired), - update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), + ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), - NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}), + ?tp(buffer_worker_retry_expired, #{expired => Expired}), %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, NotExpired, Data) @@ -496,7 +498,7 @@ flush(Data0) -> {NotExpired, Expired} -> NumExpired = length(Expired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), - IsBatch = BatchSize =/= 1, + IsBatch = (BatchSize > 1), %% We *must* use the new queue, because we currently can't %% `nack' a `pop'. %% Maybe we could re-open the queue? @@ -506,7 +508,6 @@ flush(Data0) -> ), Ref = make_request_ref(), do_flush(Data2, #{ - new_queue => Q1, is_batch => IsBatch, batch => NotExpired, ref => Ref, @@ -519,18 +520,16 @@ flush(Data0) -> is_batch := boolean(), batch := [queue_query()], ack_ref := replayq:ack_ref(), - ref := inflight_key(), - new_queue := replayq:q() + ref := inflight_key() }) -> gen_statem:event_handler_result(state(), data()). do_flush( - Data0, + #{queue := Q1} = Data0, #{ is_batch := false, batch := Batch, ref := Ref, - ack_ref := QAckRef, - new_queue := Q1 + ack_ref := QAckRef } ) -> #{ @@ -610,12 +609,11 @@ do_flush( end, {keep_state, Data1} end; -do_flush(Data0, #{ +do_flush(#{queue := Q1} = Data0, #{ is_batch := true, batch := Batch, ref := Ref, - ack_ref := QAckRef, - new_queue := Q1 + ack_ref := QAckRef }) -> #{ id := Id, @@ -715,17 +713,18 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> end, Batch ), - {ShouldAck, PostFns} = + {Action, PostFn1} = reply_caller_defer_metrics(Id, hd(Replies), QueryOpts), + PostFns = lists:foldl( - fun(Reply, {_ShouldAck, PostFns}) -> - {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), - {ShouldAck, [PostFn | PostFns]} + fun(Reply, PostFns) -> + {_, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), + [PostFn | PostFns] end, - {ack, []}, - Replies + [PostFn1], + tl(Replies) ), - PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, - {ShouldAck, PostFn}. + PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end, + {Action, PostFn}. reply_caller(Id, Reply, QueryOpts) -> {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), @@ -1024,7 +1023,7 @@ do_handle_async_reply( case Action of nack -> %% Keep retrying. - mark_inflight_as_retriable(InflightTID, Ref), + ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) @@ -1051,15 +1050,40 @@ handle_async_batch_reply( IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)), - IsFullBefore andalso ?MODULE:flush_worker(Pid), + IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid), ?tp(handle_async_reply_expired, #{expired => Batch}), ok; - {NotExpired, Expired} -> - NumExpired = length(Expired), - emqx_resource_metrics:late_reply_inc(Id, NumExpired), - NumExpired > 0 andalso - ?tp(handle_async_reply_expired, #{expired => Expired}), - do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) + {_NotExpired, []} -> + do_handle_async_batch_reply(ReplyContext, Result); + {_NotExpired, _Expired} -> + %% partial expire + %% the batch from reply context is minimized, so it cannot be used + %% to update the inflight items, hence discard Batch and lookup the RealBatch + ?tp(handle_async_reply_expired, #{expired => _Expired}), + case ets:lookup(InflightTID, Ref) of + [] -> + %% e.g. if the driver evaluates it more than once + %% which should really be a bug, TODO: add a unknown_reply counter + ok; + [?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef)] -> + %% All batch items share the same HasBeenSent flag + %% So we just take the original flag from the ReplyContext batch + %% and put it back to the batch found in inflight table + %% which must have already been set to `false` + [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, + {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), + RealNotExpired = + lists:map( + fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> + ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) + end, + RealNotExpired0 + ), + NumExpired = length(RealExpired), + emqx_resource_metrics:late_reply_inc(Id, NumExpired), + ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), + do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) + end end. do_handle_async_batch_reply( @@ -1084,7 +1108,7 @@ do_handle_async_batch_reply( case Action of nack -> %% Keep retrying. - mark_inflight_as_retriable(InflightTID, Ref), + ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) @@ -1320,10 +1344,15 @@ ack_inflight(InflightTID, Ref, Id, Index) -> [] -> 0 end, - IsAcked = (Count > 0), - IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - IsAcked. + IsKnownRef = (Count > 0), + case IsKnownRef of + true -> + ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); + false -> + ok + end, + IsKnownRef. mark_inflight_items_as_retriable(Data, WorkerMRef) -> #{inflight_tid := InflightTID} = Data, @@ -1341,10 +1370,9 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}), - ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 984b3b04a..92f069739 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1997,6 +1997,7 @@ do_t_expiration_async_after_reply(IsBatch) -> {ok, _} = ?block_until( #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS ), + wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), unlink(Pid0), exit(Pid0, kill), @@ -2011,7 +2012,6 @@ do_t_expiration_async_after_reply(IsBatch) -> ], ?of_kind(handle_async_reply_expired, Trace) ), - wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), Metrics = tap_metrics(?LINE), ?assertMatch( #{