diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index c74d6cdd1..11014a596 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -541,7 +541,9 @@ t_write_failure(Config) -> end), fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), - Trace = ?of_kind(buffer_worker_flush_nack, Trace0), + Trace = ?of_kind( + [buffer_worker_flush_nack, buffer_worker_retry_inflight_failed], Trace0 + ), [#{result := Result} | _] = Trace, case Result of {async_return, {error, {resource_error, _}}} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 602551c33..167dcc02e 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -247,7 +247,7 @@ running(info, Info, _St) -> keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = St0) -> - ?tp(buffer_worker_enter_blocked, #{}), + ?tp(buffer_worker_enter_blocked, #{buffer_worker => self()}), %% discard the old timer, new timer will be started when entering running state again St = cancel_flush_timer(St0), {keep_state, St, {state_timeout, ResumeT, unblock}}; @@ -403,7 +403,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> buffer_worker_retry_inflight_failed, #{ ref => Ref, - query_or_batch => QueryOrBatch + query_or_batch => QueryOrBatch, + result => Result } ), {keep_state, Data1, {state_timeout, ResumeT, unblock}}; @@ -976,7 +977,7 @@ handle_async_worker_down(Data0, Pid) -> {AsyncWorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), Data = Data0#{async_workers := AsyncWorkers}, mark_inflight_items_as_retriable(Data, AsyncWorkerMRef), - {keep_state, Data}. + {next_state, blocked, Data}. -spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _. call_query(QM, Id, Index, Ref, Query, QueryOpts) -> @@ -1563,7 +1564,7 @@ mark_inflight_items_as_retriable(Data, AsyncWorkerMRef) -> end ), _NumAffected = ets:select_replace(InflightTID, MatchSpec), - ?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected}), + ?tp(buffer_worker_async_agent_down, #{num_affected => _NumAffected, buffer_worker => self()}), ok. %% used to update a batch after dropping expired individual queries. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 56d878859..508b8d96b 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1832,6 +1832,18 @@ t_async_pool_worker_death(_Config) -> NumReqs, lists:sum([N || #{num_affected := N} <- Events]) ), + + %% The `DOWN' signal must trigger the transition to the `blocked' state, + %% otherwise the request won't be retried until the buffer worker is `blocked' + %% for other reasons. + ?assert( + ?strict_causality( + #{?snk_kind := buffer_worker_async_agent_down, buffer_worker := _Pid0}, + #{?snk_kind := buffer_worker_enter_blocked, buffer_worker := _Pid1}, + _Pid0 =:= _Pid1, + Trace + ) + ), ok end ), diff --git a/changes/ce/fix-10887.en.md b/changes/ce/fix-10887.en.md new file mode 100644 index 000000000..5189864a8 --- /dev/null +++ b/changes/ce/fix-10887.en.md @@ -0,0 +1,3 @@ +Fixed a potential issue where requests to bridges might take a long time to be retried. + +This only affected low throughput scenarios, where the buffering layer could take a long time to detect connectivity and driver problems.