From bb13d0708f387e05af18479ba88ab101befb2233 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 13:20:58 +0100 Subject: [PATCH 01/18] fix(bridge): fix dropped counter and inflight gauge Prior to this fix there were two metrics issues 1. if a batch is all requests expired when receiving a reply it only bumped 1 instead of the batch size for 'late_reply' 2. when a batch is partially delivered (or expired), the dropped requests were not decremented from the inflight size gauge --- .../emqx_resource/src/emqx_resource_buffer_worker.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index bb4eee57d..2f83a347a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -336,8 +336,8 @@ resume_from_blocked(Data) -> %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, Query, Data); {batch, Ref, NotExpired, Expired} -> - update_inflight_item(InflightTID, Ref, NotExpired), NumExpired = length(Expired), + update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}), %% We retry msgs in inflight window sync, as if we send them @@ -1050,7 +1050,7 @@ handle_async_batch_reply( all_expired -> IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), + IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)), IsFullBefore andalso ?MODULE:flush_worker(Pid), ?tp(handle_async_reply_expired, #{expired => Batch}), ok; @@ -1317,10 +1317,10 @@ ack_inflight(InflightTID, Ref, Id, Index) -> 1; [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] -> length(Batch); - _ -> + [] -> 0 end, - IsAcked = Count > 0, + IsAcked = (Count > 0), IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), IsAcked. @@ -1341,8 +1341,9 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch) -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}), ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}), ok. From 4e70374e2a051df6bdaed546d63ce05a32cf2979 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 18:28:23 +0100 Subject: [PATCH 02/18] test: make docker-compose work --- scripts/ct/run.sh | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index b44095624..ba6d1f91f 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -21,6 +21,12 @@ help() { echo " otherwise it runs the entire app's CT" } +if command -v docker-compose; then + DC='docker-compose' +else + DC='docker compose' +fi + WHICH_APP='novalue' CONSOLE='no' KEEP_UP='no' @@ -155,7 +161,7 @@ for dep in ${CT_DEPS}; do ;; tdengine) FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' ) - ;; + ;; *) echo "unknown_ct_dependency $dep" exit 1 @@ -201,7 +207,7 @@ if [ "$STOP" = 'no' ]; then # some left-over log file has to be deleted before a new docker-compose up rm -f '.ci/docker-compose-file/redis/*.log' # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS up -d --build --remove-orphans + $DC $F_OPTIONS up -d --build --remove-orphans fi echo "Fixing file owners and permissions for $UID_GID" @@ -218,7 +224,7 @@ set +e if [ "$STOP" = 'yes' ]; then # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS down --remove-orphans + $DC $F_OPTIONS down --remove-orphans elif [ "$ATTACH" = 'yes' ]; then docker exec -it "$ERLANG_CONTAINER" bash elif [ "$CONSOLE" = 'yes' ]; then @@ -235,11 +241,11 @@ else LOG='_build/test/logs/docker-compose.log' echo "Dumping docker-compose log to $LOG" # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS logs --no-color --timestamps > "$LOG" + $DC $F_OPTIONS logs --no-color --timestamps > "$LOG" fi if [ "$KEEP_UP" != 'yes' ]; then # shellcheck disable=2086 # no quotes for F_OPTIONS - docker compose $F_OPTIONS down + $DC $F_OPTIONS down fi exit $RESULT fi From fc614e16e55d0f3e484ab0ad53a3c03932a9d797 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 20:07:35 +0100 Subject: [PATCH 03/18] fix(bridge): update inflight items after partial expiry --- .../src/emqx_resource_buffer_worker.erl | 98 ++++++++++++------- .../test/emqx_resource_SUITE.erl | 2 +- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2f83a347a..6aa13092a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -335,11 +335,13 @@ resume_from_blocked(Data) -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, Query, Data); + {batch, Ref, NotExpired, []} -> + retry_inflight_sync(Ref, NotExpired, Data); {batch, Ref, NotExpired, Expired} -> NumExpired = length(Expired), - update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), + ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), - NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}), + ?tp(buffer_worker_retry_expired, #{expired => Expired}), %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. retry_inflight_sync(Ref, NotExpired, Data) @@ -496,7 +498,7 @@ flush(Data0) -> {NotExpired, Expired} -> NumExpired = length(Expired), emqx_resource_metrics:dropped_expired_inc(Id, NumExpired), - IsBatch = BatchSize =/= 1, + IsBatch = (BatchSize > 1), %% We *must* use the new queue, because we currently can't %% `nack' a `pop'. %% Maybe we could re-open the queue? @@ -506,7 +508,6 @@ flush(Data0) -> ), Ref = make_request_ref(), do_flush(Data2, #{ - new_queue => Q1, is_batch => IsBatch, batch => NotExpired, ref => Ref, @@ -519,18 +520,16 @@ flush(Data0) -> is_batch := boolean(), batch := [queue_query()], ack_ref := replayq:ack_ref(), - ref := inflight_key(), - new_queue := replayq:q() + ref := inflight_key() }) -> gen_statem:event_handler_result(state(), data()). do_flush( - Data0, + #{queue := Q1} = Data0, #{ is_batch := false, batch := Batch, ref := Ref, - ack_ref := QAckRef, - new_queue := Q1 + ack_ref := QAckRef } ) -> #{ @@ -610,12 +609,11 @@ do_flush( end, {keep_state, Data1} end; -do_flush(Data0, #{ +do_flush(#{queue := Q1} = Data0, #{ is_batch := true, batch := Batch, ref := Ref, - ack_ref := QAckRef, - new_queue := Q1 + ack_ref := QAckRef }) -> #{ id := Id, @@ -715,17 +713,18 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> end, Batch ), - {ShouldAck, PostFns} = + {Action, PostFn1} = reply_caller_defer_metrics(Id, hd(Replies), QueryOpts), + PostFns = lists:foldl( - fun(Reply, {_ShouldAck, PostFns}) -> - {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), - {ShouldAck, [PostFn | PostFns]} + fun(Reply, PostFns) -> + {_, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), + [PostFn | PostFns] end, - {ack, []}, - Replies + [PostFn1], + tl(Replies) ), - PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, - {ShouldAck, PostFn}. + PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end, + {Action, PostFn}. reply_caller(Id, Reply, QueryOpts) -> {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), @@ -1024,7 +1023,7 @@ do_handle_async_reply( case Action of nack -> %% Keep retrying. - mark_inflight_as_retriable(InflightTID, Ref), + ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) @@ -1051,15 +1050,40 @@ handle_async_batch_reply( IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)), - IsFullBefore andalso ?MODULE:flush_worker(Pid), + IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid), ?tp(handle_async_reply_expired, #{expired => Batch}), ok; - {NotExpired, Expired} -> - NumExpired = length(Expired), - emqx_resource_metrics:late_reply_inc(Id, NumExpired), - NumExpired > 0 andalso - ?tp(handle_async_reply_expired, #{expired => Expired}), - do_handle_async_batch_reply(ReplyContext#{batch := NotExpired}, Result) + {_NotExpired, []} -> + do_handle_async_batch_reply(ReplyContext, Result); + {_NotExpired, _Expired} -> + %% partial expire + %% the batch from reply context is minimized, so it cannot be used + %% to update the inflight items, hence discard Batch and lookup the RealBatch + ?tp(handle_async_reply_expired, #{expired => _Expired}), + case ets:lookup(InflightTID, Ref) of + [] -> + %% e.g. if the driver evaluates it more than once + %% which should really be a bug, TODO: add a unknown_reply counter + ok; + [?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef)] -> + %% All batch items share the same HasBeenSent flag + %% So we just take the original flag from the ReplyContext batch + %% and put it back to the batch found in inflight table + %% which must have already been set to `false` + [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, + {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), + RealNotExpired = + lists:map( + fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> + ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) + end, + RealNotExpired0 + ), + NumExpired = length(RealExpired), + emqx_resource_metrics:late_reply_inc(Id, NumExpired), + ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), + do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) + end end. do_handle_async_batch_reply( @@ -1084,7 +1108,7 @@ do_handle_async_batch_reply( case Action of nack -> %% Keep retrying. - mark_inflight_as_retriable(InflightTID, Ref), + ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) @@ -1320,10 +1344,15 @@ ack_inflight(InflightTID, Ref, Id, Index) -> [] -> 0 end, - IsAcked = (Count > 0), - IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), - IsAcked. + IsKnownRef = (Count > 0), + case IsKnownRef of + true -> + ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); + false -> + ok + end, + IsKnownRef. mark_inflight_items_as_retriable(Data, WorkerMRef) -> #{inflight_tid := InflightTID} = Data, @@ -1341,10 +1370,9 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}), - ?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 984b3b04a..92f069739 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1997,6 +1997,7 @@ do_t_expiration_async_after_reply(IsBatch) -> {ok, _} = ?block_until( #{?snk_kind := handle_async_reply_expired}, 10 * TimeoutMS ), + wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), unlink(Pid0), exit(Pid0, kill), @@ -2011,7 +2012,6 @@ do_t_expiration_async_after_reply(IsBatch) -> ], ?of_kind(handle_async_reply_expired, Trace) ), - wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}), Metrics = tap_metrics(?LINE), ?assertMatch( #{ From 2811c371adda857633837666e7ec6d2be8518012 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 22:18:44 +0100 Subject: [PATCH 04/18] docs: add changelogs --- changes/ce/fix-10020.en.md | 1 + changes/ce/fix-10020.zh.md | 1 + 2 files changed, 2 insertions(+) create mode 100644 changes/ce/fix-10020.en.md create mode 100644 changes/ce/fix-10020.zh.md diff --git a/changes/ce/fix-10020.en.md b/changes/ce/fix-10020.en.md new file mode 100644 index 000000000..73615804b --- /dev/null +++ b/changes/ce/fix-10020.en.md @@ -0,0 +1 @@ +Fix bridge metrics when running in async mode with batching enabled (`batch_size` > 1). diff --git a/changes/ce/fix-10020.zh.md b/changes/ce/fix-10020.zh.md new file mode 100644 index 000000000..2fce853e3 --- /dev/null +++ b/changes/ce/fix-10020.zh.md @@ -0,0 +1 @@ +修复使用异步和批量配置的桥接计数不准确的问题。 From bf8becd52197592fe36d3b7fbcca5dc45f378466 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 23:07:12 +0100 Subject: [PATCH 05/18] test: make sure gauge return to 0 in test cases --- apps/emqx_resource/test/emqx_resource_SUITE.erl | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 92f069739..82c95cb99 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2102,6 +2102,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> late_reply := 1, retried := 0, failed := 0 + }, + gauges := #{ + inflight := 0, + queuing := 0 } }, Metrics @@ -2217,6 +2221,16 @@ do_t_expiration_retry(IsBatch) -> [#{expired := [{query, _, {inc_counter, 1}, _, _}]}], ?of_kind(buffer_worker_retry_expired, Trace) ), + Metrics = tap_metrics(?LINE), + ?assertMatch( + #{ + gauges := #{ + inflight := 0, + queuing := 0 + } + }, + Metrics + ), ok end ), From 036f69cd6e94df535dc0b217ee81d2657fd4aa3f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 22 Feb 2023 23:26:04 +0100 Subject: [PATCH 06/18] test: ensure batch size > 1 is covered in expiration test --- .../test/emqx_resource_SUITE.erl | 57 +++++++++++++------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 82c95cb99..bc146cc8e 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1944,7 +1944,7 @@ t_expiration_async_batch_after_reply(_Config) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 2, + batch_size => 3, batch_time => 100, worker_pool_size => 1, resume_interval => 2_000 @@ -1959,7 +1959,7 @@ do_t_expiration_async_after_reply(IsBatch) -> NAcks = case IsBatch of batch -> 1; - single -> 2 + single -> 3 end, ?force_ordering( #{?snk_kind := buffer_worker_flush_ack}, @@ -1980,6 +1980,10 @@ do_t_expiration_async_after_reply(IsBatch) -> ok, emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) ), + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS}) + ), ?assertEqual( ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity}) ), @@ -2004,23 +2008,37 @@ do_t_expiration_async_after_reply(IsBatch) -> ok end, fun(Trace) -> - ?assertMatch( - [ - #{ - expired := [{query, _, {inc_counter, 199}, _, _}] - } - ], - ?of_kind(handle_async_reply_expired, Trace) - ), + case IsBatch of + batch -> + ?assertMatch( + [ + #{ + expired := [ + {query, _, {inc_counter, 199}, _, _}, + {query, _, {inc_counter, 299}, _, _} + ] + } + ], + ?of_kind(handle_async_reply_expired, Trace) + ); + single -> + ?assertMatch( + [ + #{expired := [{query, _, {inc_counter, 199}, _, _}]}, + #{expired := [{query, _, {inc_counter, 299}, _, _}]} + ], + ?of_kind(handle_async_reply_expired, Trace) + ) + end, Metrics = tap_metrics(?LINE), ?assertMatch( #{ counters := #{ - matched := 2, + matched := 3, %% the request with infinity timeout. success := 1, dropped := 0, - late_reply := 1, + late_reply := 2, retried := 0, failed := 0 } @@ -2042,7 +2060,7 @@ t_expiration_batch_all_expired_after_reply(_Config) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 2, + batch_size => 3, batch_time => 100, worker_pool_size => 1, resume_interval => ResumeInterval @@ -2067,6 +2085,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ok, emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) ), + ?assertEqual( + ok, + emqx_resource:query(?ID, {inc_counter, 299}, #{timeout => TimeoutMS}) + ), Pid0 = spawn_link(fun() -> ?tp(delay_enter, #{}), @@ -2087,7 +2109,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?assertMatch( [ #{ - expired := [{query, _, {inc_counter, 199}, _, _}] + expired := [ + {query, _, {inc_counter, 199}, _, _}, + {query, _, {inc_counter, 299}, _, _} + ] } ], ?of_kind(handle_async_reply_expired, Trace) @@ -2096,10 +2121,10 @@ t_expiration_batch_all_expired_after_reply(_Config) -> ?assertMatch( #{ counters := #{ - matched := 1, + matched := 2, success := 0, dropped := 0, - late_reply := 1, + late_reply := 2, retried := 0, failed := 0 }, From 713220f88b8a0bbc5dec89eb54fd9e5b9415d311 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 00:04:20 +0100 Subject: [PATCH 07/18] refactor(buffer_worker): more generic process for all_expired --- .../src/emqx_resource_buffer_worker.erl | 110 +++++++++--------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 6aa13092a..ff3b67c7a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -489,7 +489,7 @@ flush(Data0) -> %% if the request has expired, the caller is no longer %% waiting for a response. case sieve_expired_requests(Batch, Now) of - all_expired -> + {[], _AllExpired} -> ok = replayq:ack(Q1, QAckRef), emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -1031,9 +1031,6 @@ do_handle_async_reply( handle_async_batch_reply( #{ - buffer_worker := Pid, - resource_id := Id, - worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, batch := Batch @@ -1046,44 +1043,59 @@ handle_async_batch_reply( ), Now = now_(), case sieve_expired_requests(Batch, Now) of - all_expired -> - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsAcked andalso emqx_resource_metrics:late_reply_inc(Id, length(Batch)), - IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid), - ?tp(handle_async_reply_expired, #{expired => Batch}), - ok; {_NotExpired, []} -> + %% this is the critical code path, + %% we try not to do ets:lookup in this case + %% because the batch can be quite big do_handle_async_batch_reply(ReplyContext, Result); {_NotExpired, _Expired} -> - %% partial expire + %% at least one is expired %% the batch from reply context is minimized, so it cannot be used %% to update the inflight items, hence discard Batch and lookup the RealBatch ?tp(handle_async_reply_expired, #{expired => _Expired}), - case ets:lookup(InflightTID, Ref) of - [] -> - %% e.g. if the driver evaluates it more than once - %% which should really be a bug, TODO: add a unknown_reply counter - ok; - [?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef)] -> - %% All batch items share the same HasBeenSent flag - %% So we just take the original flag from the ReplyContext batch - %% and put it back to the batch found in inflight table - %% which must have already been set to `false` - [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, - {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), - RealNotExpired = - lists:map( - fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> - ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) - end, - RealNotExpired0 - ), - NumExpired = length(RealExpired), - emqx_resource_metrics:late_reply_inc(Id, NumExpired), - ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) - end + handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) + end. + +handle_async_batch_reply2([], _, _, _) -> + %% e.g. if the driver evaluates the callback more than once + %% which should really be a bug + ok; +handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> + ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, + #{ + buffer_worker := Pid, + resource_id := Id, + worker_index := Index, + inflight_tid := InflightTID, + request_ref := Ref, + batch := Batch + } = ReplyContext, + %% All batch items share the same HasBeenSent flag + %% So we just take the original flag from the ReplyContext batch + %% and put it back to the batch found in inflight table + %% which must have already been set to `false` + [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch, + {RealNotExpired0, RealExpired} = sieve_expired_requests(RealBatch, Now), + RealNotExpired = + lists:map( + fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) -> + ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt) + end, + RealNotExpired0 + ), + NumExpired = length(RealExpired), + emqx_resource_metrics:late_reply_inc(Id, NumExpired), + case RealNotExpired of + [] -> + %% all expired, no need to update back the inflight batch + IsFullBefore = is_inflight_full(InflightTID), + IsAcked = ack_inflight(InflightTID, Ref, Id, Index), + IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid); + _ -> + %% some queries are not expired, put them back to the inflight batch + %% so it can be either acked now or retried later + ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), + do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) end. do_handle_async_batch_reply( @@ -1226,10 +1238,8 @@ inflight_get_first_retriable(InflightTID, Now) -> {single, Ref, Query} end; {[{Ref, Batch = [_ | _]}], _Continuation} -> - %% batch is non-empty because we check that in - %% `sieve_expired_requests'. case sieve_expired_requests(Batch, Now) of - all_expired -> + {[], _AllExpired} -> {expired, Ref, Batch}; {NotExpired, Expired} -> {batch, Ref, NotExpired, Expired} @@ -1482,22 +1492,12 @@ is_async_return(_) -> false. sieve_expired_requests(Batch, Now) -> - {Expired, NotExpired} = - lists:partition( - fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> - is_expired(ExpireAt, Now) - end, - Batch - ), - case {NotExpired, Expired} of - {[], []} -> - %% Should be impossible for batch_size >= 1. - all_expired; - {[], [_ | _]} -> - all_expired; - {[_ | _], _} -> - {NotExpired, Expired} - end. + lists:partition( + fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) -> + not is_expired(ExpireAt, Now) + end, + Batch + ). -spec is_expired(infinity | integer(), integer()) -> boolean(). is_expired(infinity = _ExpireAt, _Now) -> From 3413af76be21a9be2dab6a0a6b07a01fbd681776 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 09:05:52 +0100 Subject: [PATCH 08/18] fix(emqx_misc): ensure flatten list for safe dir --- apps/emqx/src/emqx_misc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index fbeec8724..18ecc644a 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -720,4 +720,4 @@ pub_props_to_packet(Properties) -> safe_filename(Filename) when is_binary(Filename) -> binary:replace(Filename, <<":">>, <<"-">>, [global]); safe_filename(Filename) when is_list(Filename) -> - string:replace(Filename, ":", "-", all). + lists:flatten(string:replace(Filename, ":", "-", all)). From 356a94af30ec168f8c2a2d6a7360e7b446e14e26 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 09:47:34 +0100 Subject: [PATCH 09/18] fix(buffer_worker): ensure async flush message is sent This is a new issue introduced in the previous fix commits after handling the partial expiry correctly, the IsFullBefore check is no longer the state before the reply is received but the state after a partially-expired batch is shrinked. The fix is simple, move the check to the entry-point of where async reply callback enters, then send an async 'flush' notification regardless of the handling result. --- .../src/emqx_resource_buffer_worker.erl | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index ff3b67c7a..0a6adf3d6 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1019,15 +1019,16 @@ do_handle_async_reply( ref => Ref, result => Result }), - + IsFullBefore = is_inflight_full(InflightTID), case Action of nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) - end. + do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + end, + ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply( #{ @@ -1042,19 +1043,21 @@ handle_async_batch_reply( #{batch_or_query => Batch, ref => Ref} ), Now = now_(), + IsFullBefore = is_inflight_full(InflightTID), case sieve_expired_requests(Batch, Now) of {_NotExpired, []} -> %% this is the critical code path, %% we try not to do ets:lookup in this case %% because the batch can be quite big - do_handle_async_batch_reply(ReplyContext, Result); + ok = do_handle_async_batch_reply(ReplyContext, Result); {_NotExpired, _Expired} -> %% at least one is expired %% the batch from reply context is minimized, so it cannot be used %% to update the inflight items, hence discard Batch and lookup the RealBatch ?tp(handle_async_reply_expired, #{expired => _Expired}), - handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) - end. + ok = handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) + end, + ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply2([], _, _, _) -> %% e.g. if the driver evaluates the callback more than once @@ -1063,7 +1066,6 @@ handle_async_batch_reply2([], _, _, _) -> handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, #{ - buffer_worker := Pid, resource_id := Id, worker_index := Index, inflight_tid := InflightTID, @@ -1088,15 +1090,15 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch - IsFullBefore = is_inflight_full(InflightTID), - IsAcked = ack_inflight(InflightTID, Ref, Id, Index), - IsFullBefore andalso IsAcked andalso ?MODULE:flush_worker(Pid); + _ = ack_inflight(InflightTID, Ref, Id, Index), + ok; _ -> %% some queries are not expired, put them back to the inflight batch %% so it can be either acked now or retried later ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) - end. + ok = do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) + end, + ok. do_handle_async_batch_reply( #{ @@ -1123,11 +1125,10 @@ do_handle_async_batch_reply( ok = mark_inflight_as_retriable(InflightTID, Ref), ?MODULE:block(Pid); ack -> - do_ack(InflightTID, Ref, Id, Index, PostFn, Pid, QueryOpts) + ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. -do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> - IsFullBefore = is_inflight_full(InflightTID), +do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) -> IsKnownRef = ack_inflight(InflightTID, Ref, Id, Index), case maps:get(simple_query, QueryOpts, false) of true -> @@ -1137,9 +1138,18 @@ do_ack(InflightTID, Ref, Id, Index, PostFn, WorkerPid, QueryOpts) -> false -> ok end, - IsFullBefore andalso ?MODULE:flush_worker(WorkerPid), ok. +maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) -> + %% inflight was not full before async reply is handled, + %% after it is handled, the inflight table must be even smaller + %% hance we can rely on the buffer worker's flush timer to trigger + %% the next flush + ok; +maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> + %% the inflight table was full before handling aync reply + ok = ?MODULE:flush_worker(self()). + %%============================================================================== %% operations for queue queue_item_marshaller(Bin) when is_binary(Bin) -> From dbfdeec5e95613fca6f8e4f0e4a3c1bbd32951d4 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 12:40:44 +0100 Subject: [PATCH 10/18] fix(buffer_worker): log unknown async replies --- .../src/emqx_resource_buffer_worker.erl | 100 ++++++++++++++---- .../test/emqx_connector_demo.erl | 57 ++++++++-- .../test/emqx_resource_SUITE.erl | 64 ++++++++++- 3 files changed, 192 insertions(+), 29 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0a6adf3d6..77494f4ba 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -475,6 +475,7 @@ flush(Data0) -> ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}), case {CurrentCount, IsFull} of {0, _} -> + ?tp(buffer_worker_queue_drained, #{inflight => inflight_num_batches(InflightTID)}), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -918,7 +919,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re inflight_tid => InflightTID, request_ref => Ref, query_opts => QueryOpts, - query => minimize(Query) + min_query => minimize(Query) }, IsRetriable = false, WorkerMRef = undefined, @@ -951,7 +952,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re inflight_tid => InflightTID, request_ref => Ref, query_opts => QueryOpts, - batch => minimize(Batch) + min_batch => minimize(Batch) }, Requests = lists:map( fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch @@ -967,19 +968,33 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re ). handle_async_reply( + #{ + request_ref := Ref, + inflight_tid := InflightTID + } = ReplyContext, + Result +) -> + case maybe_handle_unknown_async_reply(InflightTID, Ref) of + discard -> + ok; + continue -> + handle_async_reply1(ReplyContext, Result) + end. + +handle_async_reply1( #{ request_ref := Ref, inflight_tid := InflightTID, resource_id := Id, worker_index := Index, buffer_worker := Pid, - query := ?QUERY(_, _, _, ExpireAt) = _Query + min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result ) -> ?tp( handle_async_reply_enter, - #{batch_or_query => [_Query], ref => Ref} + #{batch_or_query => [_Query], ref => Ref, result => Result} ), Now = now_(), case is_expired(ExpireAt, Now) of @@ -1002,7 +1017,7 @@ do_handle_async_reply( worker_index := Index, buffer_worker := Pid, inflight_tid := InflightTID, - query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query }, Result ) -> @@ -1031,16 +1046,30 @@ do_handle_async_reply( ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply( + #{ + inflight_tid := InflightTID, + request_ref := Ref + } = ReplyContext, + Result +) -> + case maybe_handle_unknown_async_reply(InflightTID, Ref) of + discard -> + ok; + continue -> + handle_async_batch_reply1(ReplyContext, Result) + end. + +handle_async_batch_reply1( #{ inflight_tid := InflightTID, request_ref := Ref, - batch := Batch + min_batch := Batch } = ReplyContext, Result ) -> ?tp( handle_async_reply_enter, - #{batch_or_query => Batch, ref => Ref} + #{batch_or_query => Batch, ref => Ref, result => Result} ), Now = now_(), IsFullBefore = is_inflight_full(InflightTID), @@ -1060,8 +1089,7 @@ handle_async_batch_reply( ok = maybe_flush_after_async_reply(IsFullBefore). handle_async_batch_reply2([], _, _, _) -> - %% e.g. if the driver evaluates the callback more than once - %% which should really be a bug + %% should have caused the unknown_async_reply_discarded ok; handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, @@ -1070,7 +1098,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, - batch := Batch + min_batch := Batch } = ReplyContext, %% All batch items share the same HasBeenSent flag %% So we just take the original flag from the ReplyContext batch @@ -1096,7 +1124,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> %% some queries are not expired, put them back to the inflight batch %% so it can be either acked now or retried later ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - ok = do_handle_async_batch_reply(ReplyContext#{batch := RealNotExpired}, Result) + ok = do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result) end, ok. @@ -1107,7 +1135,7 @@ do_handle_async_batch_reply( worker_index := Index, inflight_tid := InflightTID, request_ref := Ref, - batch := Batch, + min_batch := Batch, query_opts := QueryOpts }, Result @@ -1123,7 +1151,7 @@ do_handle_async_batch_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ?MODULE:block(Pid); + ok = ?MODULE:block(Pid); ack -> ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. @@ -1150,6 +1178,32 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> %% the inflight table was full before handling aync reply ok = ?MODULE:flush_worker(self()). +%% check if the async reply is valid. +%% e.g. if a connector evaluates the callback more than once: +%% 1. If the request was previously deleted from inflight table due to +%% either succeeded previously or expired, this function logs a +%% warning message and returns 'discard' instruction. +%% 2. If the request was previously failed and now pending on a retry, +%% then this function will return 'continue' as there is no way to +%% tell if this reply is stae or not. +maybe_handle_unknown_async_reply(InflightTID, Ref) -> + try ets:member(InflightTID, Ref) of + true -> + %% NOTE: this does not mean the + continue; + false -> + ?tp( + warning, + unknown_async_reply_discarded, + #{inflight_key => Ref} + ), + discard + catch + error:badarg -> + %% shutdown ? + discard + end. + %%============================================================================== %% operations for queue queue_item_marshaller(Bin) when is_binary(Bin) -> @@ -1287,7 +1341,7 @@ inflight_append( InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), BatchSize = length(Batch), - IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), + IsNew andalso inc_inflight(InflightTID, BatchSize), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; @@ -1302,7 +1356,7 @@ inflight_append( Query = mark_as_sent(Query0), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), IsNew = ets:insert_new(InflightTID, InflightItem), - IsNew andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), + IsNew andalso inc_inflight(InflightTID, 1), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ?tp(buffer_worker_appended_to_inflight, #{item => InflightItem, is_new => IsNew}), ok; @@ -1318,6 +1372,8 @@ mark_inflight_as_retriable(undefined, _Ref) -> ok; mark_inflight_as_retriable(InflightTID, Ref) -> _ = ets:update_element(InflightTID, Ref, {?RETRY_IDX, true}), + %% the old worker's DOWN should not affect this inflight any more + _ = ets:update_element(InflightTID, Ref, {?WORKER_MREF_IDX, erased}), ok. %% Track each worker pid only once. @@ -1367,7 +1423,7 @@ ack_inflight(InflightTID, Ref, Id, Index) -> IsKnownRef = (Count > 0), case IsKnownRef of true -> - ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + ok = dec_inflight(InflightTID, Count), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); false -> ok @@ -1390,9 +1446,17 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), - _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -NumExpired, 0, 0}), + ok = dec_inflight(InflightTID, NumExpired), + ok. + +inc_inflight(InflightTID, Count) -> + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), + ok. + +dec_inflight(InflightTID, Count) when Count > 0 -> + _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 1d96fa083..a6b7b2339 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -135,11 +135,11 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> after 1000 -> {error, timeout} end; -on_query(_InstId, {sleep, For}, #{pid := Pid}) -> +on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => sync, for => For}), ReqRef = make_ref(), From = {self(), ReqRef}, - Pid ! {From, {sleep, For}}, + Pid ! {From, {sleep_before_reply, For}}, receive {ReqRef, Result} -> Result @@ -159,9 +159,9 @@ on_query_async(_InstId, block_now, ReplyFun, #{pid := Pid}) -> on_query_async(_InstId, {big_payload, Payload}, ReplyFun, #{pid := Pid}) -> Pid ! {big_payload, Payload, ReplyFun}, {ok, Pid}; -on_query_async(_InstId, {sleep, For}, ReplyFun, #{pid := Pid}) -> +on_query_async(_InstId, {sleep_before_reply, For}, ReplyFun, #{pid := Pid}) -> ?tp(connector_demo_sleep, #{mode => async, for => For}), - Pid ! {{sleep, For}, ReplyFun}, + Pid ! {{sleep_before_reply, For}, ReplyFun}, {ok, Pid}. on_batch_query(InstId, BatchReq, State) -> @@ -173,10 +173,13 @@ on_batch_query(InstId, BatchReq, State) -> get_counter -> batch_get_counter(sync, InstId, State); {big_payload, _Payload} -> - batch_big_payload(sync, InstId, BatchReq, State) + batch_big_payload(sync, InstId, BatchReq, State); + {random_reply, Num} -> + %% async batch retried + random_reply(Num) end. -on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> +on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) -> %% Requests can be of multiple types, but cannot be mixed. case hd(BatchReq) of {inc_counter, _} -> @@ -186,7 +189,11 @@ on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> block_now -> on_query_async(InstId, block_now, ReplyFunAndArgs, State); {big_payload, _Payload} -> - batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State) + batch_big_payload({async, ReplyFunAndArgs}, InstId, BatchReq, State); + {random_reply, Num} -> + %% only take the first Num in the batch should be random enough + Pid ! {{random_reply, Num}, ReplyFunAndArgs}, + {ok, Pid} end. batch_inc_counter(CallMode, InstId, BatchReq, State) -> @@ -299,16 +306,31 @@ counter_loop( {{FromPid, ReqRef}, get} -> FromPid ! {ReqRef, Num}, State; - {{sleep, _} = SleepQ, ReplyFun} -> + {{random_reply, RandNum}, ReplyFun} -> + %% usually a behaving connector should reply once and only once for + %% each (batch) request + %% but we try to reply random results a random number of times + %% with 'ok' in the result, the buffer worker should eventually + %% drain the buffer (and inflights table) + ReplyCount = 1 + (RandNum rem 3), + Results = random_replies(ReplyCount), + lists:foreach( + fun(Result) -> + apply_reply(ReplyFun, Result) + end, + Results + ), + State; + {{sleep_before_reply, _} = SleepQ, ReplyFun} -> apply_reply(ReplyFun, handle_query(async, SleepQ, Status)), State; - {{FromPid, ReqRef}, {sleep, _} = SleepQ} -> + {{FromPid, ReqRef}, {sleep_before_reply, _} = SleepQ} -> FromPid ! {ReqRef, handle_query(sync, SleepQ, Status)}, State end, counter_loop(NewState). -handle_query(Mode, {sleep, For} = Query, Status) -> +handle_query(Mode, {sleep_before_reply, For} = Query, Status) -> ok = timer:sleep(For), Result = case Status of @@ -329,3 +351,18 @@ maybe_register(_Name, _Pid, false) -> apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) -> apply(ReplyFun, Args ++ [Result]). + +random_replies(0) -> + []; +random_replies(N) -> + [random_reply(N) | random_replies(N - 1)]. + +random_reply(N) -> + case rand:uniform(3) of + 1 -> + {ok, N}; + 2 -> + {error, {recoverable_error, N}}; + 3 -> + {error, {unrecoverable_error, N}} + end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index bc146cc8e..1362cd1cc 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1482,7 +1482,7 @@ t_retry_async_inflight_full(_Config) -> AsyncInflightWindow * 2, fun() -> For = (ResumeInterval div 4) + rand:uniform(ResumeInterval div 4), - {sleep, For} + {sleep_before_reply, For} end, #{async_reply_fun => {fun(Res) -> ct:pal("Res = ~p", [Res]) end, []}} ), @@ -1507,6 +1507,68 @@ t_retry_async_inflight_full(_Config) -> ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ok. +%% this test case is to ensure the buffer worker will not go crazy even +%% if the underlying connector is misbehaving: evaluate async callbacks multiple times +t_async_reply_multi_eval(_Config) -> + ResumeInterval = 20, + AsyncInflightWindow = 5, + emqx_connector_demo:set_callback_mode(async_if_possible), + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => ?FUNCTION_NAME}, + #{ + query_mode => async, + async_inflight_window => AsyncInflightWindow, + batch_size => 3, + batch_time => 10, + worker_pool_size => 1, + resume_interval => ResumeInterval + } + ), + ?check_trace( + #{timetrap => 15_000}, + begin + %% block + ok = emqx_resource:simple_sync_query(?ID, block), + + {ok, {ok, _}} = + ?wait_async_action( + inc_counter_in_parallel( + AsyncInflightWindow * 2, + fun() -> + Rand = rand:uniform(1000), + {random_reply, Rand} + end, + #{} + ), + #{?snk_kind := buffer_worker_queue_drained, inflight := 0}, + ResumeInterval * 200 + ), + ok + end, + [ + fun(Trace) -> + ?assertMatch([#{inflight := 0}], ?of_kind(buffer_worker_queue_drained, Trace)) + end + ] + ), + Metrics = tap_metrics(?LINE), + #{ + counters := Counters, + gauges := #{queuing := 0, inflight := 0} + } = Metrics, + #{ + matched := Matched, + success := Success, + dropped := Dropped, + late_reply := LateReply, + failed := Failed + } = Counters, + ?assertEqual(Matched, Success + Dropped + LateReply + Failed), + ok. + t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, emqx_connector_demo:set_callback_mode(async_if_possible), From 3a6dbbdd058efdf596e16750304522ee9e43c8eb Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 16:25:38 +0100 Subject: [PATCH 11/18] refactor(buffer_worker): ensure flsh message is never missed --- .../src/emqx_resource_buffer_worker.erl | 80 +++++++++++-------- .../test/emqx_connector_demo.erl | 2 + .../test/emqx_resource_SUITE.erl | 35 ++++---- 3 files changed, 67 insertions(+), 50 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 77494f4ba..e6fa1c537 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -70,6 +70,18 @@ -define(RETRY_IDX, 3). -define(WORKER_MREF_IDX, 4). +-define(ENSURE_ASYNC_FLUSH(InflightTID, EXPR), + (fun() -> + IsFullBefore = is_inflight_full(InflightTID), + case (EXPR) of + blocked -> + ok; + ok -> + maybe_flush_after_async_reply(IsFullBefore) + end + end)() +). + -type id() :: binary(). -type index() :: pos_integer(). -type expire_at() :: infinity | integer(). @@ -194,8 +206,8 @@ init({Id, Index, Opts}) -> ?tp(buffer_worker_init, #{id => Id, index => Index}), {ok, running, Data}. -running(enter, _, Data) -> - ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data)}), +running(enter, _, #{tref := _Tref} = Data) -> + ?tp(buffer_worker_enter_running, #{id => maps:get(id, Data), tref => _Tref}), %% According to `gen_statem' laws, we mustn't call `maybe_flush' %% directly because it may decide to return `{next_state, blocked, _}', %% and that's an invalid response for a state enter call. @@ -212,9 +224,8 @@ running(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data) -> handle_query_requests(Request0, Data); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); -running(internal, flush, St) -> - flush(St); running(info, {flush, _Ref}, _St) -> + ?tp(discarded_stale_flush, #{}), keep_state_and_data; running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) @@ -472,10 +483,15 @@ flush(Data0) -> Data1 = cancel_flush_timer(Data0), CurrentCount = queue_count(Q0), IsFull = is_inflight_full(InflightTID), - ?tp(buffer_worker_flush, #{queue_count => CurrentCount, is_full => IsFull}), + InflightCount = inflight_num_batches(InflightTID), + ?tp(buffer_worker_flush, #{ + queued => CurrentCount, + is_inflight_full => IsFull, + inflight => InflightCount + }), case {CurrentCount, IsFull} of {0, _} -> - ?tp(buffer_worker_queue_drained, #{inflight => inflight_num_batches(InflightTID)}), + ?tp(buffer_worker_queue_drained, #{inflight => InflightCount}), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -714,18 +730,18 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> end, Batch ), - {Action, PostFn1} = reply_caller_defer_metrics(Id, hd(Replies), QueryOpts), - PostFns = + {ShouldAck, PostFns} = lists:foldl( - fun(Reply, PostFns) -> - {_, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), - [PostFn | PostFns] + fun(Reply, {_ShouldAck, PostFns}) -> + %% _ShouldAck should be the same as ShouldAck starting from the second reply + {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), + {ShouldAck, [PostFn | PostFns]} end, - [PostFn1], - tl(Replies) + {ack, []}, + Replies ), PostFn = fun() -> lists:foreach(fun(F) -> F() end, lists:reverse(PostFns)) end, - {Action, PostFn}. + {ShouldAck, PostFn}. reply_caller(Id, Reply, QueryOpts) -> {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), @@ -978,7 +994,7 @@ handle_async_reply( discard -> ok; continue -> - handle_async_reply1(ReplyContext, Result) + ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_reply1(ReplyContext, Result)) end. handle_async_reply1( @@ -999,10 +1015,8 @@ handle_async_reply1( Now = now_(), case is_expired(ExpireAt, Now) of true -> - IsFullBefore = is_inflight_full(InflightTID), IsAcked = ack_inflight(InflightTID, Ref, Id, Index), IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), - IsFullBefore andalso ?MODULE:flush_worker(Pid), ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; false -> @@ -1034,16 +1048,15 @@ do_handle_async_reply( ref => Ref, result => Result }), - IsFullBefore = is_inflight_full(InflightTID), case Action of nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ?MODULE:block(Pid); + ok = ?MODULE:block(Pid), + blocked; ack -> - do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) - end, - ok = maybe_flush_after_async_reply(IsFullBefore). + ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) + end. handle_async_batch_reply( #{ @@ -1056,7 +1069,7 @@ handle_async_batch_reply( discard -> ok; continue -> - handle_async_batch_reply1(ReplyContext, Result) + ?ENSURE_ASYNC_FLUSH(InflightTID, handle_async_batch_reply1(ReplyContext, Result)) end. handle_async_batch_reply1( @@ -1072,21 +1085,19 @@ handle_async_batch_reply1( #{batch_or_query => Batch, ref => Ref, result => Result} ), Now = now_(), - IsFullBefore = is_inflight_full(InflightTID), case sieve_expired_requests(Batch, Now) of {_NotExpired, []} -> %% this is the critical code path, %% we try not to do ets:lookup in this case %% because the batch can be quite big - ok = do_handle_async_batch_reply(ReplyContext, Result); + do_handle_async_batch_reply(ReplyContext, Result); {_NotExpired, _Expired} -> %% at least one is expired %% the batch from reply context is minimized, so it cannot be used %% to update the inflight items, hence discard Batch and lookup the RealBatch ?tp(handle_async_reply_expired, #{expired => _Expired}), - ok = handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) - end, - ok = maybe_flush_after_async_reply(IsFullBefore). + handle_async_batch_reply2(ets:lookup(InflightTID, Ref), ReplyContext, Result, Now) + end. handle_async_batch_reply2([], _, _, _) -> %% should have caused the unknown_async_reply_discarded @@ -1124,9 +1135,8 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> %% some queries are not expired, put them back to the inflight batch %% so it can be either acked now or retried later ok = update_inflight_item(InflightTID, Ref, RealNotExpired, NumExpired), - ok = do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result) - end, - ok. + do_handle_async_batch_reply(ReplyContext#{min_batch := RealNotExpired}, Result) + end. do_handle_async_batch_reply( #{ @@ -1151,7 +1161,8 @@ do_handle_async_batch_reply( nack -> %% Keep retrying. ok = mark_inflight_as_retriable(InflightTID, Ref), - ok = ?MODULE:block(Pid); + ok = ?MODULE:block(Pid), + blocked; ack -> ok = do_async_ack(InflightTID, Ref, Id, Index, PostFn, QueryOpts) end. @@ -1173,9 +1184,11 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = false) -> %% after it is handled, the inflight table must be even smaller %% hance we can rely on the buffer worker's flush timer to trigger %% the next flush + ?tp(skip_flushing_worker, #{}), ok; maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> %% the inflight table was full before handling aync reply + ?tp(do_flushing_worker, #{}), ok = ?MODULE:flush_worker(self()). %% check if the async reply is valid. @@ -1189,7 +1202,6 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> maybe_handle_unknown_async_reply(InflightTID, Ref) -> try ets:member(InflightTID, Ref) of true -> - %% NOTE: this does not mean the continue; false -> ?tp( @@ -1446,7 +1458,7 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), ok = dec_inflight(InflightTID, NumExpired), ok. diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index a6b7b2339..3b5f83d05 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -314,6 +314,8 @@ counter_loop( %% drain the buffer (and inflights table) ReplyCount = 1 + (RandNum rem 3), Results = random_replies(ReplyCount), + %% add a delay to trigger inflight full + timer:sleep(5), lists:foreach( fun(Result) -> apply_reply(ReplyFun, Result) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 1362cd1cc..dfe64de24 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1510,8 +1510,9 @@ t_retry_async_inflight_full(_Config) -> %% this test case is to ensure the buffer worker will not go crazy even %% if the underlying connector is misbehaving: evaluate async callbacks multiple times t_async_reply_multi_eval(_Config) -> - ResumeInterval = 20, - AsyncInflightWindow = 5, + ResumeInterval = 5, + TotalTime = 5_000, + AsyncInflightWindow = 3, emqx_connector_demo:set_callback_mode(async_if_possible), {ok, _} = emqx_resource:create( ?ID, @@ -1528,29 +1529,31 @@ t_async_reply_multi_eval(_Config) -> } ), ?check_trace( - #{timetrap => 15_000}, + #{timetrap => 30_000}, begin %% block ok = emqx_resource:simple_sync_query(?ID, block), - {ok, {ok, _}} = - ?wait_async_action( - inc_counter_in_parallel( - AsyncInflightWindow * 2, - fun() -> - Rand = rand:uniform(1000), - {random_reply, Rand} - end, - #{} - ), - #{?snk_kind := buffer_worker_queue_drained, inflight := 0}, - ResumeInterval * 200 + ?wait_async_action( + inc_counter_in_parallel( + AsyncInflightWindow * 5, + fun() -> + Rand = rand:uniform(1000), + {random_reply, Rand} + end, + #{} ), + #{?snk_kind := buffer_worker_flush, inflight := 0, queued := 0}, + TotalTime + ), ok end, [ fun(Trace) -> - ?assertMatch([#{inflight := 0}], ?of_kind(buffer_worker_queue_drained, Trace)) + ?assertMatch( + [#{inflight := 0} | _], + lists:reverse(?of_kind(buffer_worker_queue_drained, Trace)) + ) end ] ), From 7a6465e2cfdf8fc0892938c9a9266c1676b3165b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 21:00:39 +0100 Subject: [PATCH 12/18] fix(buffer_worker): ensure flush timer reset in blocked state --- .../src/emqx_resource_buffer_worker.erl | 20 +++++++++++++------ .../test/emqx_resource_SUITE.erl | 2 +- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index e6fa1c537..aaa07cf9a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -236,21 +236,24 @@ running(info, Info, _St) -> ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}), keep_state_and_data. -blocked(enter, _, #{resume_interval := ResumeT} = _St) -> +blocked(enter, _, #{resume_interval := ResumeT} = St0) -> ?tp(buffer_worker_enter_blocked, #{}), - {keep_state_and_data, {state_timeout, ResumeT, unblock}}; + %% discard the old timer, new timer will be started when entering running state again + St = cancel_flush_timer(St0), + {keep_state, St, {state_timeout, ResumeT, unblock}}; blocked(cast, block, _St) -> keep_state_and_data; blocked(cast, resume, St) -> resume_from_blocked(St); -blocked(cast, flush, Data) -> - resume_from_blocked(Data); +blocked(cast, flush, St) -> + resume_from_blocked(St); blocked(state_timeout, unblock, St) -> resume_from_blocked(St); blocked(info, ?SEND_REQ(_ReplyTo, _Req) = Request0, Data0) -> Data = collect_and_enqueue_query_requests(Request0, Data0), {keep_state, Data}; blocked(info, {flush, _Ref}, _Data) -> + %% ignore stale timer keep_state_and_data; blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when is_map_key(Pid, AsyncWorkers0) @@ -622,6 +625,9 @@ do_flush( }), flush_worker(self()); false -> + ?tp(buffer_worker_queue_drained, #{ + inflight => inflight_num_batches(InflightTID) + }), ok end, {keep_state, Data1} @@ -700,6 +706,9 @@ do_flush(#{queue := Q1} = Data0, #{ Data2 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> + ?tp(buffer_worker_queue_drained, #{ + inflight => inflight_num_batches(InflightTID) + }), Data1; {true, true} -> ?tp(buffer_worker_flush_ack_reflush, #{ @@ -1003,7 +1012,6 @@ handle_async_reply1( inflight_tid := InflightTID, resource_id := Id, worker_index := Index, - buffer_worker := Pid, min_query := ?QUERY(_, _, _, ExpireAt) = _Query } = ReplyContext, Result @@ -1100,7 +1108,7 @@ handle_async_batch_reply1( end. handle_async_batch_reply2([], _, _, _) -> - %% should have caused the unknown_async_reply_discarded + %% this usually should never happen unless the async callback is being evaluated concurrently ok; handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> ?INFLIGHT_ITEM(_, RealBatch, _IsRetriable, _WorkerMRef) = Inflight, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index dfe64de24..e22ca7750 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1543,7 +1543,7 @@ t_async_reply_multi_eval(_Config) -> end, #{} ), - #{?snk_kind := buffer_worker_flush, inflight := 0, queued := 0}, + #{?snk_kind := buffer_worker_queue_drained, inflight := 0}, TotalTime ), ok From a10dbba08416d428507013f0e72b9cefe32c5a67 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 23 Feb 2023 21:23:10 +0100 Subject: [PATCH 13/18] refactor(buffer_worker): less defensive on inflight counter decrement --- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index aaa07cf9a..601a77deb 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1440,10 +1440,10 @@ ack_inflight(InflightTID, Ref, Id, Index) -> [] -> 0 end, + ok = dec_inflight(InflightTID, Count), IsKnownRef = (Count > 0), case IsKnownRef of true -> - ok = dec_inflight(InflightTID, Count), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)); false -> ok @@ -1466,15 +1466,16 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) -> ok. %% used to update a batch after dropping expired individual queries. -update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) when NumExpired > 0 -> +update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> _ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}), - ok = dec_inflight(InflightTID, NumExpired), - ok. + ok = dec_inflight(InflightTID, NumExpired). inc_inflight(InflightTID, Count) -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), ok. +dec_inflight(_InflightTID, 0) -> + ok; dec_inflight(InflightTID, Count) when Count > 0 -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), ok. From c97d17cc919b9e1a0280eb8da3372691629ae075 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 24 Feb 2023 01:24:36 +0100 Subject: [PATCH 14/18] test: refactor to loop wait for counters --- .../src/emqx_resource_buffer_worker.erl | 15 ++-- .../test/emqx_connector_demo.erl | 14 ++-- .../test/emqx_resource_SUITE.erl | 79 +++++++++---------- 3 files changed, 52 insertions(+), 56 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 601a77deb..38de2dc34 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -77,7 +77,7 @@ blocked -> ok; ok -> - maybe_flush_after_async_reply(IsFullBefore) + ok = maybe_flush_after_async_reply(IsFullBefore) end end)() ). @@ -486,15 +486,14 @@ flush(Data0) -> Data1 = cancel_flush_timer(Data0), CurrentCount = queue_count(Q0), IsFull = is_inflight_full(InflightTID), - InflightCount = inflight_num_batches(InflightTID), ?tp(buffer_worker_flush, #{ queued => CurrentCount, is_inflight_full => IsFull, - inflight => InflightCount + inflight => inflight_count(InflightTID) }), case {CurrentCount, IsFull} of {0, _} -> - ?tp(buffer_worker_queue_drained, #{inflight => InflightCount}), + ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -626,7 +625,7 @@ do_flush( flush_worker(self()); false -> ?tp(buffer_worker_queue_drained, #{ - inflight => inflight_num_batches(InflightTID) + inflight => inflight_count(InflightTID) }), ok end, @@ -707,7 +706,7 @@ do_flush(#{queue := Q1} = Data0, #{ case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> ?tp(buffer_worker_queue_drained, #{ - inflight => inflight_num_batches(InflightTID) + inflight => inflight_count(InflightTID) }), Data1; {true, true} -> @@ -1336,10 +1335,10 @@ is_inflight_full(InflightTID) -> [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), %% we consider number of batches rather than number of messages %% because one batch request may hold several messages. - Size = inflight_num_batches(InflightTID), + Size = inflight_count(InflightTID), Size >= MaxSize. -inflight_num_batches(InflightTID) -> +inflight_count(InflightTID) -> case ets:info(InflightTID, size) of undefined -> 0; Size -> max(0, Size - ?INFLIGHT_META_ROWS) diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 3b5f83d05..f41087b20 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -176,7 +176,7 @@ on_batch_query(InstId, BatchReq, State) -> batch_big_payload(sync, InstId, BatchReq, State); {random_reply, Num} -> %% async batch retried - random_reply(Num) + make_random_reply(Num) end. on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, #{pid := Pid} = State) -> @@ -313,11 +313,11 @@ counter_loop( %% with 'ok' in the result, the buffer worker should eventually %% drain the buffer (and inflights table) ReplyCount = 1 + (RandNum rem 3), - Results = random_replies(ReplyCount), + Results = make_random_replies(ReplyCount), %% add a delay to trigger inflight full - timer:sleep(5), lists:foreach( fun(Result) -> + timer:sleep(rand:uniform(5)), apply_reply(ReplyFun, Result) end, Results @@ -354,12 +354,12 @@ maybe_register(_Name, _Pid, false) -> apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) -> apply(ReplyFun, Args ++ [Result]). -random_replies(0) -> +make_random_replies(0) -> []; -random_replies(N) -> - [random_reply(N) | random_replies(N - 1)]. +make_random_replies(N) -> + [make_random_reply(N) | make_random_replies(N - 1)]. -random_reply(N) -> +make_random_reply(N) -> case rand:uniform(3) of 1 -> {ok, N}; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e22ca7750..9b1031c42 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1513,6 +1513,7 @@ t_async_reply_multi_eval(_Config) -> ResumeInterval = 5, TotalTime = 5_000, AsyncInflightWindow = 3, + TotalQueries = AsyncInflightWindow * 5, emqx_connector_demo:set_callback_mode(async_if_possible), {ok, _} = emqx_resource:create( ?ID, @@ -1528,49 +1529,33 @@ t_async_reply_multi_eval(_Config) -> resume_interval => ResumeInterval } ), - ?check_trace( - #{timetrap => 30_000}, - begin - %% block - ok = emqx_resource:simple_sync_query(?ID, block), - - ?wait_async_action( - inc_counter_in_parallel( - AsyncInflightWindow * 5, - fun() -> - Rand = rand:uniform(1000), - {random_reply, Rand} - end, - #{} - ), - #{?snk_kind := buffer_worker_queue_drained, inflight := 0}, - TotalTime - ), - ok + %% block + ok = emqx_resource:simple_sync_query(?ID, block), + inc_counter_in_parallel( + TotalQueries, + fun() -> + Rand = rand:uniform(1000), + {random_reply, Rand} end, - [ - fun(Trace) -> - ?assertMatch( - [#{inflight := 0} | _], - lists:reverse(?of_kind(buffer_worker_queue_drained, Trace)) - ) - end - ] + #{} ), - Metrics = tap_metrics(?LINE), - #{ - counters := Counters, - gauges := #{queuing := 0, inflight := 0} - } = Metrics, - #{ - matched := Matched, - success := Success, - dropped := Dropped, - late_reply := LateReply, - failed := Failed - } = Counters, - ?assertEqual(Matched, Success + Dropped + LateReply + Failed), - ok. + F = fun() -> + Metrics = tap_metrics(?LINE), + #{ + counters := Counters, + gauges := #{queuing := 0, inflight := 0} + } = Metrics, + #{ + matched := Matched, + success := Success, + dropped := Dropped, + late_reply := LateReply, + failed := Failed + } = Counters, + ?assertEqual(TotalQueries, Matched - 1), + ?assertEqual(Matched, Success + Dropped + LateReply + Failed) + end, + loop_wait(F, _Interval = 5, TotalTime). t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, @@ -2637,3 +2622,15 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ) ), ok. + +loop_wait(F, Interval, TotalTime) when Interval >= TotalTime -> + %% do it for the last time + F(); +loop_wait(F, Interval, TotalTime) -> + try + F() + catch + _:_ -> + timer:sleep(Interval), + loop_wait(F, Interval, TotalTime - Interval) + end. From de740a2fd9506ac2df2ac884ab5d7fe5171cb06e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 24 Feb 2023 15:03:03 +0300 Subject: [PATCH 15/18] ci: use official zookeeper image Former one ate almost all of my free memory for some reason. This one looks more predictable. --- .ci/docker-compose-file/docker-compose-kafka.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index 976b0bc1c..63e74fa11 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -2,7 +2,7 @@ version: '3.9' services: zookeeper: - image: wurstmeister/zookeeper + image: docker.io/library/zookeeper:3.6 ports: - "2181:2181" container_name: zookeeper From 9cbe64a132d7cc82a27eac2f7c3aeda03a375bf6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 24 Feb 2023 15:05:20 +0300 Subject: [PATCH 16/18] fix(test): make strings json-friendly in kafka testsuite --- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index d06218397..9b38e98d3 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -268,7 +268,7 @@ kafka_bridge_rest_api_helper(Config) -> CreateBodyTmp = #{ <<"type">> => <<"kafka">>, <<"name">> => <<"my_kafka_bridge">>, - <<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config), + <<"bootstrap_hosts">> => iolist_to_binary(maps:get(<<"bootstrap_hosts">>, Config)), <<"enable">> => true, <<"authentication">> => maps:get(<<"authentication">>, Config), <<"producer">> => #{ @@ -276,7 +276,7 @@ kafka_bridge_rest_api_helper(Config) -> topic => <<"t/#">> }, <<"kafka">> => #{ - <<"topic">> => erlang:list_to_binary(KafkaTopic), + <<"topic">> => iolist_to_binary(KafkaTopic), <<"buffer">> => #{ <<"memory_overload_protection">> => <<"false">> }, From 2b4e49e7df5d286141713fa2d15a0cbc65f619b4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 24 Feb 2023 15:06:49 +0300 Subject: [PATCH 17/18] fix(bufworker): handle replies of simple async queries Before that change, simple queries were treated as "retries" essentially, thus skipping all the reply processing there is. --- .../src/emqx_resource_buffer_worker.erl | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 38de2dc34..a8ae4454d 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -109,6 +109,7 @@ start_link(Id, Index, Opts) -> -spec sync_query(id(), request(), query_opts()) -> Result :: term(). sync_query(Id, Request, Opts0) -> + ?tp(sync_query, #{id => Id, request => Request, query_opts => Opts0}), Opts1 = ensure_timeout_query_opts(Opts0, sync), Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), @@ -118,6 +119,7 @@ sync_query(Id, Request, Opts0) -> -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts0) -> + ?tp(async_query, #{id => Id, request => Request, query_opts => Opts0}), Opts1 = ensure_timeout_query_opts(Opts0, async), Opts = ensure_expire_at(Opts1), PickKey = maps:get(pick_key, Opts, self()), @@ -133,6 +135,7 @@ simple_sync_query(Id, Request) -> %% call ends up calling buffering functions, that's a bug and %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. + ?tp(simple_sync_query, #{id => Id, request => Request}), Index = undefined, QueryOpts = simple_query_opts(), emqx_resource_metrics:matched_inc(Id), @@ -144,6 +147,7 @@ simple_sync_query(Id, Request) -> %% simple async-query the resource without batching and queuing. -spec simple_async_query(id(), request(), query_opts()) -> term(). simple_async_query(Id, Request, QueryOpts0) -> + ?tp(simple_async_query, #{id => Id, request => Request, query_opts => QueryOpts0}), Index = undefined, QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), @@ -877,7 +881,7 @@ handle_async_worker_down(Data0, Pid) -> {keep_state, Data}. call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> - ?tp(call_query_enter, #{id => Id, query => Query}), + ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); @@ -994,11 +998,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re handle_async_reply( #{ request_ref := Ref, - inflight_tid := InflightTID + inflight_tid := InflightTID, + query_opts := Opts } = ReplyContext, Result ) -> - case maybe_handle_unknown_async_reply(InflightTID, Ref) of + case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of discard -> ok; continue -> @@ -1068,11 +1073,12 @@ do_handle_async_reply( handle_async_batch_reply( #{ inflight_tid := InflightTID, - request_ref := Ref + request_ref := Ref, + query_opts := Opts } = ReplyContext, Result ) -> - case maybe_handle_unknown_async_reply(InflightTID, Ref) of + case maybe_handle_unknown_async_reply(InflightTID, Ref, Opts) of discard -> ok; continue -> @@ -1206,7 +1212,9 @@ maybe_flush_after_async_reply(_WasFullBeforeReplyHandled = true) -> %% 2. If the request was previously failed and now pending on a retry, %% then this function will return 'continue' as there is no way to %% tell if this reply is stae or not. -maybe_handle_unknown_async_reply(InflightTID, Ref) -> +maybe_handle_unknown_async_reply(undefined, _Ref, #{simple_query := true}) -> + continue; +maybe_handle_unknown_async_reply(InflightTID, Ref, #{}) -> try ets:member(InflightTID, Ref) of true -> continue; From c883e4b36a1246ef8dcd07bfee64e64550af4880 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 24 Feb 2023 18:16:35 +0300 Subject: [PATCH 18/18] test: drop custom `loop_wait` in favor of snabkaffe's `?retry` --- .../test/emqx_resource_SUITE.erl | 49 ++++++++----------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 9b1031c42..af72e86f9 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1539,23 +1539,26 @@ t_async_reply_multi_eval(_Config) -> end, #{} ), - F = fun() -> - Metrics = tap_metrics(?LINE), - #{ - counters := Counters, - gauges := #{queuing := 0, inflight := 0} - } = Metrics, - #{ - matched := Matched, - success := Success, - dropped := Dropped, - late_reply := LateReply, - failed := Failed - } = Counters, - ?assertEqual(TotalQueries, Matched - 1), - ?assertEqual(Matched, Success + Dropped + LateReply + Failed) - end, - loop_wait(F, _Interval = 5, TotalTime). + ?retry( + ResumeInterval, + TotalTime div ResumeInterval, + begin + Metrics = tap_metrics(?LINE), + #{ + counters := Counters, + gauges := #{queuing := 0, inflight := 0} + } = Metrics, + #{ + matched := Matched, + success := Success, + dropped := Dropped, + late_reply := LateReply, + failed := Failed + } = Counters, + ?assertEqual(TotalQueries, Matched - 1), + ?assertEqual(Matched, Success + Dropped + LateReply + Failed) + end + ). t_retry_async_inflight_batch(_Config) -> ResumeInterval = 1_000, @@ -2622,15 +2625,3 @@ assert_async_retry_fail_then_succeed_inflight(Trace) -> ) ), ok. - -loop_wait(F, Interval, TotalTime) when Interval >= TotalTime -> - %% do it for the last time - F(); -loop_wait(F, Interval, TotalTime) -> - try - F() - catch - _:_ -> - timer:sleep(Interval), - loop_wait(F, Interval, TotalTime - Interval) - end.