diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 3e264cb3e..3b92f1200 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.15"}, + {vsn, "0.1.16"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2dd14c46b..6145c3d87 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -70,18 +70,6 @@ -define(RETRY_IDX, 3). -define(WORKER_MREF_IDX, 4). --define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR), - (fun() -> - IsFullBefore = is_inflight_full(InflightTID), - case (EXPR) of - blocked -> - ok; - ok -> - ok = maybe_flush_after_async_reply(IsFullBefore) - end - end)() -). - -type id() :: binary(). -type index() :: pos_integer(). -type expire_at() :: infinity | integer(). @@ -337,7 +325,8 @@ resume_from_blocked(Data) -> {next_state, running, Data} end; {expired, Ref, Batch} -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + WorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), ?tp(buffer_worker_retry_expired, #{expired => Batch}), resume_from_blocked(Data); @@ -389,7 +378,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> {keep_state, Data0, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working {ack, PostFn} -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + WorkerPid = self(), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), %% we need to defer bumping the counters after %% `inflight_drop' to avoid the race condition when an %% inflight request might get completed concurrently with @@ -495,8 +485,7 @@ flush(Data0) -> {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), - Data2 = ensure_flush_timer(Data1), - {keep_state, Data2}; + {keep_state, Data1}; {_, false} -> ?tp(buffer_worker_flush_before_pop, #{}), {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), @@ -596,13 +585,14 @@ do_flush( %% must ensure the async worker is being monitored for %% such requests. IsUnrecoverableError = is_unrecoverable_error(Result), + WorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, Id, Index); + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, Id, Index) + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) end, {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), @@ -680,13 +670,14 @@ do_flush(#{queue := Q1} = Data0, #{ %% must ensure the async worker is being monitored for %% such requests. IsUnrecoverableError = is_unrecoverable_error(Result), + WorkerPid = self(), case is_async_return(Result) of true when IsUnrecoverableError -> - ack_inflight(InflightTID, Ref, Id, Index); + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid); true -> ok; false -> - ack_inflight(InflightTID, Ref, Id, Index) + ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) end, {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), @@ -1006,7 +997,7 @@ handle_async_reply( discard -> ok; continue -> - ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result)) + handle_async_reply1(ReplyContext, Result) end. handle_async_reply1( @@ -1015,6 +1006,7 @@ handle_async_reply1( inflight_tid := InflightTID, resource_id := Id, worker_index := Index, + buffer_worker := WorkerPid, min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result @@ -1026,7 +1018,7 @@ handle_async_reply1( Now = now_(), case is_expired(ExpireAt, Now) of true -> - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; @@ -1040,7 +1032,7 @@ do_handle_async_reply( resource_id := Id, request_ref := Ref, worker_index := Index, - buffer_worker := Pid, + buffer_worker := WorkerPid, inflight_tid := InflightTID, min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, @@ -1063,10 +1055,10 @@ do_handle_async_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(Pid), + ok = ?MODULE:block(WorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) end. handle_async_batch_reply( @@ -1081,7 +1073,7 @@ handle_async_batch_reply( discard -> ok; continue -> - ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result)) + handle_async_batch_reply1(ReplyContext, Result) end. handle_async_batch_reply1( @@ -1119,6 +1111,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> #{ resource_id := Id, worker_index := Index, + buffer_worker := WorkerPid, inflight_tid := InflightTID, request_ref := Ref, min_batch := Batch @@ -1141,7 +1134,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - _ = ack_inflight(InflightTID, Ref, Id, Index), + _ = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), ok; _ -> %% some queries are not expired, put them back to the inflight batch @@ -1152,7 +1145,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> do_handle_async_batch_reply( #{ - buffer_worker := Pid, + buffer_worker := WorkerPid, resource_id := Id, worker_index := Index, inflight_tid := InflightTID, @@ -1173,14 +1166,14 @@ do_handle_async_batch_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(Pid), + ok = ?MODULE:block(WorkerPid), blocked; ack -> - ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) end. -do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> - IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index), +do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) -> + IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), case maps:get(simple_query, QueryOpts, false) of true -> PostFn(); @@ -1191,18 +1184,6 @@ do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> end, ok. -maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) -> - %% inflight was not full before async reply is handled, - %% after it is handled, the inflight table must be even smaller - %% hance we can rely on the buffer worker's flush timer to trigger - %% the next flush - ?tp(skip_flushing_worker, #{}), - ok; -maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> - %% the inflight table was full before handling aync reply - ?tp(do_flushing_worker, #{}), - ok = ?MODULE:flush_worker(self()). - %% check if the async reply is valid. %% e.g. if a connector evaluates the callback more than once: %% 1. If the request was previously deleted from inflight table due to @@ -1429,9 +1410,9 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when ), ok. -ack_inflight(undefined, _Ref, _Id, _Index) -> +ack_inflight(undefined, _Ref, _Id, _Index, _WorkerPid) -> false; -ack_inflight(InflightTID, Ref, Id, Index) -> +ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) -> {Count, Removed} = case ets:take(InflightTID, Ref) of [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] -> @@ -1441,7 +1422,11 @@ ack_inflight(InflightTID, Ref, Id, Index) -> [] -> {0, false} end, - ok = dec_inflight_remove(InflightTID, Count, Removed), + FlushCheck = dec_inflight_remove(InflightTID, Count, Removed), + case FlushCheck of + continue -> ok; + flush -> ?MODULE:flush_worker(WorkerPid) + end, IsKnownRef = (Count > 0), case IsKnownRef of true -> @@ -1476,16 +1461,32 @@ inc_inflight(InflightTID, Count) -> _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}), ok. +-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) -> + continue | flush. dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) -> - ok; + continue; dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) -> - _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), - ok; + NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), + MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0), + %% if the new value is Max - 1, it means that we've just made room + %% in the inflight table, so we should poke the buffer worker to + %% make it continue flushing. + case NewValue =:= MaxValue - 1 of + true -> flush; + false -> continue + end; dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 -> %% If Count > 0, it must have been removed - _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), + NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), - ok. + MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0), + %% if the new value is Max - 1, it means that we've just made room + %% in the inflight table, so we should poke the buffer worker to + %% make it continue flushing. + case NewValue =:= MaxValue - 1 of + true -> flush; + false -> continue + end. dec_inflight_update(_InflightTID, _Count = 0) -> ok; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 809f101a8..fc338b512 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1627,7 +1627,11 @@ t_retry_async_inflight_full(_Config) -> end ] ), - ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), + ?retry( + _Sleep = 300, + _Attempts0 = 20, + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)) + ), ok. %% this test case is to ensure the buffer worker will not go crazy even diff --git a/changes/ce/fix-10717.en.md b/changes/ce/fix-10717.en.md new file mode 100644 index 000000000..4c33d6971 --- /dev/null +++ b/changes/ce/fix-10717.en.md @@ -0,0 +1 @@ +Fixed an issue where the buffering layer processes could use a lot of CPU when inflight window is full.