From 5c2ac0ac818e8fcec198fe18ea150a17f5039a66 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 17 Jan 2023 18:21:11 -0300 Subject: [PATCH] chore: don't cancel inflight items upon worker death; retry them --- .../src/emqx_resource_worker.erl | 32 ++++--------------- .../test/emqx_resource_SUITE.erl | 23 ++++--------- 2 files changed, 14 insertions(+), 41 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 6b739a44c..698d783f6 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -771,7 +771,7 @@ handle_async_worker_down(Data0, Pid) -> #{async_workers := AsyncWorkers0} = Data0, {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), Data = Data0#{async_workers := AsyncWorkers}, - cancel_inflight_items(Data, WorkerMRef), + mark_inflight_items_as_retriable(Data, WorkerMRef), {keep_state, Data}. call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> @@ -1118,37 +1118,19 @@ ack_inflight(InflightTID, Ref, Id, Index) -> emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), IsAcked. -cancel_inflight_items(Data, WorkerMRef) -> +mark_inflight_items_as_retriable(Data, WorkerMRef) -> #{inflight_tid := InflightTID} = Data, + IsRetriable = true, MatchSpec = ets:fun2ms( - fun(?INFLIGHT_ITEM(Ref, _BatchOrQuery, _IsRetriable, WorkerMRef0)) when + fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, _IsRetriable, WorkerMRef0)) when WorkerMRef =:= WorkerMRef0 -> - Ref + ?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef0) end ), - Refs = ets:select(InflightTID, MatchSpec), - lists:foreach(fun(Ref) -> do_cancel_inflight_item(Data, Ref) end, Refs). - -do_cancel_inflight_item(Data, Ref) -> - #{id := Id, index := Index, inflight_tid := InflightTID} = Data, - {Count, Batch} = - case ets:take(InflightTID, Ref) of - [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _) = Query, _IsRetriable, _WorkerMRef)] -> - {1, [Query]}; - [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, _IsRetriable, _WorkerMRef)] -> - {length(Batch0), Batch0}; - _ -> - {0, []} - end, - IsAcked = Count > 0, - IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - Result = {error, interrupted}, - QueryOpts = #{simple_query => false}, - _ = batch_reply_caller(Id, Result, Batch, QueryOpts), - ?tp(resource_worker_cancelled_inflight, #{ref => Ref}), + _NumAffected = ets:select_replace(InflightTID, MatchSpec), + ?tp(resource_worker_worker_down_update, #{num_affected => _NumAffected}), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index f71dc4bb9..97bc8da66 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1432,6 +1432,7 @@ t_retry_async_inflight_batch(_Config) -> %% requests if they die. t_async_pool_worker_death(_Config) -> ResumeInterval = 1_000, + NumBufferWorkers = 2, emqx_connector_demo:set_callback_mode(async_if_possible), {ok, _} = emqx_resource:create( ?ID, @@ -1441,7 +1442,7 @@ t_async_pool_worker_death(_Config) -> #{ query_mode => async, batch_size => 1, - worker_pool_size => 2, + worker_pool_size => NumBufferWorkers, resume_interval => ResumeInterval } ), @@ -1471,9 +1472,9 @@ t_async_pool_worker_death(_Config) -> %% grab one of the worker pids and kill it {ok, SRef1} = snabbkaffe:subscribe( - ?match_event(#{?snk_kind := resource_worker_cancelled_inflight}), - NumReqs, - 1_000 + ?match_event(#{?snk_kind := resource_worker_worker_down_update}), + NumBufferWorkers, + 10_000 ), {ok, #{pid := Pid0}} = emqx_resource:simple_sync_query(?ID, get_state), MRef = monitor(process, Pid0), @@ -1487,21 +1488,11 @@ t_async_pool_worker_death(_Config) -> ct:fail("worker should have died") end, - %% inflight requests should have been cancelled + %% inflight requests should have been marked as retriable {ok, _} = snabbkaffe:receive_events(SRef1), Inflight1 = emqx_resource_metrics:inflight_get(?ID), - ?assertEqual(0, Inflight1), + ?assertEqual(NumReqs, Inflight1), - ?assert( - lists:all( - fun - ({_, {error, interrupted}}) -> true; - (_) -> false - end, - ets:tab2list(Tab0) - ), - #{tab => ets:tab2list(Tab0)} - ), ok end, []