From 731ac6567a7d7be8f714b9a3fcb23bc8fc3b2b0b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 16 Jan 2023 11:50:00 -0300 Subject: [PATCH] fix(buffer_worker): don't retry all kinds of inflight requests Some requests should not be retried during the blocked state. For example, if some async requests are just taking some time to process, we should avoid retrying them periodically, lest risk overloading the downstream further. --- .../src/emqx_resource_worker.erl | 209 ++++++++++++++---- .../test/emqx_connector_demo.erl | 49 +++- .../test/emqx_resource_SUITE.erl | 192 ++++++++++++++++ 3 files changed, 399 insertions(+), 51 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index d681c82c3..0cdd02f35 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -23,6 +23,7 @@ -include("emqx_resource_utils.hrl"). -include("emqx_resource_errors.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(gen_statem). @@ -65,6 +66,10 @@ ?REPLY(FROM, REQUEST, SENT, RESULT) || ?QUERY(FROM, REQUEST, SENT) <- BATCH ]). +-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerPid), + {Ref, BatchOrQuery, IsRetriable, WorkerPid} +). +-define(RETRY_IDX, 3). -type id() :: binary(). -type index() :: pos_integer(). @@ -282,7 +287,7 @@ pick_cast(Id, Key, Query) -> resume_from_blocked(Data) -> #{inflight_tid := InflightTID} = Data, - case inflight_get_first(InflightTID) of + case inflight_get_first_retriable(InflightTID) of empty -> {next_state, running, Data}; {Ref, FirstQuery} -> @@ -298,6 +303,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> index := Index, resume_interval := ResumeT } = Data0, + ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), QueryOpts = #{}, %% if we are retrying an inflight query, it has been sent HasBeenSent = true, @@ -306,7 +312,13 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% Send failed because resource is down {nack, PostFn} -> PostFn(), - ?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}), + ?tp( + resource_worker_retry_inflight_failed, + #{ + ref => Ref, + query_or_batch => QueryOrBatch + } + ), {keep_state, Data0, {state_timeout, ResumeT, unblock}}; %% Send ok or failed but the resource is working {ack, PostFn} -> @@ -318,7 +330,13 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> %% requests (repeated and original) have the safe `Ref', %% we bump the counter when removing it from the table. IsAcked andalso PostFn(), - ?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}), + ?tp( + resource_worker_retry_inflight_succeeded, + #{ + ref => Ref, + query_or_batch => QueryOrBatch + } + ), resume_from_blocked(Data0) end. @@ -431,17 +449,37 @@ do_flush( %% And only in that case. nack -> ok = replayq:ack(Q1, QAckRef), - ShouldPreserveInInflight = - is_inflight_full_result(Result) orelse + %% We might get a retriable response without having added + %% the request to the inflight table (e.g.: sync request, + %% but resource health check failed prior to calling and + %% so we didn't even call it). In that case, we must then + %% add it to the inflight table. + IsRetriable = + is_recoverable_error_result(Result) orelse is_not_connected_result(Result), - ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Request, Id, Index), + ShouldPreserveInInflight = is_not_connected_result(Result), + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerPid), + ShouldPreserveInInflight andalso + inflight_append(InflightTID, InflightItem, Id, Index), + IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp( + resource_worker_flush_nack, + #{ + ref => Ref, + is_retriable => IsRetriable, + batch_or_query => Request, + result => Result + } + ), {next_state, blocked, Data0}; %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp(resource_worker_flush_ack, #{batch_or_query => Request}), case queue_count(Q1) > 0 of true -> {keep_state, Data0, [{next_event, internal, flush}]}; @@ -471,17 +509,37 @@ do_flush(Data0, #{ %% And only in that case. nack -> ok = replayq:ack(Q1, QAckRef), - ShouldPreserveInInflight = - is_inflight_full_result(Result) orelse + %% We might get a retriable response without having added + %% the request to the inflight table (e.g.: sync request, + %% but resource health check failed prior to calling and + %% so we didn't even call it). In that case, we must then + %% add it to the inflight table. + IsRetriable = + is_recoverable_error_result(Result) orelse is_not_connected_result(Result), - ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Batch, Id, Index), + ShouldPreserveInInflight = is_not_connected_result(Result), + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + ShouldPreserveInInflight andalso + inflight_append(InflightTID, InflightItem, Id, Index), + IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp( + resource_worker_flush_nack, + #{ + ref => Ref, + is_retriable => IsRetriable, + batch_or_query => Batch, + result => Result + } + ), {next_state, blocked, Data0}; %% Success; just ack. ack -> ok = replayq:ack(Q1, QAckRef), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + ?tp(resource_worker_flush_ack, #{batch_or_query => Batch}), CurrentCount = queue_count(Q1), case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> @@ -603,11 +661,6 @@ handle_query_result_pure(Id, Result, HasBeenSent) -> end, {ack, PostFn}. -is_inflight_full_result({async_return, inflight_full}) -> - true; -is_inflight_full_result(_) -> - false. - is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when Error =:= not_connected; Error =:= blocked -> @@ -615,6 +668,11 @@ is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when is_not_connected_result(_) -> false. +is_recoverable_error_result({error, {recoverable_error, _Reason}}) -> + true; +is_recoverable_error_result(_) -> + false. + call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of @@ -653,7 +711,7 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ). apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> - ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), + ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( @@ -664,13 +722,18 @@ apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, %% when resuming. {async_return, inflight_full}; false -> - ok = inflight_append(InflightTID, Ref, Query, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Mod:on_query(Id, Request, ResSt) end, Request ); apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> - ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), + ?tp(call_query_async, #{ + id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async + }), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( @@ -681,14 +744,19 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt false -> ReplyFun = fun ?MODULE:reply_after_query/7, Args = [self(), Id, Index, InflightTID, Ref, Query], - ok = inflight_append(InflightTID, Ref, Query, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), {async_return, Result} end, Request ); apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> - ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), + ?tp(call_batch_query, #{ + id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync + }), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), Requests = [Request || ?QUERY(_From, Request, _) <- Batch], @@ -700,13 +768,18 @@ apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, %% when resuming. {async_return, inflight_full}; false -> - ok = inflight_append(InflightTID, Ref, Batch, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Mod:on_batch_query(Id, Requests, ResSt) end, Batch ); apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> - ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), + ?tp(call_batch_query_async, #{ + id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async + }), InflightTID = maps:get(inflight_name, QueryOpts, undefined), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), ?APPLY_RESOURCE( @@ -718,7 +791,10 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt ReplyFun = fun ?MODULE:batch_reply_after_query/7, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ok = inflight_append(InflightTID, Ref, Batch, Id, Index), + IsRetriable = false, + WorkerPid = undefined, + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + ok = inflight_append(InflightTID, InflightItem, Id, Index), Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), {async_return, Result} end, @@ -738,8 +814,18 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee IsAcked andalso PostFn(), case Action of nack -> + ?tp(resource_worker_reply_after_query, #{ + action => nack, + batch_or_query => ?QUERY(From, Request, HasBeenSent), + result => Result + }), ?MODULE:block(Pid); ack -> + ?tp(resource_worker_reply_after_query, #{ + action => ack, + batch_or_query => ?QUERY(From, Request, HasBeenSent), + result => Result + }), ok end. @@ -756,8 +842,14 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), case Action of nack -> + ?tp(resource_worker_reply_after_query, #{ + action => nack, batch_or_query => Batch, result => Result + }), ?MODULE:block(Pid); ack -> + ?tp(resource_worker_reply_after_query, #{ + action => ack, batch_or_query => Batch, result => Result + }), ok end. @@ -802,24 +894,28 @@ inflight_new(InfltWinSZ, Id, Index) -> emqx_resource_worker_inflight_tab, [ordered_set, public, {write_concurrency, true}] ), - inflight_append(TableId, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index), + inflight_append(TableId, {?MAX_SIZE_REF, {max_size, InfltWinSZ}}, Id, Index), %% we use this counter because we might deal with batches as %% elements. - inflight_append(TableId, ?SIZE_REF, 0, Id, Index), + inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), TableId. -inflight_get_first(InflightTID) -> - case ets:next(InflightTID, ?MAX_SIZE_REF) of +-spec inflight_get_first_retriable(ets:tid()) -> + empty | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}. +inflight_get_first_retriable(InflightTID) -> + MatchSpec = + ets:fun2ms( + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerPid)) when + IsRetriable =:= true + -> + {Ref, BatchOrQuery} + end + ), + case ets:select(InflightTID, MatchSpec, _Limit = 1) of '$end_of_table' -> empty; - Ref -> - case ets:lookup(InflightTID, Ref) of - [Object] -> - Object; - [] -> - %% it might have been dropped - inflight_get_first(InflightTID) - end + {[{Ref, BatchOrQuery}], _Continuation} -> + {Ref, BatchOrQuery} end. is_inflight_full(undefined) -> @@ -844,37 +940,60 @@ inflight_num_msgs(InflightTID) -> [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF), Size. -inflight_append(undefined, _Ref, _Query, _Id, _Index) -> +inflight_append(undefined, _InflightItem, _Id, _Index) -> ok; -inflight_append(InflightTID, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> +inflight_append( + InflightTID, + ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerPid), + Id, + Index +) -> Batch = mark_as_sent(Batch0), - IsNew = ets:insert_new(InflightTID, {Ref, Batch}), + InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid), + IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{batch => Batch, is_new => IsNew}), + ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; -inflight_append(InflightTID, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> +inflight_append( + InflightTID, + ?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerPid), + Id, + Index +) -> Query = mark_as_sent(Query0), - IsNew = ets:insert_new(InflightTID, {Ref, Query}), + InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid), + IsNew = ets:insert_new(InflightTID, InflightItem), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - ?tp(resource_worker_appended_to_inflight, #{query => Query, is_new => IsNew}), + ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; -inflight_append(InflightTID, Ref, Data, _Id, _Index) -> +inflight_append(InflightTID, {Ref, Data}, _Id, _Index) -> ets:insert(InflightTID, {Ref, Data}), %% this is a metadata row being inserted; therefore, we don't bump %% the inflight metric. ok. +%% a request was already appended and originally not retriable, but an +%% error occurred and it is now retriable. +mark_inflight_as_retriable(undefined, _Ref) -> + ok; +mark_inflight_as_retriable(InflightTID, Ref) -> + _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}), + ok. + ack_inflight(undefined, _Ref, _Id, _Index) -> false; ack_inflight(InflightTID, Ref, Id, Index) -> Count = case ets:take(InflightTID, Ref) of - [{Ref, ?QUERY(_, _, _)}] -> 1; - [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); - _ -> 0 + [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerPid)] -> + 1; + [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerPid)] -> + length(Batch); + _ -> + 0 end, IsAcked = Count > 0, IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 6c03e93cc..bbd2ba058 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -85,6 +85,9 @@ on_query(_InstId, get_state_failed, State) -> on_query(_InstId, block, #{pid := Pid}) -> Pid ! block, ok; +on_query(_InstId, block_now, #{pid := Pid}) -> + Pid ! block, + {error, {resource_error, #{reason => blocked, msg => blocked}}}; on_query(_InstId, resume, #{pid := Pid}) -> Pid ! resume, ok; @@ -138,7 +141,13 @@ on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) -> ok; on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) -> Pid ! {get, ReplyFun}, - ok. + ok; +on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> + Pid ! {block_now, ReplyFun}, + {ok, Pid}; +on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) -> + Pid ! {big_payload, Payload, ReplyFun}, + {ok, Pid}. on_batch_query(InstId, BatchReq, State) -> %% Requests can be either 'get_counter' or 'inc_counter', but @@ -147,17 +156,22 @@ on_batch_query(InstId, BatchReq, State) -> {inc_counter, _} -> batch_inc_counter(sync, InstId, BatchReq, State); get_counter -> - batch_get_counter(sync, InstId, State) + batch_get_counter(sync, InstId, State); + {big_payload, _Payload} -> + batch_big_payload(sync, InstId, BatchReq, State) end. on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> - %% Requests can be either 'get_counter' or 'inc_counter', but - %% cannot be mixed. + %% Requests can be of multiple types, but cannot be mixed. case hd(BatchReq) of {inc_counter, _} -> batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State); get_counter -> - batch_get_counter({async, ReplyFunAndArgs}, InstId, State) + batch_get_counter({async, ReplyFunAndArgs}, InstId, State); + block_now -> + on_query_async(InstId, block_now, ReplyFunAndArgs, State); + {big_payload, _Payload} -> + batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State) end. batch_inc_counter(CallMode, InstId, BatchReq, State) -> @@ -184,6 +198,19 @@ batch_get_counter(sync, InstId, State) -> batch_get_counter({async, ReplyFunAndArgs}, InstId, State) -> on_query_async(InstId, get_counter, ReplyFunAndArgs, State). +batch_big_payload(sync, InstId, Batch, State) -> + [Res | _] = lists:map( + fun(Req = {big_payload, _}) -> on_query(InstId, Req, State) end, + Batch + ), + Res; +batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}) -> + lists:foreach( + fun(Req = {big_payload, _}) -> on_query_async(InstId, Req, ReplyFunAndArgs, State) end, + Batch + ), + {ok, Pid}. + on_get_status(_InstId, #{health_check_error := true}) -> disconnected; on_get_status(_InstId, #{pid := Pid}) -> @@ -199,7 +226,11 @@ spawn_counter_process(Name, Register) -> Pid. counter_loop() -> - counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}). + counter_loop(#{ + counter => 0, + status => running, + incorrect_status_count => 0 + }). counter_loop( #{ @@ -213,6 +244,12 @@ counter_loop( block -> ct:pal("counter recv: ~p", [block]), State#{status => blocked}; + {block_now, ReplyFun} -> + ct:pal("counter recv: ~p", [block_now]), + apply_reply( + ReplyFun, {error, {resource_error, #{reason => blocked, msg => blocked}}} + ), + State#{status => blocked}; resume -> {messages, Msgs} = erlang:process_info(self(), messages), ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index d837cbc8c..2d6856acf 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1254,6 +1254,174 @@ t_always_overflow(_Config) -> ), ok. +t_retry_sync_inflight(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 1, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% now really make the resource go into `blocked' state. + %% this results in a retriable error when sync. + ok = emqx_resource:simple_sync_query(?ID, block), + {{error, {recoverable_error, incorrect_status}}, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + #{?snk_kind := resource_worker_retry_inflight_failed}, + ResumeInterval * 2 + ), + {ok, {ok, _}} = + ?wait_async_action( + ok = emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := resource_worker_retry_inflight_succeeded}, + ResumeInterval * 3 + ), + ok + end, + [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1] + ), + ok. + +t_retry_sync_inflight_batch(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(always_sync), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => sync, + batch_size => 2, + batch_time => 200, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% now really make the resource go into `blocked' state. + %% this results in a retriable error when sync. + ok = emqx_resource:simple_sync_query(?ID, block), + {{error, {recoverable_error, incorrect_status}}, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts), + #{?snk_kind := resource_worker_retry_inflight_failed}, + ResumeInterval * 2 + ), + {ok, {ok, _}} = + ?wait_async_action( + ok = emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := resource_worker_retry_inflight_succeeded}, + ResumeInterval * 3 + ), + ok + end, + [fun ?MODULE:assert_retry_fail_then_succeed_inflight/1] + ), + ok. + +t_dont_retry_async_inflight(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 1, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% block, + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, block_now), + #{?snk_kind := resource_worker_enter_blocked}, + ResumeInterval * 2 + ), + + %% then send an async request; that shouldn't be retriable. + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), + #{?snk_kind := resource_worker_flush_ack}, + ResumeInterval * 2 + ), + + %% will re-enter running because the single request is not retriable + {ok, _} = ?block_until( + #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2 + ), + ok + end, + [fun ?MODULE:assert_no_retry_inflight/1] + ), + ok. + +t_dont_retry_async_inflight_batch(_Config) -> + ResumeInterval = 1_000, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 2, + batch_time => 200, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + QueryOpts = #{}, + ?check_trace( + begin + %% block, + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, block_now), + #{?snk_kind := resource_worker_enter_blocked}, + ResumeInterval * 2 + ), + + %% then send an async request; that shouldn't be retriable. + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts), + #{?snk_kind := resource_worker_flush_ack}, + ResumeInterval * 2 + ), + + %% will re-enter running because the single request is not retriable + {ok, _} = ?block_until( + #{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2 + ), + ok + end, + [fun ?MODULE:assert_no_retry_inflight/1] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -1317,3 +1485,27 @@ tap_metrics(Line) -> {ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID), ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), #{counters => C, gauges => G}. + +assert_no_retry_inflight(Trace) -> + ?assertEqual([], ?of_kind(resource_worker_retry_inflight_failed, Trace)), + ?assertEqual([], ?of_kind(resource_worker_retry_inflight_succeeded, Trace)), + ok. + +assert_retry_fail_then_succeed_inflight(Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := resource_worker_flush_nack, ref := _Ref}, + #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + Trace + ) + ), + %% not strict causality because it might retry more than once + %% before restoring the resource health. + ?assert( + ?causality( + #{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref}, + #{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref}, + Trace + ) + ), + ok.