fix(buffer_worker): correctly flush the buffer workers when inflight table room is made
The previous commit uncovered another, previously hidden bug: `maybe_flush_after_async_reply` was sending a message to the wrong PID. It sent the message to `self()`, meaning to target a buffer worker, but `self()` in that context is never the buffer worker; it is the connector's worker. This change also revealed a race condition where the buffer workers could stop flushing messages. So we piggy-backed on the atomic update of the table size count to check whether the buffer worker should be poked to continue flushing. This allows us to get rid of `maybe_flush_after_async_reply` altogether.
This commit is contained in:
parent
657df05ad9
commit
85089a3210
|
@ -70,18 +70,6 @@
|
||||||
-define(RETRY_IDX, 3).
|
-define(RETRY_IDX, 3).
|
||||||
-define(WORKER_MREF_IDX, 4).
|
-define(WORKER_MREF_IDX, 4).
|
||||||
|
|
||||||
-define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR),
|
|
||||||
(fun() ->
|
|
||||||
IsFullBefore = is_inflight_full(InflightTID),
|
|
||||||
case (EXPR) of
|
|
||||||
blocked ->
|
|
||||||
ok;
|
|
||||||
ok ->
|
|
||||||
ok = maybe_flush_after_async_reply(IsFullBefore)
|
|
||||||
end
|
|
||||||
end)()
|
|
||||||
).
|
|
||||||
|
|
||||||
-type id() :: binary().
|
-type id() :: binary().
|
||||||
-type index() :: pos_integer().
|
-type index() :: pos_integer().
|
||||||
-type expire_at() :: infinity | integer().
|
-type expire_at() :: infinity | integer().
|
||||||
|
@ -337,7 +325,8 @@ resume_from_blocked(Data) ->
|
||||||
{next_state, running, Data}
|
{next_state, running, Data}
|
||||||
end;
|
end;
|
||||||
{expired, Ref, Batch} ->
|
{expired, Ref, Batch} ->
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
WorkerPid = self(),
|
||||||
|
IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
|
||||||
IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
|
IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
|
||||||
?tp(buffer_worker_retry_expired, #{expired => Batch}),
|
?tp(buffer_worker_retry_expired, #{expired => Batch}),
|
||||||
resume_from_blocked(Data);
|
resume_from_blocked(Data);
|
||||||
|
@ -389,7 +378,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
||||||
{keep_state, Data0, {state_timeout, ResumeT, unblock}};
|
{keep_state, Data0, {state_timeout, ResumeT, unblock}};
|
||||||
%% Send ok or failed but the resource is working
|
%% Send ok or failed but the resource is working
|
||||||
{ack, PostFn} ->
|
{ack, PostFn} ->
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
WorkerPid = self(),
|
||||||
|
IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
|
||||||
%% we need to defer bumping the counters after
|
%% we need to defer bumping the counters after
|
||||||
%% `inflight_drop' to avoid the race condition when an
|
%% `inflight_drop' to avoid the race condition when an
|
||||||
%% inflight request might get completed concurrently with
|
%% inflight request might get completed concurrently with
|
||||||
|
@ -595,13 +585,14 @@ do_flush(
|
||||||
%% must ensure the async worker is being monitored for
|
%% must ensure the async worker is being monitored for
|
||||||
%% such requests.
|
%% such requests.
|
||||||
IsUnrecoverableError = is_unrecoverable_error(Result),
|
IsUnrecoverableError = is_unrecoverable_error(Result),
|
||||||
|
WorkerPid = self(),
|
||||||
case is_async_return(Result) of
|
case is_async_return(Result) of
|
||||||
true when IsUnrecoverableError ->
|
true when IsUnrecoverableError ->
|
||||||
ack_inflight(InflightTID, Ref, Id, Index);
|
ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
ack_inflight(InflightTID, Ref, Id, Index)
|
ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
|
||||||
end,
|
end,
|
||||||
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
|
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
|
||||||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
||||||
|
@ -679,13 +670,14 @@ do_flush(#{queue := Q1} = Data0, #{
|
||||||
%% must ensure the async worker is being monitored for
|
%% must ensure the async worker is being monitored for
|
||||||
%% such requests.
|
%% such requests.
|
||||||
IsUnrecoverableError = is_unrecoverable_error(Result),
|
IsUnrecoverableError = is_unrecoverable_error(Result),
|
||||||
|
WorkerPid = self(),
|
||||||
case is_async_return(Result) of
|
case is_async_return(Result) of
|
||||||
true when IsUnrecoverableError ->
|
true when IsUnrecoverableError ->
|
||||||
ack_inflight(InflightTID, Ref, Id, Index);
|
ack_inflight(InflightTID, Ref, Id, Index, WorkerPid);
|
||||||
true ->
|
true ->
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
ack_inflight(InflightTID, Ref, Id, Index)
|
ack_inflight(InflightTID, Ref, Id, Index, WorkerPid)
|
||||||
end,
|
end,
|
||||||
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
|
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
|
||||||
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
|
||||||
|
@ -1005,7 +997,7 @@ handle_async_reply(
|
||||||
discard ->
|
discard ->
|
||||||
ok;
|
ok;
|
||||||
continue ->
|
continue ->
|
||||||
?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result))
|
handle_async_reply1(ReplyContext, Result)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_async_reply1(
|
handle_async_reply1(
|
||||||
|
@ -1014,6 +1006,7 @@ handle_async_reply1(
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
worker_index := Index,
|
worker_index := Index,
|
||||||
|
buffer_worker := WorkerPid,
|
||||||
min_query := ?QUERY(_, _, _, ExpireAt) = _Query
|
min_query := ?QUERY(_, _, _, ExpireAt) = _Query
|
||||||
} = ReplyContext,
|
} = ReplyContext,
|
||||||
Result
|
Result
|
||||||
|
@ -1025,7 +1018,7 @@ handle_async_reply1(
|
||||||
Now = now_(),
|
Now = now_(),
|
||||||
case is_expired(ExpireAt, Now) of
|
case is_expired(ExpireAt, Now) of
|
||||||
true ->
|
true ->
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
|
IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
|
||||||
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
||||||
?tp(handle_async_reply_expired, #{expired => [_Query]}),
|
?tp(handle_async_reply_expired, #{expired => [_Query]}),
|
||||||
ok;
|
ok;
|
||||||
|
@ -1039,7 +1032,7 @@ do_handle_async_reply(
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
worker_index := Index,
|
worker_index := Index,
|
||||||
buffer_worker := Pid,
|
buffer_worker := WorkerPid,
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
|
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
|
||||||
},
|
},
|
||||||
|
@ -1062,10 +1055,10 @@ do_handle_async_reply(
|
||||||
nack ->
|
nack ->
|
||||||
%% Keep retrying.
|
%% Keep retrying.
|
||||||
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
||||||
ok = ?MODULE:block(Pid),
|
ok = ?MODULE:block(WorkerPid),
|
||||||
blocked;
|
blocked;
|
||||||
ack ->
|
ack ->
|
||||||
ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
|
ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_async_batch_reply(
|
handle_async_batch_reply(
|
||||||
|
@ -1080,7 +1073,7 @@ handle_async_batch_reply(
|
||||||
discard ->
|
discard ->
|
||||||
ok;
|
ok;
|
||||||
continue ->
|
continue ->
|
||||||
?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result))
|
handle_async_batch_reply1(ReplyContext, Result)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_async_batch_reply1(
|
handle_async_batch_reply1(
|
||||||
|
@ -1118,6 +1111,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
#{
|
#{
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
worker_index := Index,
|
worker_index := Index,
|
||||||
|
buffer_worker := WorkerPid,
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
min_batch := Batch
|
min_batch := Batch
|
||||||
|
@ -1140,7 +1134,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
case RealNotExpired of
|
case RealNotExpired of
|
||||||
[] ->
|
[] ->
|
||||||
%% all expired, no need to update back the inflight batch
|
%% all expired, no need to update back the inflight batch
|
||||||
_ = ack_inflight(InflightTID, Ref, Id, Index),
|
_ = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
%% some queries are not expired, put them back to the inflight batch
|
%% some queries are not expired, put them back to the inflight batch
|
||||||
|
@ -1151,7 +1145,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
|
|
||||||
do_handle_async_batch_reply(
|
do_handle_async_batch_reply(
|
||||||
#{
|
#{
|
||||||
buffer_worker := Pid,
|
buffer_worker := WorkerPid,
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
worker_index := Index,
|
worker_index := Index,
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
|
@ -1172,14 +1166,14 @@ do_handle_async_batch_reply(
|
||||||
nack ->
|
nack ->
|
||||||
%% Keep retrying.
|
%% Keep retrying.
|
||||||
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
ok = mark_inflight_as_retriable(InflightTID, Ref),
|
||||||
ok = ?MODULE:block(Pid),
|
ok = ?MODULE:block(WorkerPid),
|
||||||
blocked;
|
blocked;
|
||||||
ack ->
|
ack ->
|
||||||
ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts)
|
ok = do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
|
do_async_ack(InflightTID, Ref, Id, Index, WorkerPid, PostFn, QueryOpts) ->
|
||||||
IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index),
|
IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid),
|
||||||
case maps:get(simple_query, QueryOpts, false) of
|
case maps:get(simple_query, QueryOpts, false) of
|
||||||
true ->
|
true ->
|
||||||
PostFn();
|
PostFn();
|
||||||
|
@ -1190,18 +1184,6 @@ do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) ->
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) ->
|
|
||||||
%% inflight was not full before async reply is handled,
|
|
||||||
%% after it is handled, the inflight table must be even smaller
|
|
||||||
%% hence we can rely on the buffer worker's flush timer to trigger
|
|
||||||
%% the next flush
|
|
||||||
?tp(skip_flushing_worker, #{}),
|
|
||||||
ok;
|
|
||||||
maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) ->
|
|
||||||
%% the inflight table was full before handling async reply
|
|
||||||
?tp(do_flushing_worker, #{}),
|
|
||||||
ok = ?MODULE:flush_worker(self()).
|
|
||||||
|
|
||||||
%% check if the async reply is valid.
|
%% check if the async reply is valid.
|
||||||
%% e.g. if a connector evaluates the callback more than once:
|
%% e.g. if a connector evaluates the callback more than once:
|
||||||
%% 1. If the request was previously deleted from inflight table due to
|
%% 1. If the request was previously deleted from inflight table due to
|
||||||
|
@ -1428,9 +1410,9 @@ store_async_worker_reference(InflightTID, Ref, WorkerMRef) when
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
ack_inflight(undefined, _Ref, _Id, _Index) ->
|
ack_inflight(undefined, _Ref, _Id, _Index, _WorkerPid) ->
|
||||||
false;
|
false;
|
||||||
ack_inflight(InflightTID, Ref, Id, Index) ->
|
ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) ->
|
||||||
{Count, Removed} =
|
{Count, Removed} =
|
||||||
case ets:take(InflightTID, Ref) of
|
case ets:take(InflightTID, Ref) of
|
||||||
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
|
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
|
||||||
|
@ -1440,7 +1422,11 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
|
||||||
[] ->
|
[] ->
|
||||||
{0, false}
|
{0, false}
|
||||||
end,
|
end,
|
||||||
ok = dec_inflight_remove(InflightTID, Count, Removed),
|
FlushCheck = dec_inflight_remove(InflightTID, Count, Removed),
|
||||||
|
case FlushCheck of
|
||||||
|
continue -> ok;
|
||||||
|
flush -> ?MODULE:flush_worker(WorkerPid)
|
||||||
|
end,
|
||||||
IsKnownRef = (Count > 0),
|
IsKnownRef = (Count > 0),
|
||||||
case IsKnownRef of
|
case IsKnownRef of
|
||||||
true ->
|
true ->
|
||||||
|
@ -1475,16 +1461,32 @@ inc_inflight(InflightTID, Count) ->
|
||||||
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
|
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
-spec dec_inflight_remove(undefined | ets:tid(), non_neg_integer(), Removed :: boolean()) ->
|
||||||
|
continue | flush.
|
||||||
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
|
dec_inflight_remove(_InflightTID, _Count = 0, _Removed = false) ->
|
||||||
ok;
|
continue;
|
||||||
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
|
dec_inflight_remove(InflightTID, _Count = 0, _Removed = true) ->
|
||||||
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
||||||
ok;
|
MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
|
||||||
|
%% if the new value is Max - 1, it means that we've just made room
|
||||||
|
%% in the inflight table, so we should poke the buffer worker to
|
||||||
|
%% make it continue flushing.
|
||||||
|
case NewValue =:= MaxValue - 1 of
|
||||||
|
true -> flush;
|
||||||
|
false -> continue
|
||||||
|
end;
|
||||||
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
|
dec_inflight_remove(InflightTID, Count, _Removed = true) when Count > 0 ->
|
||||||
%% If Count > 0, it must have been removed
|
%% If Count > 0, it must have been removed
|
||||||
_ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
NewValue = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}),
|
||||||
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
|
_ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
|
||||||
ok.
|
MaxValue = emqx_utils_ets:lookup_value(InflightTID, ?MAX_SIZE_REF, 0),
|
||||||
|
%% if the new value is Max - 1, it means that we've just made room
|
||||||
|
%% in the inflight table, so we should poke the buffer worker to
|
||||||
|
%% make it continue flushing.
|
||||||
|
case NewValue =:= MaxValue - 1 of
|
||||||
|
true -> flush;
|
||||||
|
false -> continue
|
||||||
|
end.
|
||||||
|
|
||||||
dec_inflight_update(_InflightTID, _Count = 0) ->
|
dec_inflight_update(_InflightTID, _Count = 0) ->
|
||||||
ok;
|
ok;
|
||||||
|
|
|
@ -1627,7 +1627,11 @@ t_retry_async_inflight_full(_Config) ->
|
||||||
end
|
end
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
|
?retry(
|
||||||
|
_Sleep = 300,
|
||||||
|
_Attempts0 = 20,
|
||||||
|
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
|
||||||
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% this test case is to ensure the buffer worker will not go crazy even
|
%% this test case is to ensure the buffer worker will not go crazy even
|
||||||
|
|
Loading…
Reference in New Issue