fix(buffer_worker): make buffer worker enter `blocked` state when async worker dies

Fixes https://emqx.atlassian.net/browse/EMQX-10074

Otherwise, the inflight requests of the dead async worker, although now
marked as retriable, might not be retried until the buffer worker enters
the `blocked` state for some other reason, which might take a long time.
This commit is contained in:
Thales Macedo Garitezi 2023-05-30 14:41:59 -03:00
parent 5c8b73e829
commit 6be8ff378e
3 changed files with 18 additions and 3 deletions

View File

@ -247,7 +247,7 @@ running(info, Info, _St) ->
keep_state_and_data. keep_state_and_data.
blocked(enter, _, #{resume_interval := ResumeT} = St0) -> blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
?tp(buffer_worker_enter_blocked, #{}), ?tp(buffer_worker_enter_blocked, #{buffer_worker => self()}),
%% discard the old timer, new timer will be started when entering running state again %% discard the old timer, new timer will be started when entering running state again
St = cancel_flush_timer(St0), St = cancel_flush_timer(St0),
{keep_state, St, {state_timeout, ResumeT, unblock}}; {keep_state, St, {state_timeout, ResumeT, unblock}};
@ -976,7 +976,7 @@ handle_async_worker_down(Data0, Pid) ->
{AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), {AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
Data = Data0#{async_workers := AsyncWorkers}, Data = Data0#{async_workers := AsyncWorkers},
mark_inflight_items_as_retriable(Data, AsyncWorkerMRef), mark_inflight_items_as_retriable(Data, AsyncWorkerMRef),
{keep_state, Data}. {next_state, blocked, Data}.
-spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _. -spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
call_query(QM, Id, Index, Ref, Query, QueryOpts) -> call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
@ -1563,7 +1563,7 @@ mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) ->
end end
), ),
_NumAffected = ets:select_replace(InflightTID, MatchSpec), _NumAffected = ets:select_replace(InflightTID, MatchSpec),
?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected}), ?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected, buffer_worker => self()}),
ok. ok.
%% used to update a batch after dropping expired individual queries. %% used to update a batch after dropping expired individual queries.

View File

@ -1832,6 +1832,18 @@ t_async_pool_worker_death(_Config) ->
NumReqs, NumReqs,
lists:sum([N || #{num_affected := N} <- Events]) lists:sum([N || #{num_affected := N} <- Events])
), ),
%% The `DOWN' signal must trigger the transition to the `blocked' state,
%% otherwise the request won't be retried until the buffer worker is `blocked'
%% for other reasons.
?assert(
?strict_causality(
#{?snk_kind := buffer_worker_async_agent_down, buffer_worker := _Pid0},
#{?snk_kind := buffer_worker_enter_blocked, buffer_worker := _Pid1},
_Pid0 =:= _Pid1,
Trace
)
),
ok ok
end end
), ),

View File

@ -0,0 +1,3 @@
Fixed a potential issue where requests to bridges might take a long time to be retried.
This only affected low-throughput scenarios, where the buffering layer could take a long time to detect connectivity and driver problems.