From fa01deb3ebdf27e0463fee6dc423265cb016dc81 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 17 Jan 2023 11:55:40 -0300 Subject: [PATCH] chore: retry as much as possible, don't reply to caller too soon --- apps/emqx_authz/src/emqx_authz_http.erl | 12 +- .../emqx_authz/test/emqx_authz_http_SUITE.erl | 2 +- apps/emqx_bridge/src/emqx_bridge.erl | 7 +- .../test/emqx_bridge_mqtt_SUITE.erl | 19 +- .../i18n/emqx_resource_schema_i18n.conf | 11 + .../src/emqx_resource_worker.erl | 295 +++++++++--------- .../src/schema/emqx_resource_schema.erl | 6 + .../test/emqx_connector_demo.erl | 6 + .../test/emqx_resource_SUITE.erl | 227 ++++++++------ .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 26 +- 10 files changed, 331 insertions(+), 280 deletions(-) diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index 852a667c8..a300291d1 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -82,7 +82,7 @@ authorize( } = Config ) -> Request = generate_request(PubSub, Topic, Client, Config), - case emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of + try emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of {ok, 204, _Headers} -> {matched, allow}; {ok, 200, Headers, Body} -> @@ -112,6 +112,16 @@ authorize( reason => Reason }), ignore + catch + error:timeout -> + Reason = timeout, + ?tp(authz_http_request_failure, #{error => Reason}), + ?SLOG(error, #{ + msg => "http_server_query_failed", + resource => ResourceID, + reason => Reason + }), + ignore end. log_nomtach_msg(Status, Headers, Body) -> diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index e91da9829..e5a72f680 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -172,7 +172,7 @@ t_response_handling(_Config) -> [ #{ ?snk_kind := authz_http_request_failure, - error := {recoverable_error, econnrefused} + error := timeout } ], ?of_kind(authz_http_request_failure, Trace) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index c86087014..38fe0a144 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -170,8 +170,11 @@ send_message(BridgeId, Message) -> case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of not_found -> {error, {bridge_not_found, BridgeId}}; - #{enable := true} -> - emqx_resource:query(ResId, {send_message, Message}); + #{enable := true} = Config -> + Timeout = emqx_map_lib:deep_get( + [resource_opts, request_timeout], Config, timer:seconds(15) + ), + emqx_resource:query(ResId, {send_message, Message}, #{timeout => Timeout}); #{enable := false} -> {error, {bridge_stopped, BridgeId}} end. diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1f03863ae..d20d3bc10 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -145,10 +145,12 @@ set_special_configs(_) -> init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + ok = snabbkaffe:start_trace(), Config. end_per_testcase(_, _Config) -> clear_resources(), emqx_common_test_helpers:call_janitor(), + snabbkaffe:stop(), ok. clear_resources() -> @@ -478,8 +480,6 @@ t_egress_custom_clientid_prefix(_Config) -> end, {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - ok. t_mqtt_conn_bridge_ingress_and_egress(_) -> @@ -830,6 +830,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"resource_opts">> => #{ <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, + <<"request_timeout">> => <<"500ms">>, %% to make it check the healthy quickly <<"health_check_interval">> => <<"0.5s">> } @@ -880,17 +881,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> ok = emqx_listeners:stop_listener('tcp:default'), ct:sleep(1500), - %% PUBLISH 2 messages to the 'local' broker, the message should - ok = snabbkaffe:start_trace(), + %% PUBLISH 2 messages to the 'local' broker, the messages should + %% be enqueued and the resource will block {ok, SRef} = snabbkaffe:subscribe( fun - ( - #{ - ?snk_kind := call_query_enter, - query := {query, _From, {send_message, #{}}, _Sent} - } - ) -> + (#{?snk_kind := resource_worker_retry_inflight_failed}) -> + true; + (#{?snk_kind := resource_worker_flush_nack}) -> true; (_) -> false @@ -903,7 +901,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> emqx:publish(emqx_message:make(LocalTopic, Payload1)), emqx:publish(emqx_message:make(LocalTopic, Payload2)), {ok, _} = snabbkaffe:receive_events(SRef), - ok = snabbkaffe:stop(), %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 0b6cbd0a2..de76967ab 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise } } + request_timeout { + desc { + en: """Timeout for requests. If query_mode is sync, calls to the resource will be blocked for this amount of time before timing out.""" + zh: """请求的超时。 如果query_modesync,对资源的调用将在超时前被阻断这一时间。""" + } + label { + en: """Request timeout""" + zh: """请求超时""" + } + } + enable_batch { desc { en: """Batch mode enabled.""" diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index b1a34355b..2ef1cbed4 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -100,7 +100,7 @@ start_link(Id, Index, Opts) -> -spec sync_query(id(), request(), query_opts()) -> Result :: term(). sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), - Timeout = maps:get(timeout, Opts, infinity), + Timeout = maps:get(timeout, Opts, timer:seconds(15)), emqx_resource_metrics:matched_inc(Id), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). @@ -234,10 +234,7 @@ blocked(cast, flush, Data) -> blocked(state_timeout, unblock, St) -> resume_from_blocked(St); blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) -> - #{id := Id} = Data0, - {Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), - Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = batch_reply_caller(Id, Error, Queries), + {_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> keep_state_and_data; @@ -337,10 +334,16 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> } = Data0, ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{}, - %% if we are retrying an inflight query, it has been sent - HasBeenSent = true, Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), - case handle_query_result_pure(Id, Result, HasBeenSent) of + ReplyResult = + case QueryOrBatch of + ?QUERY(From, CoreReq, HasBeenSent) -> + Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), + reply_caller_defer_metrics(Id, Reply); + [?QUERY(_, _, _) | _] = Batch -> + batch_reply_caller_defer_metrics(Id, Result, Batch) + end, + case ReplyResult of %% Send failed because resource is down {nack, PostFn} -> PostFn(), @@ -476,27 +479,20 @@ do_flush( Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), case reply_caller(Id, Reply) of %% Failed; remove the request from the queue, as we cannot pop - %% from it again. But we must ensure it's in the inflight - %% table, even if it's full, so we don't lose the request. - %% And only in that case. + %% from it again, but we'll retry it using the inflight table. nack -> ok = replayq:ack(Q1, QAckRef), - %% We might get a retriable response without having added - %% the request to the inflight table (e.g.: sync request, - %% but resource health check failed prior to calling and - %% so we didn't even call it). In that case, we must then - %% add it to the inflight table. - IsRetriable = - is_recoverable_error_result(Result) orelse - is_not_connected_result(Result), - ShouldPreserveInInflight = is_not_connected_result(Result), %% we set it atomically just below; a limitation of having %% to use tuples for atomic ets updates + IsRetriable = true, WorkerMRef0 = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0), - ShouldPreserveInInflight andalso - inflight_append(InflightTID, InflightItem, Id, Index), - IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), + %% we must append again to the table to ensure that the + %% request will be retried (i.e., it might not have been + %% inserted during `call_query' if the resource was down + %% and/or if it was a sync request). + inflight_append(InflightTID, InflightItem, Id, Index), + mark_inflight_as_retriable(InflightTID, Ref), {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -513,11 +509,21 @@ do_flush( %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), + %% Async requests are acked later when the async worker + %% calls the corresponding callback function. Also, we + %% must ensure the async worker is being monitored for + %% such requests. is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - ?tp(resource_worker_flush_ack, #{batch_or_query => Request}), + ?tp( + resource_worker_flush_ack, + #{ + batch_or_query => Request, + result => Result + } + ), case queue_count(Q1) > 0 of true -> {keep_state, Data1, [{next_event, internal, flush}]}; @@ -542,27 +548,20 @@ do_flush(Data0, #{ Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), case batch_reply_caller(Id, Result, Batch) of %% Failed; remove the request from the queue, as we cannot pop - %% from it again. But we must ensure it's in the inflight - %% table, even if it's full, so we don't lose the request. - %% And only in that case. + %% from it again, but we'll retry it using the inflight table. nack -> ok = replayq:ack(Q1, QAckRef), - %% We might get a retriable response without having added - %% the request to the inflight table (e.g.: sync request, - %% but resource health check failed prior to calling and - %% so we didn't even call it). In that case, we must then - %% add it to the inflight table. - IsRetriable = - is_recoverable_error_result(Result) orelse - is_not_connected_result(Result), - ShouldPreserveInInflight = is_not_connected_result(Result), %% we set it atomically just below; a limitation of having %% to use tuples for atomic ets updates + IsRetriable = true, WorkerMRef0 = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0), - ShouldPreserveInInflight andalso - inflight_append(InflightTID, InflightItem, Id, Index), - IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), + %% we must append again to the table to ensure that the + %% request will be retried (i.e., it might not have been + %% inserted during `call_query' if the resource was down + %% and/or if it was a sync request). + inflight_append(InflightTID, InflightItem, Id, Index), + mark_inflight_as_retriable(InflightTID, Ref), {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -579,11 +578,21 @@ do_flush(Data0, #{ %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), + %% Async requests are acked later when the async worker + %% calls the corresponding callback function. Also, we + %% must ensure the async worker is being monitored for + %% such requests. is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), - ?tp(resource_worker_flush_ack, #{batch_or_query => Batch}), + ?tp( + resource_worker_flush_ack, + #{ + batch_or_query => Batch, + result => Result + } + ), CurrentCount = queue_count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> @@ -597,54 +606,79 @@ do_flush(Data0, #{ end. batch_reply_caller(Id, BatchResult, Batch) -> - {ShouldBlock, PostFns} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch), - lists:foreach(fun(F) -> F() end, PostFns), - ShouldBlock. - -batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> - lists:foldl( - fun(Reply, {_ShouldBlock, PostFns}) -> - {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply), - {ShouldBlock, [PostFn | PostFns]} - end, - {ack, []}, - %% the `Mod:on_batch_query/3` returns a single result for a batch, - %% so we need to expand - ?EXPAND(BatchResult, Batch) - ). - -reply_caller(Id, Reply) -> - {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply), + {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch), PostFn(), ShouldBlock. +batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> + {ShouldAck, PostFns} = + lists:foldl( + fun(Reply, {_ShouldAck, PostFns}) -> + {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply), + {ShouldAck, [PostFn | PostFns]} + end, + {ack, []}, + %% the `Mod:on_batch_query/3` returns a single result for a batch, + %% so we need to expand + ?EXPAND(BatchResult, Batch) + ), + PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, + {ShouldAck, PostFn}. + +reply_caller(Id, Reply) -> + {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply), + PostFn(), + ShouldAck. + +%% Should only reply to the caller when the decision is final (not +%% retriable). See comment on `handle_query_result_pure'. reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result)) -> handle_query_result_pure(Id, Result, HasBeenSent); reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result)) when is_function(ReplyFun) -> - _ = - case Result of - {async_return, _} -> no_reply_for_now; - _ -> apply(ReplyFun, Args ++ [Result]) - end, - handle_query_result_pure(Id, Result, HasBeenSent); + {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), + case {ShouldAck, Result} of + {nack, _} -> + ok; + {ack, {async_return, _}} -> + ok; + {ack, _} -> + apply(ReplyFun, Args ++ [Result]), + ok + end, + {ShouldAck, PostFn}; reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result)) -> - gen_statem:reply(From, Result), - handle_query_result_pure(Id, Result, HasBeenSent). + {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), + case {ShouldAck, Result} of + {nack, _} -> + ok; + {ack, {async_return, _}} -> + ok; + {ack, _} -> + gen_statem:reply(From, Result), + ok + end, + {ShouldAck, PostFn}. handle_query_result(Id, Result, HasBeenSent) -> {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), PostFn(), ShouldBlock. -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent) -> +%% We should always retry (nack), except when: +%% * resource is not found +%% * resource is stopped +%% * the result is a success (or at least a delayed result) +%% We also retry even sync requests. In that case, we shouldn't reply +%% the caller until one of those final results above happen. +handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{msg => resource_exception, info => Msg}), - inc_sent_failed(Id, HasBeenSent), + %% inc_sent_failed(Id, HasBeenSent), ok end, - {ack, PostFn}; + {nack, PostFn}; handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when NotWorking == not_connected; NotWorking == blocked -> @@ -666,10 +700,12 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), - emqx_resource_metrics:dropped_other_inc(Id), + %% emqx_resource_metrics:dropped_other_inc(Id), ok end, - {ack, PostFn}; + {nack, PostFn}; +%% TODO: invert this logic: we should differentiate errors that are +%% irrecoverable; all others are deemed recoverable. handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> %% the message will be queued in replayq or inflight window, %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not @@ -679,22 +715,18 @@ handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) ok end, {nack, PostFn}; -handle_query_result_pure(Id, {error, Reason}, HasBeenSent) -> +handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), ok end, - {ack, PostFn}; -handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent) -> - {nack, fun() -> ok end}; -handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent) -> + {nack, PostFn}; +handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), - inc_sent_failed(Id, HasBeenSent), ok end, - {ack, PostFn}; + {nack, PostFn}; handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) -> {ack, fun() -> ok end}; handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) -> @@ -714,18 +746,6 @@ handle_async_worker_down(Data0, Pid) -> cancel_inflight_items(Data, WorkerMRef), {keep_state, Data}. -is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when - Error =:= not_connected; Error =:= blocked --> - true; -is_not_connected_result(_) -> - false. - -is_recoverable_error_result({error, {recoverable_error, _Reason}}) -> - true; -is_recoverable_error_result(_) -> - false. - call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of @@ -735,8 +755,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> configured -> maps:get(query_mode, Data); _ -> QM0 end, - CM = maps:get(callback_mode, Data), - apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Ref, Query, ResSt, QueryOpts); + CBM = maps:get(callback_mode, Data), + CallMode = call_mode(QM, CBM), + apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> @@ -763,20 +784,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> end ). -apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> - ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}), - InflightTID = maps:get(inflight_tid, QueryOpts, undefined), - ?APPLY_RESOURCE( - call_query, - begin - IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), - Mod:on_query(Id, Request, ResSt) - end, - Request - ); +apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) -> + ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), + ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async @@ -796,23 +806,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt end, Request ); -apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), - InflightTID = maps:get(inflight_tid, QueryOpts, undefined), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ?APPLY_RESOURCE( - call_batch_query, - begin - IsRetriable = false, - WorkerMRef = undefined, - InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), - ok = inflight_append(InflightTID, InflightItem, Id, Index), - Mod:on_batch_query(Id, Requests, ResSt) - end, - Batch - ); + ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async @@ -839,27 +838,27 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn} = reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)), - %% Should always ack async inflight requests that - %% returned, otherwise the request will get retried. The - %% caller has just been notified of the failure and should - %% decide if it wants to retry or not. - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso PostFn(), case Action of nack -> + %% Keep retrying. ?tp(resource_worker_reply_after_query, #{ - action => nack, + action => Action, batch_or_query => ?QUERY(From, Request, HasBeenSent), + ref => Ref, result => Result }), + mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> ?tp(resource_worker_reply_after_query, #{ - action => ack, + action => Action, batch_or_query => ?QUERY(From, Request, HasBeenSent), + ref => Ref, result => Result }), + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso PostFn(), IsFullBefore andalso ?MODULE:flush_worker(Pid), ok end. @@ -868,24 +867,28 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - {Action, PostFns} = batch_reply_caller_defer_metrics(Id, Result, Batch), - %% Should always ack async inflight requests that - %% returned, otherwise the request will get retried. The - %% caller has just been notified of the failure and should - %% decide if it wants to retry or not. - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), + {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch), case Action of nack -> + %% Keep retrying. ?tp(resource_worker_reply_after_query, #{ - action => nack, batch_or_query => Batch, result => Result + action => nack, + batch_or_query => Batch, + ref => Ref, + result => Result }), + mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> ?tp(resource_worker_reply_after_query, #{ - action => ack, batch_or_query => Batch, result => Result + action => ack, + batch_or_query => Batch, + ref => Ref, + result => Result }), + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsAcked andalso PostFn(), IsFullBefore andalso ?MODULE:flush_worker(Pid), ok end. @@ -919,7 +922,14 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> Q1 end, emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)), - ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}), + ?tp( + resource_worker_appended_to_queue, + #{ + id => Id, + items => Queries, + queue_count => queue_count(Q2) + } + ), Q2. %%============================================================================== @@ -1110,11 +1120,6 @@ do_cancel_inflight_item(Data, Ref) -> %%============================================================================== -inc_sent_failed(Id, _HasBeenSent = true) -> - emqx_resource_metrics:retried_failed_inc(Id); -inc_sent_failed(Id, _HasBeenSent) -> - emqx_resource_metrics:failed_inc(Id). - inc_sent_success(Id, _HasBeenSent = true) -> emqx_resource_metrics:retried_success_inc(Id); inc_sent_success(Id, _HasBeenSent) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index d105b21ef..ea5ee97ca 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -48,6 +48,7 @@ fields("creation_opts") -> {health_check_interval, fun health_check_interval/1}, {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, + {request_timeout, fun request_timeout/1}, {async_inflight_window, fun async_inflight_window/1}, {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, @@ -80,6 +81,11 @@ query_mode(default) -> async; query_mode(required) -> false; query_mode(_) -> undefined. +request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); +request_timeout(desc) -> ?DESC("request_timeout"); +request_timeout(default) -> <<"15s">>; +request_timeout(_) -> undefined. + enable_batch(type) -> boolean(); enable_batch(required) -> false; enable_batch(default) -> true; diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 15d4a3b46..c2b0c5733 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -259,6 +259,9 @@ counter_loop( apply_reply(ReplyFun, ok), ?tp(connector_demo_inc_counter_async, #{n => N}), State#{counter => Num + N}; + {big_payload, _Payload, ReplyFun} when Status == blocked -> + apply_reply(ReplyFun, {error, blocked}), + State; {{FromPid, ReqRef}, {inc, N}} when Status == running -> %ct:pal("sync counter recv: ~p", [{inc, N}]), FromPid ! {ReqRef, ok}, @@ -269,6 +272,9 @@ counter_loop( {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked -> FromPid ! {ReqRef, incorrect_status}, State#{incorrect_status_count := IncorrectCount + 1}; + {{FromPid, ReqRef}, {big_payload, _Payload}} when Status == running -> + FromPid ! {ReqRef, ok}, + State; {get, ReplyFun} -> apply_reply(ReplyFun, Num), State; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index da140489e..f71dc4bb9 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -411,35 +411,18 @@ t_query_counter_async_inflight(_) -> %% send async query to make the inflight window full ?check_trace( - begin - {ok, SRef} = snabbkaffe:subscribe( - ?match_event( - #{ - ?snk_kind := resource_worker_appended_to_inflight, - is_new := true - } - ), - WindowSize, - _Timeout = 5_000 + {_, {ok, _}} = + ?wait_async_action( + inc_counter_in_parallel(WindowSize, ReqOpts), + #{?snk_kind := resource_worker_flush_but_inflight_full}, + 1_000 ), - inc_counter_in_parallel(WindowSize, ReqOpts), - {ok, _} = snabbkaffe:receive_events(SRef), - ok - end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), tap_metrics(?LINE), - - %% this will block the resource_worker as the inflight window is full now - {ok, {ok, _}} = - ?wait_async_action( - emqx_resource:query(?ID, {inc_counter, 199}), - #{?snk_kind := resource_worker_flush_but_inflight_full}, - 1_000 - ), ?assertMatch(0, ets:info(Tab0, size)), tap_metrics(?LINE), @@ -464,9 +447,9 @@ t_query_counter_async_inflight(_) -> %% all responses should be received after the resource is resumed. {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - %% +2 because the tmp_query above will be retried and succeed - %% this time, and there was the inc 199 request as well. - WindowSize + 2, + %% +1 because the tmp_query above will be retried and succeed + %% this time. + WindowSize + 1, _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -504,8 +487,12 @@ t_query_counter_async_inflight(_) -> ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), %% again, send async query to make the inflight window full ?check_trace( - ?TRACE_OPTS, - inc_counter_in_parallel(WindowSize, ReqOpts), + {_, {ok, _}} = + ?wait_async_action( + inc_counter_in_parallel(WindowSize, ReqOpts), + #{?snk_kind := resource_worker_flush_but_inflight_full}, + 1_000 + ), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) @@ -584,7 +571,7 @@ t_query_counter_async_inflight_batch(_) -> end, ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end, BatchSize = 2, - WindowSize = 3, + WindowSize = 15, {ok, _} = emqx_resource:create_local( ?ID, ?DEFAULT_RESOURCE_GROUP, @@ -606,16 +593,12 @@ t_query_counter_async_inflight_batch(_) -> %% send async query to make the inflight window full NumMsgs = BatchSize * WindowSize, ?check_trace( - begin - {ok, SRef} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := call_batch_query_async}), - WindowSize, - _Timeout = 60_000 + {_, {ok, _}} = + ?wait_async_action( + inc_counter_in_parallel(NumMsgs, ReqOpts), + #{?snk_kind := resource_worker_flush_but_inflight_full}, + 5_000 ), - inc_counter_in_parallel(NumMsgs, ReqOpts), - {ok, _} = snabbkaffe:receive_events(SRef), - ok - end, fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( @@ -674,7 +657,7 @@ t_query_counter_async_inflight_batch(_) -> %% +1 because the tmp_query above will be retried and succeed %% this time. WindowSize + 1, - _Timeout = 60_000 + 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), tap_metrics(?LINE), @@ -695,7 +678,7 @@ t_query_counter_async_inflight_batch(_) -> {ok, SRef} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), NumBatches1, - _Timeout = 60_000 + 10_000 ), inc_counter_in_parallel(NumMsgs1, ReqOpts), {ok, _} = snabbkaffe:receive_events(SRef), @@ -720,8 +703,12 @@ t_query_counter_async_inflight_batch(_) -> ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), %% again, send async query to make the inflight window full ?check_trace( - ?TRACE_OPTS, - inc_counter_in_parallel(WindowSize, ReqOpts), + {_, {ok, _}} = + ?wait_async_action( + inc_counter_in_parallel(NumMsgs, ReqOpts), + #{?snk_kind := resource_worker_flush_but_inflight_full}, + 5_000 + ), fun(Trace) -> QueryTrace = ?of_kind(call_batch_query_async, Trace), ?assertMatch( @@ -734,11 +721,11 @@ t_query_counter_async_inflight_batch(_) -> %% this will block the resource_worker ok = emqx_resource:query(?ID, {inc_counter, 1}), - Sent = NumMsgs + NumMsgs1 + WindowSize, + Sent = NumMsgs + NumMsgs1 + NumMsgs, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), WindowSize, - _Timeout = 60_000 + 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), {ok, _} = snabbkaffe:receive_events(SRef1), @@ -785,10 +772,8 @@ t_healthy_timeout(_) -> %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. #{health_check_interval => 200} ), - ?assertMatch( - ?RESOURCE_ERROR(not_connected), - emqx_resource:query(?ID, get_state) - ), + ?assertError(timeout, emqx_resource:query(?ID, get_state, #{timeout => 1_000})), + ?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)), ok = emqx_resource:remove_local(?ID). t_healthy(_) -> @@ -1131,6 +1116,7 @@ t_retry_batch(_Config) -> ok. t_delete_and_re_create_with_same_name(_Config) -> + NumBufferWorkers = 2, {ok, _} = emqx_resource:create( ?ID, ?DEFAULT_RESOURCE_GROUP, @@ -1139,7 +1125,7 @@ t_delete_and_re_create_with_same_name(_Config) -> #{ query_mode => sync, batch_size => 1, - worker_pool_size => 2, + worker_pool_size => NumBufferWorkers, queue_seg_bytes => 100, resume_interval => 1_000 } @@ -1154,19 +1140,21 @@ t_delete_and_re_create_with_same_name(_Config) -> ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), NumRequests = 10, {ok, SRef} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_appended_to_queue}), - NumRequests, + ?match_event(#{?snk_kind := resource_worker_enter_blocked}), + NumBufferWorkers, _Timeout = 5_000 ), %% ensure replayq offloads to disk Payload = binary:copy(<<"a">>, 119), lists:foreach( fun(N) -> - {error, _} = - emqx_resource:query( - ?ID, - {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>} - ) + spawn_link(fun() -> + {error, _} = + emqx_resource:query( + ?ID, + {big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>} + ) + end) end, lists:seq(1, NumRequests) ), @@ -1177,10 +1165,11 @@ t_delete_and_re_create_with_same_name(_Config) -> tap_metrics(?LINE), Queuing1 = emqx_resource_metrics:queuing_get(?ID), Inflight1 = emqx_resource_metrics:inflight_get(?ID), - ?assertEqual(NumRequests - 1, Queuing1), - ?assertEqual(1, Inflight1), + ?assert(Queuing1 > 0), + ?assertEqual(2, Inflight1), %% now, we delete the resource + process_flag(trap_exit, true), ok = emqx_resource:remove_local(?ID), ?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)), @@ -1275,9 +1264,13 @@ t_retry_sync_inflight(_Config) -> %% now really make the resource go into `blocked' state. %% this results in a retriable error when sync. ok = emqx_resource:simple_sync_query(?ID, block), - {{error, {recoverable_error, incorrect_status}}, {ok, _}} = + TestPid = self(), + {_, {ok, _}} = ?wait_async_action( - emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + spawn_link(fun() -> + Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + TestPid ! {res, Res} + end), #{?snk_kind := resource_worker_retry_inflight_failed}, ResumeInterval * 2 ), @@ -1287,9 +1280,15 @@ t_retry_sync_inflight(_Config) -> #{?snk_kind := resource_worker_retry_inflight_succeeded}, ResumeInterval * 3 ), + receive + {res, Res} -> + ?assertEqual(ok, Res) + after 5_000 -> + ct:fail("no response") + end, ok end, - [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1] + [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1] ), ok. @@ -1312,12 +1311,17 @@ t_retry_sync_inflight_batch(_Config) -> QueryOpts = #{}, ?check_trace( begin - %% now really make the resource go into `blocked' state. - %% this results in a retriable error when sync. + %% make the resource go into `blocked' state. this + %% results in a retriable error when sync. ok = emqx_resource:simple_sync_query(?ID, block), - {{error, {recoverable_error, incorrect_status}}, {ok, _}} = + process_flag(trap_exit, true), + TestPid = self(), + {_, {ok, _}} = ?wait_async_action( - emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + spawn_link(fun() -> + Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + TestPid ! {res, Res} + end), #{?snk_kind := resource_worker_retry_inflight_failed}, ResumeInterval * 2 ), @@ -1327,13 +1331,19 @@ t_retry_sync_inflight_batch(_Config) -> #{?snk_kind := resource_worker_retry_inflight_succeeded}, ResumeInterval * 3 ), + receive + {res, Res} -> + ?assertEqual(ok, Res) + after 5_000 -> + ct:fail("no response") + end, ok end, - [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1] + [fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1] ), ok. -t_dont_retry_async_inflight(_Config) -> +t_retry_async_inflight(_Config) -> ResumeInterval = 1_000, emqx_connector_demo:set_callback_mode(async_if_possible), {ok, _} = emqx_resource:create( @@ -1351,33 +1361,31 @@ t_dont_retry_async_inflight(_Config) -> QueryOpts = #{}, ?check_trace( begin - %% block, - {ok, {ok, _}} = - ?wait_async_action( - emqx_resource:query(?ID, block_now), - #{?snk_kind := resource_worker_enter_blocked}, - ResumeInterval * 2 - ), + %% block + ok = emqx_resource:simple_sync_query(?ID, block), - %% then send an async request; that shouldn't be retriable. + %% then send an async request; that should be retriable. {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), - #{?snk_kind := resource_worker_flush_ack}, + #{?snk_kind := resource_worker_retry_inflight_failed}, ResumeInterval * 2 ), - %% will re-enter running because the single request is not retriable - {ok, _} = ?block_until( - #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2 - ), + %% will reply with success after the resource is healed + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := resource_worker_enter_running}, + ResumeInterval * 2 + ), ok end, - [fun ?MODULE:assert_no_retry_inflight/1] + [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1] ), ok. -t_dont_retry_async_inflight_batch(_Config) -> +t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, emqx_connector_demo:set_callback_mode(async_if_possible), {ok, _} = emqx_resource:create( @@ -1396,29 +1404,27 @@ t_dont_retry_async_inflight_batch(_Config) -> QueryOpts = #{}, ?check_trace( begin - %% block, - {ok, {ok, _}} = - ?wait_async_action( - emqx_resource:query(?ID, block_now), - #{?snk_kind := resource_worker_enter_blocked}, - ResumeInterval * 2 - ), + %% block + ok = emqx_resource:simple_sync_query(?ID, block), - %% then send an async request; that shouldn't be retriable. + %% then send an async request; that should be retriable. {ok, {ok, _}} = ?wait_async_action( emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), - #{?snk_kind := resource_worker_flush_ack}, + #{?snk_kind := resource_worker_retry_inflight_failed}, ResumeInterval * 2 ), - %% will re-enter running because the single request is not retriable - {ok, _} = ?block_until( - #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2 - ), + %% will reply with success after the resource is healed + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := resource_worker_enter_running}, + ResumeInterval * 2 + ), ok end, - [fun ?MODULE:assert_no_retry_inflight/1] + [fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1] ), ok. @@ -1529,7 +1535,8 @@ inc_counter_in_parallel(N, Opts0) -> ct:fail({wait_for_query_timeout, Pid}) end || Pid <- Pids - ]. + ], + ok. inc_counter_in_parallel_increasing(N, StartN, Opts0) -> Parent = self(), @@ -1566,12 +1573,8 @@ tap_metrics(Line) -> ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), #{counters => C, gauges => G}. -assert_no_retry_inflight(Trace) -> - ?assertEqual([], ?of_kind(resource_worker_retry_inflight_failed, Trace)), - ?assertEqual([], ?of_kind(resource_worker_retry_inflight_succeeded, Trace)), - ok. - -assert_retry_fail_then_succeed_inflight(Trace) -> +assert_sync_retry_fail_then_succeed_inflight(Trace) -> + ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( #{?snk_kind := resource_worker_flush_nack, ref := _Ref}, @@ -1589,3 +1592,23 @@ assert_retry_fail_then_succeed_inflight(Trace) -> ) ), ok. + +assert_async_retry_fail_then_succeed_inflight(Trace) -> + ct:pal(" ~p", [Trace]), + ?assert( + ?strict_causality( + #{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref}, + #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + Trace + ) + ), + %% not strict causality because it might retry more than once + %% before restoring the resource health. + ?assert( + ?causality( + #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, + Trace + ) + ), + ok. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 4c20705b6..22a5dc859 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -108,6 +108,7 @@ end_per_group(_Group, _Config) -> init_per_testcase(TestCase, Config0) when TestCase =:= t_publish_success_batch -> + ct:timetrap({seconds, 30}), case ?config(batch_size, Config0) of 1 -> [{skip_due_to_no_batching, true}]; @@ -120,6 +121,7 @@ init_per_testcase(TestCase, Config0) when [{telemetry_table, Tid} | Config] end; init_per_testcase(TestCase, Config0) -> + ct:timetrap({seconds, 30}), {ok, _} = start_echo_http_server(), delete_all_bridges(), Tid = install_telemetry_handler(TestCase), @@ -283,6 +285,7 @@ gcp_pubsub_config(Config) -> " pool_size = 1\n" " pipelining = ~b\n" " resource_opts = {\n" + " request_timeout = 500ms\n" " worker_pool_size = 1\n" " query_mode = ~s\n" " batch_size = ~b\n" @@ -1266,7 +1269,6 @@ t_failure_no_body(Config) -> t_unrecoverable_error(Config) -> ResourceId = ?config(resource_id, Config), - TelemetryTable = ?config(telemetry_table, Config), QueryMode = ?config(query_mode, Config), TestPid = self(), FailureNoBodyHandler = @@ -1328,26 +1330,14 @@ t_unrecoverable_error(Config) -> ok end ), - wait_telemetry_event(TelemetryTable, failed, ResourceId), - ExpectedInflightEvents = - case QueryMode of - sync -> 1; - async -> 3 - end, - wait_telemetry_event( - TelemetryTable, - inflight, - ResourceId, - #{n_events => ExpectedInflightEvents, timeout => 5_000} - ), - %% even waiting, hard to avoid flakiness... simpler to just sleep - %% a bit until stabilization. - ct:sleep(200), + + wait_until_gauge_is(queuing, 0, _Timeout = 400), + wait_until_gauge_is(inflight, 1, _Timeout = 400), assert_metrics( #{ dropped => 0, - failed => 1, - inflight => 0, + failed => 0, + inflight => 1, matched => 1, queuing => 0, retried => 0,