chore: don't cancel inflight items upon worker death; retry them

This commit is contained in:
Thales Macedo Garitezi 2023-01-17 18:21:11 -03:00
parent 087b667263
commit 5c2ac0ac81
2 changed files with 14 additions and 41 deletions

View File

@ -771,7 +771,7 @@ handle_async_worker_down(Data0, Pid) ->
#{async_workers := AsyncWorkers0} = Data0,
{WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),
Data = Data0#{async_workers := AsyncWorkers},
cancel_inflight_items(Data, WorkerMRef),
mark_inflight_items_as_retriable(Data, WorkerMRef),
{keep_state, Data}.
call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
@ -1118,37 +1118,19 @@ ack_inflight(InflightTID, Ref, Id, Index) ->
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
IsAcked.
cancel_inflight_items(Data, WorkerMRef) ->
mark_inflight_items_as_retriable(Data, WorkerMRef) ->
#{inflight_tid := InflightTID} = Data,
IsRetriable = true,
MatchSpec =
ets:fun2ms(
fun(?INFLIGHT_ITEM(Ref, _BatchOrQuery, _IsRetriable, WorkerMRef0)) when
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, WorkerMRef0)) when
WorkerMRef =:= WorkerMRef0
->
Ref
?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef0)
end
),
Refs = ets:select(InflightTID, MatchSpec),
lists:foreach(fun(Ref) -> do_cancel_inflight_item(Data, Ref) end, Refs).
do_cancel_inflight_item(Data, Ref) ->
#{id := Id, index := Index, inflight_tid := InflightTID} = Data,
{Count, Batch} =
case ets:take(InflightTID, Ref) of
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _) = Query, _IsRetriable, _WorkerMRef)] ->
{1, [Query]};
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, _IsRetriable, _WorkerMRef)] ->
{length(Batch0), Batch0};
_ ->
{0, []}
end,
IsAcked = Count > 0,
IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
Result = {error, interrupted},
QueryOpts = #{simple_query => false},
_ = batch_reply_caller(Id, Result, Batch, QueryOpts),
?tp(resource_worker_cancelled_inflight, #{ref => Ref}),
_NumAffected = ets:select_replace(InflightTID, MatchSpec),
?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}),
ok.
%%==============================================================================

View File

@ -1432,6 +1432,7 @@ t_retry_async_inflight_batch(_Config) ->
%% requests if they die.
t_async_pool_worker_death(_Config) ->
ResumeInterval = 1_000,
NumBufferWorkers = 2,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
@ -1441,7 +1442,7 @@ t_async_pool_worker_death(_Config) ->
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 2,
worker_pool_size => NumBufferWorkers,
resume_interval => ResumeInterval
}
),
@ -1471,9 +1472,9 @@ t_async_pool_worker_death(_Config) ->
%% grab one of the worker pids and kill it
{ok, SRef1} =
snabbkaffe:subscribe(
?match_event(#{?snk_kind := resource_worker_cancelled_inflight}),
NumReqs,
1_000
?match_event(#{?snk_kind := resource_worker_worker_down_update}),
NumBufferWorkers,
10_000
),
{ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state),
MRef = monitor(process, Pid0),
@ -1487,21 +1488,11 @@ t_async_pool_worker_death(_Config) ->
ct:fail("worker should have died")
end,
%% inflight requests should have been cancelled
%% inflight requests should have been marked as retriable
{ok, _} = snabbkaffe:receive_events(SRef1),
Inflight1 = emqx_resource_metrics:inflight_get(?ID),
?assertEqual(0, Inflight1),
?assertEqual(NumReqs, Inflight1),
?assert(
lists:all(
fun
({_, {error, interrupted}}) -> true;
(_) -> false
end,
ets:tab2list(Tab0)
),
#{tab => ets:tab2list(Tab0)}
),
ok
end,
[]