Merge pull request #10717 from thalesmg/fix-bw-flush-timer-full-infl-r50

fix(buffer_worker): avoid setting flush timer when inflight is full
Thales Macedo Garitezi 2023-05-17 14:49:27 -03:00 committed by GitHub
commit 10f6edd6ea
4 changed files with 60 additions and 54 deletions
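In short: before this change, the buffer worker re-armed its flush timer even when the inflight window was full, and every async reply went through the `?ENSURE_ASYNC_FLUSH` macro to decide whether to trigger another flush. After it, a flush that finds the window full simply keeps its state, and the process that acks an inflight request pokes the buffer worker exactly when the window transitions from full to not-full. A minimal sketch of that edge detection, assuming the surrounding module is `emqx_resource_buffer_worker` (the diff itself only shows `?MODULE:flush_worker/1`):

%% Sketch only, not the actual code: poke the worker on the
%% full -> not-full edge, i.e. when the inflight count lands on
%% MaxSize - 1 right after an ack; otherwise leave the worker alone
%% and let its regular flush cycle drive progress.
maybe_poke_worker(NewSize, MaxSize, WorkerPid) when NewSize =:= MaxSize - 1 ->
    emqx_resource_buffer_worker:flush_worker(WorkerPid);
maybe_poke_worker(_NewSize, _MaxSize, _WorkerPid) ->
    ok.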

View File

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.15"},
+    {vsn, "0.1.16"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [
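The `vsn` bump from 0.1.15 to 0.1.16 marks a new version of the `emqx_resource` application so that release and upgrade tooling pick up the changed code.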

View File

@@ -70,18 +70,6 @@
 -define(RETRY_IDX, 3).
 -define(WORKER_MREF_IDX, 4).
 
--define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR),
-    (fun() ->
-        IsFullBefore = is_inflight_full(InflightTID),
-        case (EXPR) of
-            blocked ->
-                ok;
-            ok ->
-                ok = maybe_flush_after_async_reply(IsFullBefore)
-        end
-    end)()
-).
-
 -type id() :: binary().
 -type index() :: pos_integer().
 -type expire_at() :: infinity | integer().
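The removed `?ENSURE_ASYNC_FLUSH` wrapper captured whether the inflight table was full before a reply was handled and, if so, flushed afterwards. That responsibility moves into `ack_inflight/5` and `dec_inflight_remove/3` further down, which detect the freed slot at the moment of the ack itself.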
@@ -337,7 +325,8 @@ resume_from_blocked(Data) ->
                     {next_state, running, Data}
             end;
         {expired, Ref, Batch} ->
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+            WorkerPid = self(),
+            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
             ?tp(buffer_worker_retry_expired, #{expired => Batch}),
             resume_from_blocked(Data);
@@ -389,7 +378,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
             {keep_state, Data0, {state_timeout, ResumeT, unblock}};
         %% Send ok or failed but the resource is working
         {ack, PostFn} ->
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+            WorkerPid = self(),
+            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             %% we need to defer bumping the counters after
             %% `inflight_drop' to avoid the race condition when an
             %% inflight request might get completed concurrently with
@@ -495,8 +485,7 @@ flush(Data0) ->
             {keep_state, Data1};
         {_, true} ->
             ?tp(buffer_worker_flush_but_inflight_full, #{}),
-            Data2 = ensure_flush_timer(Data1),
-            {keep_state, Data2};
+            {keep_state, Data1};
         {_, false} ->
             ?tp(buffer_worker_flush_before_pop, #{}),
             {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
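This hunk is the heart of the fix: previously, a flush that found the inflight window full re-armed the flush timer, so the worker kept waking up, finding the window still full, and re-arming the timer, spinning on CPU without making progress. Now it keeps its state unchanged and stays idle until an ack frees a slot and pokes it via `flush_worker/1`.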
@@ -596,13 +585,14 @@ do_flush(
             %% must ensure the async worker is being monitored for
             %% such requests.
             IsUnrecoverableError = is_unrecoverable_error(Result),
+            WorkerPid = self(),
             case is_async_return(Result) of
                 true when IsUnrecoverableError ->
-                    ack_inflight(InflightTID, Ref, Id, Index);
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
                 true ->
                     ok;
                 false ->
-                    ack_inflight(InflightTID, Ref, Id, Index)
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
             end,
             {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
@@ -680,13 +670,14 @@ do_flush(#{queue := Q1} = Data0, #{
             %% must ensure the async worker is being monitored for
             %% such requests.
             IsUnrecoverableError = is_unrecoverable_error(Result),
+            WorkerPid = self(),
             case is_async_return(Result) of
                 true when IsUnrecoverableError ->
-                    ack_inflight(InflightTID, Ref, Id, Index);
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
                 true ->
                     ok;
                 false ->
-                    ack_inflight(InflightTID, Ref, Id, Index)
+                    ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
             end,
             {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
             store_async_worker_reference(InflightTID, Ref, WorkerMRef),
@@ -1006,7 +997,7 @@ handle_async_reply(
         discard ->
             ok;
         continue ->
-            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result))
+            handle_async_reply1(ReplyContext, Result)
     end.
 
 handle_async_reply1(
@@ -1015,6 +1006,7 @@ handle_async_reply1(
         inflight_tid := InflightTID,
         resource_id := Id,
         worker_index := Index,
+        buffer_worker := WorkerPid,
         min_query := ?QUERY(_, _, _, ExpireAt) = _Query
     } = ReplyContext,
     Result
@@ -1026,7 +1018,7 @@ handle_async_reply1(
     Now = now_(),
     case is_expired(ExpireAt, Now) of
         true ->
-            IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
+            IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
             ?tp(handle_async_reply_expired, #{expired => [_Query]}),
             ok;
@@ -1040,7 +1032,7 @@ do_handle_async_reply(
         resource_id := Id,
         request_ref := Ref,
         worker_index := Index,
-        buffer_worker := Pid,
+        buffer_worker := WorkerPid,
         inflight_tid := InflightTID,
         min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
     },
@@ -1063,10 +1055,10 @@ do_handle_async_reply(
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ok = ?MODULE:block(Pid),
+            ok = ?MODULE:block(WorkerPid),
             blocked;
         ack ->
-            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
     end.
 
 handle_async_batch_reply(
@@ -1081,7 +1073,7 @@ handle_async_batch_reply(
         discard ->
             ok;
         continue ->
-            ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result))
+            handle_async_batch_reply1(ReplyContext, Result)
     end.
 
 handle_async_batch_reply1(
@@ -1119,6 +1111,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     #{
         resource_id := Id,
         worker_index := Index,
+        buffer_worker := WorkerPid,
         inflight_tid := InflightTID,
         request_ref := Ref,
         min_batch := Batch
@@ -1141,7 +1134,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
     case RealNotExpired of
         [] ->
             %% all expired, no need to update back the inflight batch
-            _ = ack_inflight(InflightTID, Ref, Id, Index),
+            _ = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
             ok;
         _ ->
             %% some queries are not expired, put them back to the inflight batch
@@ -1152,7 +1145,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
 
 do_handle_async_batch_reply(
     #{
-        buffer_worker := Pid,
+        buffer_worker := WorkerPid,
         resource_id := Id,
         worker_index := Index,
         inflight_tid := InflightTID,
@@ -1173,14 +1166,14 @@ do_handle_async_batch_reply(
         nack ->
             %% Keep retrying.
             ok = mark_inflight_as_retriable(InflightTID, Ref),
-            ok = ?MODULE:block(Pid),
+            ok = ?MODULE:block(WorkerPid),
             blocked;
         ack ->
-            ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
+            ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
     end.
 
-do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
-    IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
+do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) ->
+    IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
     case maps:get(simple_query, QueryOpts, false) of
         true ->
             PostFn();
@@ -1191,18 +1184,6 @@ do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
     end,
     ok.
 
-maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
-    %% inflight was not full before async reply is handled,
-    %% after it is handled, the inflight table must be even smaller
-    %% hence we can rely on the buffer worker's flush timer to trigger
-    %% the next flush
-    ?tp(skip_flushing_worker, #{}),
-    ok;
-maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
-    %% the inflight table was full before handling the async reply
-    ?tp(do_flushing_worker, #{}),
-    ok = ?MODULE:flush_worker(self()).
-
 %% check if the async reply is valid.
 %% e.g. if a connector evaluates the callback more than once:
 %% 1. If the request was previously deleted from inflight table due to
@@ -1429,9 +1410,9 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
     ),
     ok.
 
-ack_inflight(undefined, _Ref, _Id, _Index) ->
+ack_inflight(undefined, _Ref, _Id, _Index, _WorkerPid) ->
     false;
-ack_inflight(InflightTID, Ref, Id, Index) ->
+ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) ->
     {Count, Removed} =
         case ets:take(InflightTID, Ref) of
             [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
@@ -1441,7 +1422,11 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
             [] ->
                 {0, false}
         end,
-    ok = dec_inflight_remove(InflightTID, Count, Removed),
+    FlushCheck = dec_inflight_remove(InflightTID, Count, Removed),
+    case FlushCheck of
+        continue -> ok;
+        flush -> ?MODULE:flush_worker(WorkerPid)
+    end,
     IsKnownRef = (Count > 0),
     case IsKnownRef of
         true ->
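`ack_inflight/5` now carries the owning buffer worker's pid, so whichever process performs the ack (including connector processes handling async replies) can wake that worker with `?MODULE:flush_worker/1` as soon as `dec_inflight_remove/3` reports that room was made in the inflight table.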
@@ -1476,16 +1461,32 @@ inc_inflight(InflightTID, Count) ->
     _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
     ok.
 
+-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
+    continue | flush.
 dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
-    ok;
+    continue;
 dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
-    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
-    ok;
+    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
+    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
+    %% if the new value is Max - 1, it means that we've just made room
+    %% in the inflight table, so we should poke the buffer worker to
+    %% make it continue flushing.
+    case NewValue =:= MaxValue - 1 of
+        true -> flush;
+        false -> continue
+    end;
 dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
     %% If Count > 0, it must have been removed
-    _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
+    NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
     _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
-    ok.
+    MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
+    %% if the new value is Max - 1, it means that we've just made room
+    %% in the inflight table, so we should poke the buffer worker to
+    %% make it continue flushing.
+    case NewValue =:= MaxValue - 1 of
+        true -> flush;
+        false -> continue
+    end.
 
 dec_inflight_update(_InflightTID, _Count = 0) ->
     ok;
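Two details are worth noting here: the `{2, -1, 0, 0}` update spec decrements the counter at element position 2 but clamps it at 0, and comparing the post-decrement value against `MaxValue - 1` fires exactly on the full-to-not-full transition, so at most one poke is issued per freed slot. A hypothetical standalone illustration of the clamped-decrement semantics (table and key names are made up):

%% ets:update_counter/3 with a {Pos, Incr, Threshold, SetValue} spec:
%% decrement element 2 by 1; if the result would fall below the
%% threshold 0, write the set-value 0 instead. Returns the new value.
demo_clamped_decrement() ->
    T = ets:new(demo, [set]),
    true = ets:insert(T, {count, 1}),
    0 = ets:update_counter(T, count, {2, -1, 0, 0}),
    %% already at the floor: clamped at 0 instead of going to -1
    0 = ets:update_counter(T, count, {2, -1, 0, 0}),
    true = ets:delete(T),
    ok.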

View File

@@ -1627,7 +1627,11 @@ t_retry_async_inflight_full(_Config) ->
             end
         ]
     ),
-    ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
+    ?retry(
+        _Sleep = 300,
+        _Attempts0 = 20,
+        ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
+    ),
     ok.
 
 %% this test case is to ensure the buffer worker will not go crazy even
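Since the final ack, and the metric updates that follow it, can now happen asynchronously relative to the test process, the bare assertion is replaced with a `?retry/3` loop that polls every 300 ms, up to 20 times, for the inflight gauge to drain to 0.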

View File

@@ -0,0 +1 @@
+Fixed an issue where the buffering layer processes could use a lot of CPU when the inflight window is full.