diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0a6adf3d6..77494f4ba 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -475,6 +475,7 @@ flush(Data0) -> ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}), case {CurrentCount, IsFull} of {0, _} -> + ?tp(buffer_worker_queue_drained, #{inflight => inflight_num_batches(InflightTID)}), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -918,7 +919,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re inflight_tid => InflightTID, request_ref => Ref, query_opts => QueryOpts, - query => minimize(Query) + min_query => minimize(Query) }, IsRetriable = false, WorkerMRef = undefined, @@ -951,7 +952,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re inflight_tid => InflightTID, request_ref => Ref, query_opts => QueryOpts, - batch => minimize(Batch) + min_batch => minimize(Batch) }, Requests = lists:map( fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch @@ -967,19 +968,33 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re ). handle_async_reply( + #{ + request_ref := Ref, + inflight_tid := InflightTID + } = ReplyContext, + Result +) -> + case maybe_handle_unknown_async_reply(InflightTID, Ref) of + discard -> + ok; + continue -> + handle_async_reply1(ReplyContext, Result) + end. 
+ +handle_async_reply1( #{ request_ref := Ref, inflight_tid := InflightTID, resource_id := Id, worker_index := Index, buffer_worker := Pid, - query := ?QUERY(_, _, _, ExpireAt) = _Query + min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result ) -> ?tp( handle_async_reply_enter, - #{batch_or_query => [_Query], ref => Ref} + #{batch_or_query => [_Query], ref => Ref, result => Result} ), Now = now_(), case is_expired(ExpireAt, Now) of @@ -1002,7 +1017,7 @@ do_handle_async_reply( worker_index := Index, buffer_worker := Pid, inflight_tid := InflightTID, - query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, Result ) -> @@ -1031,16 +1046,30 @@ do_handle_async_reply( ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply( + #{ + inflight_tid := InflightTID, + request_ref := Ref + } = ReplyContext, + Result +) -> + case maybe_handle_unknown_async_reply(InflightTID, Ref) of + discard -> + ok; + continue -> + handle_async_batch_reply1(ReplyContext, Result) + end. + +handle_async_batch_reply1( #{ inflight_tid := InflightTID, request_ref := Ref, - batch := Batch + min_batch := Batch } = ReplyContext, Result ) -> ?tp( handle_async_reply_enter, - #{batch_or_query => Batch, ref => Ref} + #{batch_or_query => Batch, ref => Ref, result => Result} ), Now = now_(), IsFullBefore = is_inflight_full(InflightTID), @@ -1060,8 +1089,7 @@ handle_async_batch_reply( ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply2([], _, _, _) -> - %% e.g. 
if the driver evaluates the callback more than once - %% which should really be a bug + %% should have caused the unknown_async_reply_discarded ok; handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, @@ -1070,7 +1098,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, - batch := Batch + min_batch := Batch } = ReplyContext, %% All batch items share the same HasBeenSent flag %% So we just take the original flag from the ReplyContext batch @@ -1096,7 +1124,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> %% some queries are not expired, put them back to the inflight batch %% so it can be either acked now or retried later ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - ok = do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) + ok = do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result) end, ok. @@ -1107,7 +1135,7 @@ do_handle_async_batch_reply( worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, - batch := Batch, + min_batch := Batch, query_opts := QueryOpts }, Result @@ -1123,7 +1151,7 @@ do_handle_async_batch_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ?MODULE:block(Pid); + ok = ?MODULE:block(Pid); ack -> ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. @@ -1150,6 +1178,32 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> %% the inflight table was full before handling aync reply ok = ?MODULE:flush_worker(self()). +%% check if the async reply is valid. +%% e.g. if a connector evaluates the callback more than once: +%% 1. 
If the request was previously deleted from inflight table due to +%% either succeeded previously or expired, this function logs a +%% warning message and returns 'discard' instruction. +%% 2. If the request was previously failed and now pending on a retry, +%% then this function will return 'continue' as there is no way to +%% tell if this reply is stale or not. +maybe_handle_unknown_async_reply(InflightTID, Ref) -> + try ets:member(InflightTID, Ref) of + true -> + %% NOTE: this does not mean the reply is not stale (see case 2 above) + continue; + false -> + ?tp( + warning, + unknown_async_reply_discarded, + #{inflight_key => Ref} + ), + discard + catch + error:badarg -> + %% shutdown ? + discard + end. + %%============================================================================== %% operations for queue queue_item_marshaller(Bin) when is_binary(Bin) -> @@ -1287,7 +1341,7 @@ inflight_append( InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), - IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), + IsNew andalso inc_inflight(InflightTID, BatchSize), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; @@ -1302,7 +1356,7 @@ inflight_append( Query = mark_as_sent(Query0), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), - IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), + IsNew andalso inc_inflight(InflightTID, 1), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; @@ -1318,6 +1372,8 @@ mark_inflight_as_retriable(undefined, _Ref) -> ok; mark_inflight_as_retriable(InflightTID, Ref) -> _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}), + %% the old worker's DOWN should 
not affect this inflight any more + _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}), ok. %% Track each worker pid only once. @@ -1367,7 +1423,7 @@ ack_inflight(InflightTID, Ref, Id, Index) -> IsKnownRef = (Count > 0), case IsKnownRef of true -> - ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + ok = dec_inflight(InflightTID, Count), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); false -> ok @@ -1390,9 +1446,17 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), - _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}), + ok = dec_inflight(InflightTID, NumExpired), + ok. + +inc_inflight(InflightTID, Count) -> + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), + ok. + +dec_inflight(InflightTID, Count) when Count > 0 -> + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 1d96fa083..a6b7b2339 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -135,11 +135,11 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> after 1000 -> {error, timeout} end; -on_query(_InstId, {sleep, For}, #{pid := Pid}) -> +on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => sync, for => For}), ReqRef = make_ref(), From = {self(), ReqRef}, - Pid ! {From, {sleep, For}}, + Pid ! 
{From, {sleep_before_reply, For}}, receive {ReqRef, Result} -> Result @@ -159,9 +159,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) -> Pid ! {big_payload, Payload, ReplyFun}, {ok, Pid}; -on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) -> +on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => async, for => For}), - Pid ! {{sleep, For}, ReplyFun}, + Pid ! {{sleep_before_reply, For}, ReplyFun}, {ok, Pid}. on_batch_query(InstId, BatchReq, State) -> @@ -173,10 +173,13 @@ on_batch_query(InstId, BatchReq, State) -> get_counter -> batch_get_counter(sync, InstId, State); {big_payload, _Payload} -> - batch_big_payload(sync, InstId, BatchReq, State) + batch_big_payload(sync, InstId, BatchReq, State); + {random_reply, Num} -> + %% async batch retried + random_reply(Num) end. -on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> +on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) -> %% Requests can be of multiple types, but cannot be mixed. case hd(BatchReq) of {inc_counter, _} -> @@ -186,7 +189,11 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> block_now -> on_query_async(InstId, block_now, ReplyFunAndArgs, State); {big_payload, _Payload} -> - batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State) + batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State); + {random_reply, Num} -> + %% only take the first Num in the batch should be random enough + Pid ! {{random_reply, Num}, ReplyFunAndArgs}, + {ok, Pid} end. batch_inc_counter(CallMode, InstId, BatchReq, State) -> @@ -299,16 +306,31 @@ counter_loop( {{FromPid, ReqRef}, get} -> FromPid ! 
{ReqRef, Num}, State; - {{sleep, _} = SleepQ, ReplyFun} -> + {{random_reply, RandNum}, ReplyFun} -> + %% usually a behaving connector should reply once and only once for + %% each (batch) request + %% but we try to reply random results a random number of times + %% with 'ok' in the result, the buffer worker should eventually + %% drain the buffer (and inflights table) + ReplyCount = 1 + (RandNum rem 3), + Results = random_replies(ReplyCount), + lists:foreach( + fun(Result) -> + apply_reply(ReplyFun, Result) + end, + Results + ), + State; + {{sleep_before_reply, _} = SleepQ, ReplyFun} -> apply_reply(ReplyFun, handle_query(async, SleepQ, Status)), State; - {{FromPid, ReqRef}, {sleep, _} = SleepQ} -> + {{FromPid, ReqRef}, {sleep_before_reply, _} = SleepQ} -> FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)}, State end, counter_loop(NewState). -handle_query(Mode, {sleep, For} = Query, Status) -> +handle_query(Mode, {sleep_before_reply, For} = Query, Status) -> ok = timer:sleep(For), Result = case Status of @@ -329,3 +351,18 @@ maybe_register(_Name, _Pid, false) -> apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) -> apply(ReplyFun, Args ++ [Result]). + +random_replies(0) -> + []; +random_replies(N) -> + [random_reply(N) | random_replies(N - 1)]. + +random_reply(N) -> + case rand:uniform(3) of + 1 -> + {ok, N}; + 2 -> + {error, {recoverable_error, N}}; + 3 -> + {error, {unrecoverable_error, N}} + end. 
diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index bc146cc8e..1362cd1cc 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1482,7 +1482,7 @@ t_retry_async_inflight_full(_Config) -> AsyncInflightWindow * 2, fun() -> For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4), - {sleep, For} + {sleep_before_reply, For} end, #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}} ), @@ -1507,6 +1507,68 @@ t_retry_async_inflight_full(_Config) -> ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ok. +%% this test case is to ensure the buffer worker will not go crazy even +%% if the underlying connector is misbehaving: evaluate async callbacks multiple times +t_async_reply_multi_eval(_Config) -> + ResumeInterval = 20, + AsyncInflightWindow = 5, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => ?FUNCTION_NAME}, + #{ + query_mode => async, + async_inflight_window => AsyncInflightWindow, + batch_size => 3, + batch_time => 10, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + ?check_trace( + #{timetrap => 15_000}, + begin + %% block + ok = emqx_resource:simple_sync_query(?ID, block), + + {ok, {ok, _}} = + ?wait_async_action( + inc_counter_in_parallel( + AsyncInflightWindow * 2, + fun() -> + Rand = rand:uniform(1000), + {random_reply, Rand} + end, + #{} + ), + #{?snk_kind := buffer_worker_queue_drained, inflight := 0}, + ResumeInterval * 200 + ), + ok + end, + [ + fun(Trace) -> + ?assertMatch([#{inflight := 0}], ?of_kind(buffer_worker_queue_drained, Trace)) + end + ] + ), + Metrics = tap_metrics(?LINE), + #{ + counters := Counters, + gauges := #{queuing := 0, inflight := 0} + } = Metrics, + #{ + matched := Matched, + success := Success, + dropped := Dropped, + late_reply := LateReply, + 
failed := Failed + } = Counters, + ?assertEqual(Matched, Success + Dropped + LateReply + Failed), + ok. + t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, emqx_connector_demo:set_callback_mode(async_if_possible),