fix(buffer_worker): don't retry all kinds of inflight requests

Some requests should not be retried during the blocked state.  For
example, if some async requests are just taking some time to process,
we should avoid retrying them periodically, lest we risk overloading
the downstream even further.
This commit is contained in:
Thales Macedo Garitezi 2023-01-16 11:50:00 -03:00
parent 5425f3d88e
commit 731ac6567a
3 changed files with 399 additions and 51 deletions

View File

@ -23,6 +23,7 @@
-include("emqx_resource_utils.hrl"). -include("emqx_resource_utils.hrl").
-include("emqx_resource_errors.hrl"). -include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(gen_statem). -behaviour(gen_statem).
@ -65,6 +66,10 @@
?REPLY(FROM, REQUEST, SENT, RESULT) ?REPLY(FROM, REQUEST, SENT, RESULT)
|| ?QUERY(FROM, REQUEST, SENT) <- BATCH || ?QUERY(FROM, REQUEST, SENT) <- BATCH
]). ]).
%% Shape of a request row kept in the inflight ETS table: the request
%% reference, the query (or batch of queries), a flag telling whether the
%% request may be retried while the worker is blocked, and a worker pid.
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerPid),
{Ref, BatchOrQuery, IsRetriable, WorkerPid}
).
%% 1-based tuple position of `IsRetriable' inside ?INFLIGHT_ITEM; used with
%% ets:update_element/3 to flip the flag in place.  Must be kept in sync
%% with the tuple layout above.
-define(RETRY_IDX, 3).
-type id() :: binary(). -type id() :: binary().
-type index() :: pos_integer(). -type index() :: pos_integer().
@ -282,7 +287,7 @@ pick_cast(Id, Key, Query) ->
resume_from_blocked(Data) -> resume_from_blocked(Data) ->
#{inflight_tid := InflightTID} = Data, #{inflight_tid := InflightTID} = Data,
case inflight_get_first(InflightTID) of case inflight_get_first_retriable(InflightTID) of
empty -> empty ->
{next_state, running, Data}; {next_state, running, Data};
{Ref, FirstQuery} -> {Ref, FirstQuery} ->
@ -298,6 +303,7 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
index := Index, index := Index,
resume_interval := ResumeT resume_interval := ResumeT
} = Data0, } = Data0,
?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{}, QueryOpts = #{},
%% if we are retrying an inflight query, it has been sent %% if we are retrying an inflight query, it has been sent
HasBeenSent = true, HasBeenSent = true,
@ -306,7 +312,13 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
%% Send failed because resource is down %% Send failed because resource is down
{nack, PostFn} -> {nack, PostFn} ->
PostFn(), PostFn(),
?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}), ?tp(
resource_worker_retry_inflight_failed,
#{
ref => Ref,
query_or_batch => QueryOrBatch
}
),
{keep_state, Data0, {state_timeout, ResumeT, unblock}}; {keep_state, Data0, {state_timeout, ResumeT, unblock}};
%% Send ok or failed but the resource is working %% Send ok or failed but the resource is working
{ack, PostFn} -> {ack, PostFn} ->
@ -318,7 +330,13 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
%% requests (repeated and original) have the same `Ref', %% requests (repeated and original) have the same `Ref',
%% we bump the counter when removing it from the table. %% we bump the counter when removing it from the table.
IsAcked andalso PostFn(), IsAcked andalso PostFn(),
?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}), ?tp(
resource_worker_retry_inflight_succeeded,
#{
ref => Ref,
query_or_batch => QueryOrBatch
}
),
resume_from_blocked(Data0) resume_from_blocked(Data0)
end. end.
@ -431,17 +449,37 @@ do_flush(
%% And only in that case. %% And only in that case.
nack -> nack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
ShouldPreserveInInflight = %% We might get a retriable response without having added
is_inflight_full_result(Result) orelse %% the request to the inflight table (e.g.: sync request,
%% but resource health check failed prior to calling and
%% so we didn't even call it). In that case, we must then
%% add it to the inflight table.
IsRetriable =
is_recoverable_error_result(Result) orelse
is_not_connected_result(Result), is_not_connected_result(Result),
ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Request, Id, Index), ShouldPreserveInInflight = is_not_connected_result(Result),
WorkerPid = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerPid),
ShouldPreserveInInflight andalso
inflight_append(InflightTID, InflightItem, Id, Index),
IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(
resource_worker_flush_nack,
#{
ref => Ref,
is_retriable => IsRetriable,
batch_or_query => Request,
result => Result
}
),
{next_state, blocked, Data0}; {next_state, blocked, Data0};
%% Success; just ack. %% Success; just ack.
ack -> ack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(resource_worker_flush_ack, #{batch_or_query => Request}),
case queue_count(Q1) > 0 of case queue_count(Q1) > 0 of
true -> true ->
{keep_state, Data0, [{next_event, internal, flush}]}; {keep_state, Data0, [{next_event, internal, flush}]};
@ -471,17 +509,37 @@ do_flush(Data0, #{
%% And only in that case. %% And only in that case.
nack -> nack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
ShouldPreserveInInflight = %% We might get a retriable response without having added
is_inflight_full_result(Result) orelse %% the request to the inflight table (e.g.: sync request,
%% but resource health check failed prior to calling and
%% so we didn't even call it). In that case, we must then
%% add it to the inflight table.
IsRetriable =
is_recoverable_error_result(Result) orelse
is_not_connected_result(Result), is_not_connected_result(Result),
ShouldPreserveInInflight andalso inflight_append(InflightTID, Ref, Batch, Id, Index), ShouldPreserveInInflight = is_not_connected_result(Result),
WorkerPid = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
ShouldPreserveInInflight andalso
inflight_append(InflightTID, InflightItem, Id, Index),
IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(
resource_worker_flush_nack,
#{
ref => Ref,
is_retriable => IsRetriable,
batch_or_query => Batch,
result => Result
}
),
{next_state, blocked, Data0}; {next_state, blocked, Data0};
%% Success; just ack. %% Success; just ack.
ack -> ack ->
ok = replayq:ack(Q1, QAckRef), ok = replayq:ack(Q1, QAckRef),
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(resource_worker_flush_ack, #{batch_or_query => Batch}),
CurrentCount = queue_count(Q1), CurrentCount = queue_count(Q1),
case {CurrentCount > 0, CurrentCount >= BatchSize} of case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} -> {false, _} ->
@ -603,11 +661,6 @@ handle_query_result_pure(Id, Result, HasBeenSent) ->
end, end,
{ack, PostFn}. {ack, PostFn}.
is_inflight_full_result({async_return, inflight_full}) ->
true;
is_inflight_full_result(_) ->
false.
is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when
Error =:= not_connected; Error =:= blocked Error =:= not_connected; Error =:= blocked
-> ->
@ -615,6 +668,11 @@ is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when
is_not_connected_result(_) -> is_not_connected_result(_) ->
false. false.
%% Tells whether a query result is an error explicitly tagged as
%% recoverable, i.e. has the shape `{error, {recoverable_error, Reason}}'.
%% Any other term yields `false'.
is_recoverable_error_result(Result) ->
    case Result of
        {error, {recoverable_error, _}} -> true;
        _NotRecoverable -> false
    end.
call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query}), ?tp(call_query_enter, #{id => Id, query => Query}),
case emqx_resource_manager:ets_lookup(Id) of case emqx_resource_manager:ets_lookup(Id) of
@ -653,7 +711,7 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
). ).
apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined), InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
?APPLY_RESOURCE( ?APPLY_RESOURCE(
@ -664,13 +722,18 @@ apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt,
%% when resuming. %% when resuming.
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ok = inflight_append(InflightTID, Ref, Query, Id, Index), IsRetriable = false,
WorkerPid = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_query(Id, Request, ResSt) Mod:on_query(Id, Request, ResSt)
end, end,
Request Request
); );
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), ?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined), InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
?APPLY_RESOURCE( ?APPLY_RESOURCE(
@ -681,14 +744,19 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
false -> false ->
ReplyFun = fun ?MODULE:reply_after_query/7, ReplyFun = fun ?MODULE:reply_after_query/7,
Args = [self(), Id, Index, InflightTID, Ref, Query], Args = [self(), Id, Index, InflightTID, Ref, Query],
ok = inflight_append(InflightTID, Ref, Query, Id, Index), IsRetriable = false,
WorkerPid = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
{async_return, Result} {async_return, Result}
end, end,
Request Request
); );
apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined), InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch], Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
@ -700,13 +768,18 @@ apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt,
%% when resuming. %% when resuming.
{async_return, inflight_full}; {async_return, inflight_full};
false -> false ->
ok = inflight_append(InflightTID, Ref, Batch, Id, Index), IsRetriable = false,
WorkerPid = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_batch_query(Id, Requests, ResSt) Mod:on_batch_query(Id, Requests, ResSt)
end, end,
Batch Batch
); );
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), ?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
}),
InflightTID = maps:get(inflight_name, QueryOpts, undefined), InflightTID = maps:get(inflight_name, QueryOpts, undefined),
PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true), PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
?APPLY_RESOURCE( ?APPLY_RESOURCE(
@ -718,7 +791,10 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
ReplyFun = fun ?MODULE:batch_reply_after_query/7, ReplyFun = fun ?MODULE:batch_reply_after_query/7,
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch], Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
ok = inflight_append(InflightTID, Ref, Batch, Id, Index), IsRetriable = false,
WorkerPid = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
{async_return, Result} {async_return, Result}
end, end,
@ -738,8 +814,18 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
IsAcked andalso PostFn(), IsAcked andalso PostFn(),
case Action of case Action of
nack -> nack ->
?tp(resource_worker_reply_after_query, #{
action => nack,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
result => Result
}),
?MODULE:block(Pid); ?MODULE:block(Pid);
ack -> ack ->
?tp(resource_worker_reply_after_query, #{
action => ack,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
result => Result
}),
ok ok
end. end.
@ -756,8 +842,14 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns), IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
case Action of case Action of
nack -> nack ->
?tp(resource_worker_reply_after_query, #{
action => nack, batch_or_query => Batch, result => Result
}),
?MODULE:block(Pid); ?MODULE:block(Pid);
ack -> ack ->
?tp(resource_worker_reply_after_query, #{
action => ack, batch_or_query => Batch, result => Result
}),
ok ok
end. end.
@ -802,24 +894,28 @@ inflight_new(InfltWinSZ, Id, Index) ->
emqx_resource_worker_inflight_tab, emqx_resource_worker_inflight_tab,
[ordered_set, public, {write_concurrency, true}] [ordered_set, public, {write_concurrency, true}]
), ),
inflight_append(TableId, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index), inflight_append(TableId, {?MAX_SIZE_REF, {max_size, InfltWinSZ}}, Id, Index),
%% we use this counter because we might deal with batches as %% we use this counter because we might deal with batches as
%% elements. %% elements.
inflight_append(TableId, ?SIZE_REF, 0, Id, Index), inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
TableId. TableId.
inflight_get_first(InflightTID) -> -spec inflight_get_first_retriable(ets:tid()) ->
case ets:next(InflightTID, ?MAX_SIZE_REF) of empty | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}.
inflight_get_first_retriable(InflightTID) ->
MatchSpec =
ets:fun2ms(
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerPid)) when
IsRetriable =:= true
->
{Ref, BatchOrQuery}
end
),
case ets:select(InflightTID, MatchSpec, _Limit = 1) of
'$end_of_table' -> '$end_of_table' ->
empty; empty;
Ref -> {[{Ref, BatchOrQuery}], _Continuation} ->
case ets:lookup(InflightTID, Ref) of {Ref, BatchOrQuery}
[Object] ->
Object;
[] ->
%% it might have been dropped
inflight_get_first(InflightTID)
end
end. end.
is_inflight_full(undefined) -> is_inflight_full(undefined) ->
@ -844,37 +940,60 @@ inflight_num_msgs(InflightTID) ->
[{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF), [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
Size. Size.
inflight_append(undefined, _Ref, _Query, _Id, _Index) -> inflight_append(undefined, _InflightItem, _Id, _Index) ->
ok; ok;
inflight_append(InflightTID, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> inflight_append(
InflightTID,
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerPid),
Id,
Index
) ->
Batch = mark_as_sent(Batch0), Batch = mark_as_sent(Batch0),
IsNew = ets:insert_new(InflightTID, {Ref, Batch}), InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerPid),
IsNew = ets:insert_new(InflightTID, InflightItem),
BatchSize = length(Batch), BatchSize = length(Batch),
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
?tp(resource_worker_appended_to_inflight, #{batch => Batch, is_new => IsNew}), ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok; ok;
inflight_append(InflightTID, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> inflight_append(
InflightTID,
?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerPid),
Id,
Index
) ->
Query = mark_as_sent(Query0), Query = mark_as_sent(Query0),
IsNew = ets:insert_new(InflightTID, {Ref, Query}), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerPid),
IsNew = ets:insert_new(InflightTID, InflightItem),
IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
?tp(resource_worker_appended_to_inflight, #{query => Query, is_new => IsNew}), ?tp(resource_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}),
ok; ok;
inflight_append(InflightTID, Ref, Data, _Id, _Index) -> inflight_append(InflightTID, {Ref, Data}, _Id, _Index) ->
ets:insert(InflightTID, {Ref, Data}), ets:insert(InflightTID, {Ref, Data}),
%% this is a metadata row being inserted; therefore, we don't bump %% this is a metadata row being inserted; therefore, we don't bump
%% the inflight metric. %% the inflight metric.
ok. ok.
%% Flips the retriable flag of an inflight entry that was stored as
%% non-retriable but has since failed with an error worth retrying.
%% When there is no inflight table (`undefined') nothing is done.  The
%% result of ets:update_element/3 is deliberately discarded, so `ok' is
%% returned whether or not a row keyed by `Ref' was found.
mark_inflight_as_retriable(InflightTID, Ref) ->
    case InflightTID of
        undefined ->
            ok;
        _ ->
            _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}),
            ok
    end.
ack_inflight(undefined, _Ref, _Id, _Index) -> ack_inflight(undefined, _Ref, _Id, _Index) ->
false; false;
ack_inflight(InflightTID, Ref, Id, Index) -> ack_inflight(InflightTID, Ref, Id, Index) ->
Count = Count =
case ets:take(InflightTID, Ref) of case ets:take(InflightTID, Ref) of
[{Ref, ?QUERY(_, _, _)}] -> 1; [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerPid)] ->
[{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); 1;
_ -> 0 [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerPid)] ->
length(Batch);
_ ->
0
end, end,
IsAcked = Count > 0, IsAcked = Count > 0,
IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),

View File

@ -85,6 +85,9 @@ on_query(_InstId, get_state_failed, State) ->
on_query(_InstId, block, #{pid := Pid}) -> on_query(_InstId, block, #{pid := Pid}) ->
Pid ! block, Pid ! block,
ok; ok;
on_query(_InstId, block_now, #{pid := Pid}) ->
Pid ! block,
{error, {resource_error, #{reason => blocked, msg => blocked}}};
on_query(_InstId, resume, #{pid := Pid}) -> on_query(_InstId, resume, #{pid := Pid}) ->
Pid ! resume, Pid ! resume,
ok; ok;
@ -138,7 +141,13 @@ on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) ->
ok; ok;
on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) -> on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) ->
Pid ! {get, ReplyFun}, Pid ! {get, ReplyFun},
ok. ok;
on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) ->
Pid ! {block_now, ReplyFun},
{ok, Pid};
on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) ->
Pid ! {big_payload, Payload, ReplyFun},
{ok, Pid}.
on_batch_query(InstId, BatchReq, State) -> on_batch_query(InstId, BatchReq, State) ->
%% Requests can be either 'get_counter' or 'inc_counter', but %% Requests can be either 'get_counter' or 'inc_counter', but
@ -147,17 +156,22 @@ on_batch_query(InstId, BatchReq, State) ->
{inc_counter, _} -> {inc_counter, _} ->
batch_inc_counter(sync, InstId, BatchReq, State); batch_inc_counter(sync, InstId, BatchReq, State);
get_counter -> get_counter ->
batch_get_counter(sync, InstId, State) batch_get_counter(sync, InstId, State);
{big_payload, _Payload} ->
batch_big_payload(sync, InstId, BatchReq, State)
end. end.
on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) ->
%% Requests can be either 'get_counter' or 'inc_counter', but %% Requests can be of multiple types, but cannot be mixed.
%% cannot be mixed.
case hd(BatchReq) of case hd(BatchReq) of
{inc_counter, _} -> {inc_counter, _} ->
batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State); batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State);
get_counter -> get_counter ->
batch_get_counter({async, ReplyFunAndArgs}, InstId, State) batch_get_counter({async, ReplyFunAndArgs}, InstId, State);
block_now ->
on_query_async(InstId, block_now, ReplyFunAndArgs, State);
{big_payload, _Payload} ->
batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State)
end. end.
batch_inc_counter(CallMode, InstId, BatchReq, State) -> batch_inc_counter(CallMode, InstId, BatchReq, State) ->
@ -184,6 +198,19 @@ batch_get_counter(sync, InstId, State) ->
batch_get_counter({async, ReplyFunAndArgs}, InstId, State) -> batch_get_counter({async, ReplyFunAndArgs}, InstId, State) ->
on_query_async(InstId, get_counter, ReplyFunAndArgs, State). on_query_async(InstId, get_counter, ReplyFunAndArgs, State).
%% Handles a batch made exclusively of `{big_payload, _}' requests.
%%
%% sync:  every request is passed to on_query/3; the result of the first
%%        request is returned once the whole batch has been processed.
%% async: every request is passed to on_query_async/4 with the same
%%        `ReplyFunAndArgs'; `{ok, Pid}' is returned, where `Pid' comes
%%        from the `pid' key of the resource state.
%%
%% A request of any other shape makes the per-request fun fail with
%% `function_clause'.
batch_big_payload(sync, InstId, Batch, State) ->
    QueryOne = fun({big_payload, _} = Req) -> on_query(InstId, Req, State) end,
    [FirstResult | _] = lists:map(QueryOne, Batch),
    FirstResult;
batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, #{pid := Pid} = State) ->
    QueryOneAsync =
        fun({big_payload, _} = Req) ->
            on_query_async(InstId, Req, ReplyFunAndArgs, State)
        end,
    lists:foreach(QueryOneAsync, Batch),
    {ok, Pid}.
on_get_status(_InstId, #{health_check_error := true}) -> on_get_status(_InstId, #{health_check_error := true}) ->
disconnected; disconnected;
on_get_status(_InstId, #{pid := Pid}) -> on_get_status(_InstId, #{pid := Pid}) ->
@ -199,7 +226,11 @@ spawn_counter_process(Name, Register) ->
Pid. Pid.
counter_loop() -> counter_loop() ->
counter_loop(#{counter => 0, status => running, incorrect_status_count => 0}). counter_loop(#{
counter => 0,
status => running,
incorrect_status_count => 0
}).
counter_loop( counter_loop(
#{ #{
@ -213,6 +244,12 @@ counter_loop(
block -> block ->
ct:pal("counter recv: ~p", [block]), ct:pal("counter recv: ~p", [block]),
State#{status => blocked}; State#{status => blocked};
{block_now, ReplyFun} ->
ct:pal("counter recv: ~p", [block_now]),
apply_reply(
ReplyFun, {error, {resource_error, #{reason => blocked, msg => blocked}}}
),
State#{status => blocked};
resume -> resume ->
{messages, Msgs} = erlang:process_info(self(), messages), {messages, Msgs} = erlang:process_info(self(), messages),
ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]),

View File

@ -1254,6 +1254,174 @@ t_always_overflow(_Config) ->
), ),
ok. ok.
%% Sync mode, no batching (batch_size => 1): a request issued while the
%% demo resource is blocked is expected to return
%% `{error, {recoverable_error, incorrect_status}}' and to be retried from
%% the inflight table.  The test first waits for a failed retry
%% (`resource_worker_retry_inflight_failed'), then resumes the resource and
%% waits for a successful one (`resource_worker_retry_inflight_succeeded').
%% The collected trace is finally checked by
%% assert_retry_fail_then_succeed_inflight/1.
t_retry_sync_inflight(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% now really make the resource go into `blocked' state.
%% this results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
{{error, {recoverable_error, incorrect_status}}, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
{ok, {ok, _}} =
?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
ok
end,
[fun ?MODULE:assert_retry_fail_then_succeed_inflight/1]
),
ok.
%% Sync mode with batching enabled (batch_size => 2, batch_time => 200):
%% a request issued while the demo resource is blocked is expected to
%% return `{error, {recoverable_error, incorrect_status}}' and to be
%% retried from the inflight table.  The test waits for a failed retry,
%% resumes the resource, waits for a successful retry, and then checks the
%% trace with assert_retry_fail_then_succeed_inflight/1.
t_retry_sync_inflight_batch(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 200,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% now really make the resource go into `blocked' state.
%% this results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
{{error, {recoverable_error, incorrect_status}}, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
{ok, {ok, _}} =
?wait_async_action(
ok = emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
ok
end,
[fun ?MODULE:assert_retry_fail_then_succeed_inflight/1]
),
ok.
%% Async mode, no batching (batch_size => 1): after the worker has entered
%% the blocked state via a `block_now' request, a further async request
%% must not be retried from the inflight table.  The test waits for the
%% worker to enter `blocked', sends a `big_payload' request and waits for
%% `resource_worker_flush_ack', then waits for the worker to re-enter
%% `running'.  assert_no_retry_inflight/1 verifies that the trace contains
%% no retry events at all.
t_dont_retry_async_inflight(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% block,
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, block_now),
#{?snk_kind := resource_worker_enter_blocked},
ResumeInterval * 2
),
%% then send an async request; that shouldn't be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := resource_worker_flush_ack},
ResumeInterval * 2
),
%% will re-enter running because the single request is not retriable
{ok, _} = ?block_until(
#{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2
),
ok
end,
[fun ?MODULE:assert_no_retry_inflight/1]
),
ok.
%% Async mode with batching enabled (batch_size => 2, batch_time => 200):
%% after the worker has entered the blocked state via a `block_now'
%% request, a further async request must not be retried from the inflight
%% table.  The test waits for the worker to enter `blocked', sends a
%% `big_payload' request and waits for `resource_worker_flush_ack', then
%% waits for the worker to re-enter `running'.  assert_no_retry_inflight/1
%% verifies that the trace contains no retry events at all.
t_dont_retry_async_inflight_batch(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 200,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
QueryOpts = #{},
?check_trace(
begin
%% block,
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, block_now),
#{?snk_kind := resource_worker_enter_blocked},
ResumeInterval * 2
),
%% then send an async request; that shouldn't be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := resource_worker_flush_ack},
ResumeInterval * 2
),
%% will re-enter running because the single request is not retriable
{ok, _} = ?block_until(
#{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2
),
ok
end,
[fun ?MODULE:assert_no_retry_inflight/1]
),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -1317,3 +1485,27 @@ tap_metrics(Line) ->
{ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID), {ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID),
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
#{counters => C, gauges => G}. #{counters => C, gauges => G}.
%% Trace check: passes only when the trace contains neither
%% `resource_worker_retry_inflight_failed' nor
%% `resource_worker_retry_inflight_succeeded' events, i.e. no inflight
%% retry was attempted.  Returns `ok'.
assert_no_retry_inflight(Trace) ->
?assertEqual([], ?of_kind(resource_worker_retry_inflight_failed, Trace)),
?assertEqual([], ?of_kind(resource_worker_retry_inflight_succeeded, Trace)),
ok.
%% Trace check for the "retry fails, then succeeds" scenario.  Using
%% snabbkaffe's causality macros, it asserts that
%% `resource_worker_flush_nack' events are paired with
%% `resource_worker_retry_inflight_failed' events carrying the same `ref',
%% and that `resource_worker_retry_inflight_failed' events are followed by
%% `resource_worker_retry_inflight_succeeded' events with the same `ref'.
%% NOTE(review): the exact pairing guarantees are those of
%% ?strict_causality / ?causality -- confirm against the snabbkaffe docs.
%% Returns `ok'.
assert_retry_fail_then_succeed_inflight(Trace) ->
?assert(
?strict_causality(
#{?snk_kind := resource_worker_flush_nack, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
Trace
)
),
%% not strict causality because it might retry more than once
%% before restoring the resource health.
?assert(
?causality(
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
Trace
)
),
ok.