diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index 976b0bc1c..63e74fa11 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -2,7 +2,7 @@ version: '3.9' services: zookeeper: - image: wurstmeister/zookeeper + image: docker.io/library/zookeeper:3.6 ports: - "2181:2181" container_name: zookeeper diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index fbeec8724..18ecc644a 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -720,4 +720,4 @@ pub_props_to_packet(Properties) -> safe_filename(Filename) when is_binary(Filename) -> binary:replace(Filename, <<":">>, <<"-">>, [global]); safe_filename(Filename) when is_list(Filename) -> - string:replace(Filename, ":", "-", all). + lists:flatten(string:replace(Filename, ":", "-", all)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index bb4eee57d..a8ae4454d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -70,6 +70,18 @@ -define(RETRY_IDX, 3). -define(WORKER_MREF_IDX, 4). +-define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR), + (fun() -> + IsFullBefore = is_inflight_full(InflightTID), + case (EXPR) of + blocked -> + ok; + ok -> + ok = maybe_flush_after_async_reply(IsFullBefore) + end + end)() +). + -type id() :: binary(). -type index() :: pos_integer(). -type expire_at() :: infinity | integer(). @@ -97,6 +109,7 @@ start_link(Id, Index, Opts) -> -spec sync_query(id(), request(), query_opts()) -> Result :: term(). sync_query(Id, Request, Opts0) -> + ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}), Opts1 = ensure_timeout_query_opts(Opts0, sync), Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), @@ -106,6 +119,7 @@ sync_query(Id, Request, Opts0) -> -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts0) -> + ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}), Opts1 = ensure_timeout_query_opts(Opts0, async), Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), @@ -121,6 +135,7 @@ simple_sync_query(Id, Request) -> %% call ends up calling buffering functions, that's a bug and %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. + ?tp(simple_sync_query, #{id => Id, request => Request}), Index = undefined, QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), @@ -132,6 +147,7 @@ simple_sync_query(Id, Request) -> %% simple async-query the resource without batching and queuing. -spec simple_async_query(id(), request(), query_opts()) -> term(). simple_async_query(Id, Request, QueryOpts0) -> + ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}), Index = undefined, QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), @@ -194,8 +210,8 @@ init({Id, Index, Opts}) -> ?tp(buffer_worker_init, #{id => Id, index => Index}), {ok, running, Data}. -running(enter, _, Data) -> - ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}), +running(enter, _, #{tref := _Tref} = Data) -> + ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}), %% According to `gen_statem' laws, we mustn't call `maybe_flush' %% directly because it may decide to return `{next_state, blocked, _}', %% and that's an invalid response for a state enter call. @@ -212,9 +228,8 @@ running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) -> handle_query_requests(Request0, Data); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); -running(internal, flush, St) -> - flush(St); running(info, {flush, _Ref}, _St) -> + ?tp(discarded_stale_flush, #{}), keep_state_and_data; running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) @@ -225,21 +240,24 @@ running(info, Info, _St) -> ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}), keep_state_and_data. -blocked(enter, _, #{resume_interval := ResumeT} = _St) -> +blocked(enter, _, #{resume_interval := ResumeT} = St0) -> ?tp(buffer_worker_enter_blocked, #{}), - {keep_state_and_data, {state_timeout, ResumeT, unblock}}; + %% discard the old timer, new timer will be started when entering running state again + St = cancel_flush_timer(St0), + {keep_state, St, {state_timeout, ResumeT, unblock}}; blocked(cast, block, _St) -> keep_state_and_data; blocked(cast, resume, St) -> resume_from_blocked(St); -blocked(cast, flush, Data) -> - resume_from_blocked(Data); +blocked(cast, flush, St) -> + resume_from_blocked(St); blocked(state_timeout, unblock, St) -> resume_from_blocked(St); blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) -> Data = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> + %% ignore stale timer keep_state_and_data; blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) @@ -335,11 +353,13 @@ resume_from_blocked(Data) -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, Query, Data); + {batch, Ref, NotExpired, []} -> + retry_inflight_sync(Ref, NotExpired, Data); {batch, Ref, NotExpired, Expired} -> - update_inflight_item(InflightTID, Ref, NotExpired), NumExpired = length(Expired), + ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), - NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}), + ?tp(buffer_worker_retry_expired, #{expired => Expired}), %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, NotExpired, Data) @@ -470,9 +490,14 @@ flush(Data0) -> Data1 = cancel_flush_timer(Data0), CurrentCount = queue_count(Q0), IsFull = is_inflight_full(InflightTID), - ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}), + ?tp(buffer_worker_flush, #{ + queued => CurrentCount, + is_inflight_full => IsFull, + inflight => inflight_count(InflightTID) + }), case {CurrentCount, IsFull} of {0, _} -> + ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -487,7 +512,7 @@ flush(Data0) -> %% if the request has expired, the caller is no longer %% waiting for a response. case sieve_expired_requests(Batch, Now) of - all_expired -> + {[], _AllExpired} -> ok = replayq:ack(Q1, QAckRef), emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -496,7 +521,7 @@ flush(Data0) -> {NotExpired, Expired} -> NumExpired = length(Expired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), - IsBatch = BatchSize =/= 1, + IsBatch = (BatchSize > 1), %% We *must* use the new queue, because we currently can't %% `nack' a `pop'. %% Maybe we could re-open the queue? @@ -506,7 +531,6 @@ flush(Data0) -> ), Ref = make_request_ref(), do_flush(Data2, #{ - new_queue => Q1, is_batch => IsBatch, batch => NotExpired, ref => Ref, @@ -519,18 +543,16 @@ flush(Data0) -> is_batch := boolean(), batch := [queue_query()], ack_ref := replayq:ack_ref(), - ref := inflight_key(), - new_queue := replayq:q() + ref := inflight_key() }) -> gen_statem:event_handler_result(state(), data()). do_flush( - Data0, + #{queue := Q1} = Data0, #{ is_batch := false, batch := Batch, ref := Ref, - ack_ref := QAckRef, - new_queue := Q1 + ack_ref := QAckRef } ) -> #{ @@ -606,16 +628,18 @@ do_flush( }), flush_worker(self()); false -> + ?tp(buffer_worker_queue_drained, #{ + inflight => inflight_count(InflightTID) + }), ok end, {keep_state, Data1} end; -do_flush(Data0, #{ +do_flush(#{queue := Q1} = Data0, #{ is_batch := true, batch := Batch, ref := Ref, - ack_ref := QAckRef, - new_queue := Q1 + ack_ref := QAckRef }) -> #{ id := Id, @@ -685,6 +709,9 @@ do_flush(Data0, #{ Data2 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> + ?tp(buffer_worker_queue_drained, #{ + inflight => inflight_count(InflightTID) + }), Data1; {true, true} -> ?tp(buffer_worker_flush_ack_reflush, #{ @@ -718,13 +745,14 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> {ShouldAck, PostFns} = lists:foldl( fun(Reply, {_ShouldAck, PostFns}) -> + %% _ShouldAck should be the same as ShouldAck starting from the second reply {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), {ShouldAck, [PostFn | PostFns]} end, {ack, []}, Replies ), - PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, + PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end, {ShouldAck, PostFn}. reply_caller(Id, Reply, QueryOpts) -> @@ -853,7 +881,7 @@ handle_async_worker_down(Data0, Pid) -> {keep_state, Data}. call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> - ?tp(call_query_enter, #{id => Id, query => Query}), + ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -919,7 +947,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re inflight_tid => InflightTID, request_ref => Ref, query_opts => QueryOpts, - query => minimize(Query) + min_query => minimize(Query) }, IsRetriable = false, WorkerMRef = undefined, @@ -952,7 +980,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re inflight_tid => InflightTID, request_ref => Ref, query_opts => QueryOpts, - batch => minimize(Batch) + min_batch => minimize(Batch) }, Requests = lists:map( fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch @@ -968,27 +996,39 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re ). handle_async_reply( + #{ + request_ref := Ref, + inflight_tid := InflightTID, + query_opts := Opts + } = ReplyContext, + Result +) -> + case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of + discard -> + ok; + continue -> + ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result)) + end. + +handle_async_reply1( #{ request_ref := Ref, inflight_tid := InflightTID, resource_id := Id, worker_index := Index, - buffer_worker := Pid, - query := ?QUERY(_, _, _, ExpireAt) = _Query + min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result ) -> ?tp( handle_async_reply_enter, - #{batch_or_query => [_Query], ref => Ref} + #{batch_or_query => [_Query], ref => Ref, result => Result} ), Now = now_(), case is_expired(ExpireAt, Now) of true -> - IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), - IsFullBefore andalso ?MODULE:flush_worker(Pid), ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; false -> @@ -1003,7 +1043,7 @@ do_handle_async_reply( worker_index := Index, buffer_worker := Pid, inflight_tid := InflightTID, - query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, Result ) -> @@ -1020,46 +1060,95 @@ do_handle_async_reply( ref => Ref, result => Result }), - case Action of nack -> %% Keep retrying. - mark_inflight_as_retriable(InflightTID, Ref), - ?MODULE:block(Pid); + ok = mark_inflight_as_retriable(InflightTID, Ref), + ok = ?MODULE:block(Pid), + blocked; ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. handle_async_batch_reply( #{ - buffer_worker := Pid, - resource_id := Id, - worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, - batch := Batch + query_opts := Opts + } = ReplyContext, + Result +) -> + case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of + discard -> + ok; + continue -> + ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result)) + end. + +handle_async_batch_reply1( + #{ + inflight_tid := InflightTID, + request_ref := Ref, + min_batch := Batch } = ReplyContext, Result ) -> ?tp( handle_async_reply_enter, - #{batch_or_query => Batch, ref => Ref} + #{batch_or_query => Batch, ref => Ref, result => Result} ), Now = now_(), case sieve_expired_requests(Batch, Now) of - all_expired -> - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), - IsFullBefore andalso ?MODULE:flush_worker(Pid), - ?tp(handle_async_reply_expired, #{expired => Batch}), + {_NotExpired, []} -> + %% this is the critical code path, + %% we try not to do ets:lookup in this case + %% because the batch can be quite big + do_handle_async_batch_reply(ReplyContext, Result); + {_NotExpired, _Expired} -> + %% at least one is expired + %% the batch from reply context is minimized, so it cannot be used + %% to update the inflight items, hence discard Batch and lookup the RealBatch + ?tp(handle_async_reply_expired, #{expired => _Expired}), + handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) + end. + +handle_async_batch_reply2([], _, _, _) -> + %% this usually should never happen unless the async callback is being evaluated concurrently + ok; +handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> + ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, + #{ + resource_id := Id, + worker_index := Index, + inflight_tid := InflightTID, + request_ref := Ref, + min_batch := Batch + } = ReplyContext, + %% All batch items share the same HasBeenSent flag + %% So we just take the original flag from the ReplyContext batch + %% and put it back to the batch found in inflight table + %% which must have already been set to `false` + [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, + {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), + RealNotExpired = + lists:map( + fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> + ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) + end, + RealNotExpired0 + ), + NumExpired = length(RealExpired), + emqx_resource_metrics:late_reply_inc(Id, NumExpired), + case RealNotExpired of + [] -> + %% all expired, no need to update back the inflight batch + _ = ack_inflight(InflightTID, Ref, Id, Index), ok; - {NotExpired, Expired} -> - NumExpired = length(Expired), - emqx_resource_metrics:late_reply_inc(Id, NumExpired), - NumExpired > 0 andalso - ?tp(handle_async_reply_expired, #{expired => Expired}), - do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) + _ -> + %% some queries are not expired, put them back to the inflight batch + %% so it can be either acked now or retried later + ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), + do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result) end. do_handle_async_batch_reply( @@ -1069,7 +1158,7 @@ do_handle_async_batch_reply( worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, - batch := Batch, + min_batch := Batch, query_opts := QueryOpts }, Result @@ -1084,14 +1173,14 @@ do_handle_async_batch_reply( case Action of nack -> %% Keep retrying. - mark_inflight_as_retriable(InflightTID, Ref), - ?MODULE:block(Pid); + ok = mark_inflight_as_retriable(InflightTID, Ref), + ok = ?MODULE:block(Pid), + blocked; ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. -do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> - IsFullBefore = is_inflight_full(InflightTID), +do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index), case maps:get(simple_query, QueryOpts, false) of true -> @@ -1101,9 +1190,47 @@ do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> false -> ok end, - IsFullBefore andalso ?MODULE:flush_worker(WorkerPid), ok. +maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) -> + %% inflight was not full before async reply is handled, + %% after it is handled, the inflight table must be even smaller + %% hance we can rely on the buffer worker's flush timer to trigger + %% the next flush + ?tp(skip_flushing_worker, #{}), + ok; +maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> + %% the inflight table was full before handling aync reply + ?tp(do_flushing_worker, #{}), + ok = ?MODULE:flush_worker(self()). + +%% check if the async reply is valid. +%% e.g. if a connector evaluates the callback more than once: +%% 1. If the request was previously deleted from inflight table due to +%% either succeeded previously or expired, this function logs a +%% warning message and returns 'discard' instruction. +%% 2. If the request was previously failed and now pending on a retry, +%% then this function will return 'continue' as there is no way to +%% tell if this reply is stae or not. +maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) -> + continue; +maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) -> + try ets:member(InflightTID, Ref) of + true -> + continue; + false -> + ?tp( + warning, + unknown_async_reply_discarded, + #{inflight_key => Ref} + ), + discard + catch + error:badarg -> + %% shutdown ? + discard + end. + %%============================================================================== %% operations for queue queue_item_marshaller(Bin) when is_binary(Bin) -> @@ -1202,10 +1329,8 @@ inflight_get_first_retriable(InflightTID, Now) -> {single, Ref, Query} end; {[{Ref, Batch = [_ | _]}], _Continuation} -> - %% batch is non-empty because we check that in - %% `sieve_expired_requests'. case sieve_expired_requests(Batch, Now) of - all_expired -> + {[], _AllExpired} -> {expired, Ref, Batch}; {NotExpired, Expired} -> {batch, Ref, NotExpired, Expired} @@ -1218,10 +1343,10 @@ is_inflight_full(InflightTID) -> [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), %% we consider number of batches rather than number of messages %% because one batch request may hold several messages. - Size = inflight_num_batches(InflightTID), + Size = inflight_count(InflightTID), Size >= MaxSize. -inflight_num_batches(InflightTID) -> +inflight_count(InflightTID) -> case ets:info(InflightTID, size) of undefined -> 0; Size -> max(0, Size - ?INFLIGHT_META_ROWS) @@ -1243,7 +1368,7 @@ inflight_append( InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), - IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), + IsNew andalso inc_inflight(InflightTID, BatchSize), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; @@ -1258,7 +1383,7 @@ inflight_append( Query = mark_as_sent(Query0), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), - IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), + IsNew andalso inc_inflight(InflightTID, 1), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; @@ -1274,6 +1399,8 @@ mark_inflight_as_retriable(undefined, _Ref) -> ok; mark_inflight_as_retriable(InflightTID, Ref) -> _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}), + %% the old worker's DOWN should not affect this inflight any more + _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}), ok. %% Track each worker pid only once. @@ -1317,13 +1444,18 @@ ack_inflight(InflightTID, Ref, Id, Index) -> 1; [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> length(Batch); - _ -> + [] -> 0 end, - IsAcked = Count > 0, - IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - IsAcked. + ok = dec_inflight(InflightTID, Count), + IsKnownRef = (Count > 0), + case IsKnownRef of + true -> + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); + false -> + ok + end, + IsKnownRef. mark_inflight_items_as_retriable(Data, WorkerMRef) -> #{inflight_tid := InflightTID} = Data, @@ -1341,9 +1473,18 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch) -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), - ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}), + ok = dec_inflight(InflightTID, NumExpired). + +inc_inflight(InflightTID, Count) -> + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), + ok. + +dec_inflight(_InflightTID, 0) -> + ok; +dec_inflight(InflightTID, Count) when Count > 0 -> + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), ok. %%============================================================================== @@ -1453,22 +1594,12 @@ is_async_return(_) -> false. sieve_expired_requests(Batch, Now) -> - {Expired, NotExpired} = - lists:partition( - fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> - is_expired(ExpireAt, Now) - end, - Batch - ), - case {NotExpired, Expired} of - {[], []} -> - %% Should be impossible for batch_size >= 1. - all_expired; - {[], [_ | _]} -> - all_expired; - {[_ | _], _} -> - {NotExpired, Expired} - end. + lists:partition( + fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> + not is_expired(ExpireAt, Now) + end, + Batch + ). -spec is_expired(infinity | integer(), integer()) -> boolean(). is_expired(infinity = _ExpireAt, _Now) -> diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 1d96fa083..f41087b20 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -135,11 +135,11 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> after 1000 -> {error, timeout} end; -on_query(_InstId, {sleep, For}, #{pid := Pid}) -> +on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => sync, for => For}), ReqRef = make_ref(), From = {self(), ReqRef}, - Pid ! {From, {sleep, For}}, + Pid ! {From, {sleep_before_reply, For}}, receive {ReqRef, Result} -> Result @@ -159,9 +159,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) -> Pid ! {big_payload, Payload, ReplyFun}, {ok, Pid}; -on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) -> +on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => async, for => For}), - Pid ! {{sleep, For}, ReplyFun}, + Pid ! {{sleep_before_reply, For}, ReplyFun}, {ok, Pid}. on_batch_query(InstId, BatchReq, State) -> @@ -173,10 +173,13 @@ on_batch_query(InstId, BatchReq, State) -> get_counter -> batch_get_counter(sync, InstId, State); {big_payload, _Payload} -> - batch_big_payload(sync, InstId, BatchReq, State) + batch_big_payload(sync, InstId, BatchReq, State); + {random_reply, Num} -> + %% async batch retried + make_random_reply(Num) end. -on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> +on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) -> %% Requests can be of multiple types, but cannot be mixed. case hd(BatchReq) of {inc_counter, _} -> @@ -186,7 +189,11 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> block_now -> on_query_async(InstId, block_now, ReplyFunAndArgs, State); {big_payload, _Payload} -> - batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State) + batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State); + {random_reply, Num} -> + %% only take the first Num in the batch should be random enough + Pid ! {{random_reply, Num}, ReplyFunAndArgs}, + {ok, Pid} end. batch_inc_counter(CallMode, InstId, BatchReq, State) -> @@ -299,16 +306,33 @@ counter_loop( {{FromPid, ReqRef}, get} -> FromPid ! {ReqRef, Num}, State; - {{sleep, _} = SleepQ, ReplyFun} -> + {{random_reply, RandNum}, ReplyFun} -> + %% usually a behaving connector should reply once and only once for + %% each (batch) request + %% but we try to reply random results a random number of times + %% with 'ok' in the result, the buffer worker should eventually + %% drain the buffer (and inflights table) + ReplyCount = 1 + (RandNum rem 3), + Results = make_random_replies(ReplyCount), + %% add a delay to trigger inflight full + lists:foreach( + fun(Result) -> + timer:sleep(rand:uniform(5)), + apply_reply(ReplyFun, Result) + end, + Results + ), + State; + {{sleep_before_reply, _} = SleepQ, ReplyFun} -> apply_reply(ReplyFun, handle_query(async, SleepQ, Status)), State; - {{FromPid, ReqRef}, {sleep, _} = SleepQ} -> + {{FromPid, ReqRef}, {sleep_before_reply, _} = SleepQ} -> FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)}, State end, counter_loop(NewState). -handle_query(Mode, {sleep, For} = Query, Status) -> +handle_query(Mode, {sleep_before_reply, For} = Query, Status) -> ok = timer:sleep(For), Result = case Status of @@ -329,3 +353,18 @@ maybe_register(_Name, _Pid, false) -> apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) -> apply(ReplyFun, Args ++ [Result]). + +make_random_replies(0) -> + []; +make_random_replies(N) -> + [make_random_reply(N) | make_random_replies(N - 1)]. + +make_random_reply(N) -> + case rand:uniform(3) of + 1 -> + {ok, N}; + 2 -> + {error, {recoverable_error, N}}; + 3 -> + {error, {unrecoverable_error, N}} + end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 984b3b04a..af72e86f9 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1482,7 +1482,7 @@ t_retry_async_inflight_full(_Config) -> AsyncInflightWindow * 2, fun() -> For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4), - {sleep, For} + {sleep_before_reply, For} end, #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}} ), @@ -1507,6 +1507,59 @@ t_retry_async_inflight_full(_Config) -> ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ok. +%% this test case is to ensure the buffer worker will not go crazy even +%% if the underlying connector is misbehaving: evaluate async callbacks multiple times +t_async_reply_multi_eval(_Config) -> + ResumeInterval = 5, + TotalTime = 5_000, + AsyncInflightWindow = 3, + TotalQueries = AsyncInflightWindow * 5, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => ?FUNCTION_NAME}, + #{ + query_mode => async, + async_inflight_window => AsyncInflightWindow, + batch_size => 3, + batch_time => 10, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + %% block + ok = emqx_resource:simple_sync_query(?ID, block), + inc_counter_in_parallel( + TotalQueries, + fun() -> + Rand = rand:uniform(1000), + {random_reply, Rand} + end, + #{} + ), + ?retry( + ResumeInterval, + TotalTime div ResumeInterval, + begin + Metrics = tap_metrics(?LINE), + #{ + counters := Counters, + gauges := #{queuing := 0, inflight := 0} + } = Metrics, + #{ + matched := Matched, + success := Success, + dropped := Dropped, + late_reply := LateReply, + failed := Failed + } = Counters, + ?assertEqual(TotalQueries, Matched - 1), + ?assertEqual(Matched, Success + Dropped + LateReply + Failed) + end + ). + t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, emqx_connector_demo:set_callback_mode(async_if_possible), @@ -1944,7 +1997,7 @@ t_expiration_async_batch_after_reply(_Config) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 2, + batch_size => 3, batch_time => 100, worker_pool_size => 1, resume_interval => 2_000 @@ -1959,7 +2012,7 @@ do_t_expiration_async_after_reply(IsBatch) -> NAcks = case IsBatch of batch -> 1; - single -> 2 + single -> 3 end, ?force_ordering( #{?snk_kind := buffer_worker_flush_ack}, @@ -1980,6 +2033,10 @@ do_t_expiration_async_after_reply(IsBatch) -> ok, emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) ), + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS}) + ), ?assertEqual( ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity}) ), @@ -1997,30 +2054,44 @@ do_t_expiration_async_after_reply(IsBatch) -> {ok, _} = ?block_until( #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS ), + wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), unlink(Pid0), exit(Pid0, kill), ok end, fun(Trace) -> - ?assertMatch( - [ - #{ - expired := [{query, _, {inc_counter, 199}, _, _}] - } - ], - ?of_kind(handle_async_reply_expired, Trace) - ), - wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), + case IsBatch of + batch -> + ?assertMatch( + [ + #{ + expired := [ + {query, _, {inc_counter, 199}, _, _}, + {query, _, {inc_counter, 299}, _, _} + ] + } + ], + ?of_kind(handle_async_reply_expired, Trace) + ); + single -> + ?assertMatch( + [ + #{expired := [{query, _, {inc_counter, 199}, _, _}]}, + #{expired := [{query, _, {inc_counter, 299}, _, _}]} + ], + ?of_kind(handle_async_reply_expired, Trace) + ) + end, Metrics = tap_metrics(?LINE), ?assertMatch( #{ counters := #{ - matched := 2, + matched := 3, %% the request with infinity timeout. success := 1, dropped := 0, - late_reply := 1, + late_reply := 2, retried := 0, failed := 0 } @@ -2042,7 +2113,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 2, + batch_size => 3, batch_time => 100, worker_pool_size => 1, resume_interval => ResumeInterval @@ -2067,6 +2138,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ok, emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) ), + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS}) + ), Pid0 = spawn_link(fun() -> ?tp(delay_enter, #{}), @@ -2087,7 +2162,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?assertMatch( [ #{ - expired := [{query, _, {inc_counter, 199}, _, _}] + expired := [ + {query, _, {inc_counter, 199}, _, _}, + {query, _, {inc_counter, 299}, _, _} + ] } ], ?of_kind(handle_async_reply_expired, Trace) @@ -2096,12 +2174,16 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?assertMatch( #{ counters := #{ - matched := 1, + matched := 2, success := 0, dropped := 0, - late_reply := 1, + late_reply := 2, retried := 0, failed := 0 + }, + gauges := #{ + inflight := 0, + queuing := 0 } }, Metrics @@ -2217,6 +2299,16 @@ do_t_expiration_retry(IsBatch) -> [#{expired := [{query, _, {inc_counter, 1}, _, _}]}], ?of_kind(buffer_worker_retry_expired, Trace) ), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + gauges := #{ + inflight := 0, + queuing := 0 + } + }, + Metrics + ), ok end ), diff --git a/changes/ce/fix-10020.en.md b/changes/ce/fix-10020.en.md new file mode 100644 index 000000000..73615804b --- /dev/null +++ b/changes/ce/fix-10020.en.md @@ -0,0 +1 @@ +Fix bridge metrics when running in async mode with batching enabled (`batch_size` > 1). diff --git a/changes/ce/fix-10020.zh.md b/changes/ce/fix-10020.zh.md new file mode 100644 index 000000000..2fce853e3 --- /dev/null +++ b/changes/ce/fix-10020.zh.md @@ -0,0 +1 @@ +修复使用异步和批量配置的桥接计数不准确的问题。 diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index d06218397..9b38e98d3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -268,7 +268,7 @@ kafka_bridge_rest_api_helper(Config) -> CreateBodyTmp = #{ <<"type">> => <<"kafka">>, <<"name">> => <<"my_kafka_bridge">>, - <<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config), + <<"bootstrap_hosts">> => iolist_to_binary(maps:get(<<"bootstrap_hosts">>, Config)), <<"enable">> => true, <<"authentication">> => maps:get(<<"authentication">>, Config), <<"producer">> => #{ @@ -276,7 +276,7 @@ kafka_bridge_rest_api_helper(Config) -> topic => <<"t/#">> }, <<"kafka">> => #{ - <<"topic">> => erlang:list_to_binary(KafkaTopic), + <<"topic">> => iolist_to_binary(KafkaTopic), <<"buffer">> => #{ <<"memory_overload_protection">> => <<"false">> }, diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index b44095624..ba6d1f91f 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -21,6 +21,12 @@ help() { echo " otherwise it runs the entire app's CT" } +if command -v docker-compose; then + DC='docker-compose' +else + DC='docker compose' +fi + WHICH_APP='novalue' CONSOLE='no' KEEP_UP='no' @@ -155,7 +161,7 @@ for dep in ${CT_DEPS}; do ;; tdengine) FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' ) - ;; + ;; *) echo "unknown_ct_dependency $dep" exit 1 @@ -201,7 +207,7 @@ if [ "$STOP" = 'no' ]; then # some left-over log file has to be deleted before a new docker-compose up rm -f '.ci/docker-compose-file/redis/*.log' # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS up -d --build --remove-orphans + $DC $F_OPTIONS up -d --build --remove-orphans fi echo "Fixing file owners and permissions for $UID_GID" @@ -218,7 +224,7 @@ set +e if [ "$STOP" = 'yes' ]; then # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS down --remove-orphans + $DC $F_OPTIONS down --remove-orphans elif [ "$ATTACH" = 'yes' ]; then docker exec -it "$ERLANG_CONTAINER" bash elif [ "$CONSOLE" = 'yes' ]; then @@ -235,11 +241,11 @@ else LOG='_build/test/logs/docker-compose.log' echo "Dumping docker-compose log to $LOG" # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS logs --no-color --timestamps > "$LOG" + $DC $F_OPTIONS logs --no-color --timestamps > "$LOG" fi if [ "$KEEP_UP" != 'yes' ]; then # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS down + $DC $F_OPTIONS down fi exit $RESULT fi