Merge pull request #10887 from thalesmg/fix-async-worker-down-buffer-worker-20230530-v50
fix: block buffer workers so they may retry requests
This commit is contained in:
commit
a7f4f81c38
|
@ -541,7 +541,9 @@ t_write_failure(Config) ->
|
||||||
end),
|
end),
|
||||||
fun(Trace0) ->
|
fun(Trace0) ->
|
||||||
ct:pal("trace: ~p", [Trace0]),
|
ct:pal("trace: ~p", [Trace0]),
|
||||||
Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
|
Trace = ?of_kind(
|
||||||
|
[buffer_worker_flush_nack, buffer_worker_retry_inflight_failed], Trace0
|
||||||
|
),
|
||||||
[#{result := Result} | _] = Trace,
|
[#{result := Result} | _] = Trace,
|
||||||
case Result of
|
case Result of
|
||||||
{async_return, {error, {resource_error, _}}} ->
|
{async_return, {error, {resource_error, _}}} ->
|
||||||
|
|
|
@ -247,7 +247,7 @@ running(info, Info, _St) ->
|
||||||
keep_state_and_data.
|
keep_state_and_data.
|
||||||
|
|
||||||
blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
|
blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
|
||||||
?tp(buffer_worker_enter_blocked, #{}),
|
?tp(buffer_worker_enter_blocked, #{buffer_worker => self()}),
|
||||||
%% discard the old timer, new timer will be started when entering running state again
|
%% discard the old timer, new timer will be started when entering running state again
|
||||||
St = cancel_flush_timer(St0),
|
St = cancel_flush_timer(St0),
|
||||||
{keep_state, St, {state_timeout, ResumeT, unblock}};
|
{keep_state, St, {state_timeout, ResumeT, unblock}};
|
||||||
|
@ -403,7 +403,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
||||||
buffer_worker_retry_inflight_failed,
|
buffer_worker_retry_inflight_failed,
|
||||||
#{
|
#{
|
||||||
ref => Ref,
|
ref => Ref,
|
||||||
query_or_batch => QueryOrBatch
|
query_or_batch => QueryOrBatch,
|
||||||
|
result => Result
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
{keep_state, Data1, {state_timeout, ResumeT, unblock}};
|
{keep_state, Data1, {state_timeout, ResumeT, unblock}};
|
||||||
|
@ -976,7 +977,7 @@ handle_async_worker_down(Data0, Pid) ->
|
||||||
{AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
|
{AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
|
||||||
Data = Data0#{async_workers := AsyncWorkers},
|
Data = Data0#{async_workers := AsyncWorkers},
|
||||||
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef),
|
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef),
|
||||||
{keep_state, Data}.
|
{next_state, blocked, Data}.
|
||||||
|
|
||||||
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
|
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
|
||||||
call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
|
call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
|
||||||
|
@ -1563,7 +1564,7 @@ mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) ->
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
|
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
|
||||||
?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected}),
|
?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected, buffer_worker => self()}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% used to update a batch after dropping expired individual queries.
|
%% used to update a batch after dropping expired individual queries.
|
||||||
|
|
|
@ -1832,6 +1832,18 @@ t_async_pool_worker_death(_Config) ->
|
||||||
NumReqs,
|
NumReqs,
|
||||||
lists:sum([N || #{num_affected := N} <- Events])
|
lists:sum([N || #{num_affected := N} <- Events])
|
||||||
),
|
),
|
||||||
|
|
||||||
|
%% The `DOWN' signal must trigger the transition to the `blocked' state,
|
||||||
|
%% otherwise the request won't be retried until the buffer worker is `blocked'
|
||||||
|
%% for other reasons.
|
||||||
|
?assert(
|
||||||
|
?strict_causality(
|
||||||
|
#{?snk_kind := buffer_worker_async_agent_down, buffer_worker := _Pid0},
|
||||||
|
#{?snk_kind := buffer_worker_enter_blocked, buffer_worker := _Pid1},
|
||||||
|
_Pid0 =:= _Pid1,
|
||||||
|
Trace
|
||||||
|
)
|
||||||
|
),
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Fixed a potential issue where requests to bridges might take a long time to be retried.
|
||||||
|
|
||||||
|
This only affected low-throughput scenarios, where the buffering layer could take a long time to detect connectivity and driver problems.
|
Loading…
Reference in New Issue