diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 26f58ff69..f7e861f6b 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -33,7 +33,7 @@ {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, - {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}} + {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} ]}. {plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2e2cd5631..266b8df69 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -482,14 +482,16 @@ flush(Data0) -> Data1 = cancel_flush_timer(Data0), CurrentCount = queue_count(Q0), IsFull = is_inflight_full(InflightTID), - ?tp(buffer_worker_flush, #{ + ?tp_ignore_side_effects_in_prod(buffer_worker_flush, #{ queued => CurrentCount, is_inflight_full => IsFull, inflight => inflight_count(InflightTID) }), case {CurrentCount, IsFull} of {0, _} -> - ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}), + ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{ + inflight => inflight_count(InflightTID) + }), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -620,7 +622,7 @@ do_flush( }), flush_worker(self()); false -> - ?tp(buffer_worker_queue_drained, #{ + ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{ inflight => inflight_count(InflightTID) }), ok @@ -701,7 +703,7 @@ do_flush(#{queue := Q1} = Data0, #{ Data2 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> - ?tp(buffer_worker_queue_drained, #{ + ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{ inflight => inflight_count(InflightTID) }), Data1; @@ -1279,13 +1281,10 @@ append_queue(Id, Index, Q, Queries) -> %% the inflight queue for async query -define(MAX_SIZE_REF, max_size). -define(SIZE_REF, size). +-define(BATCH_COUNT_REF, batch_count). -define(INITIAL_TIME_REF, initial_time). -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time). -%% NOTE -%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝ --define(INFLIGHT_META_ROWS, 4). - inflight_new(InfltWinSZ, Id, Index) -> TableId = ets:new( emqx_resource_buffer_worker_inflight_tab, @@ -1295,6 +1294,7 @@ inflight_new(InfltWinSZ, Id, Index) -> %% we use this counter because we might deal with batches as %% elements. inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), + inflight_append(TableId, {?BATCH_COUNT_REF, 0}, Id, Index), inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index), inflight_append( TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index @@ -1344,10 +1344,7 @@ is_inflight_full(InflightTID) -> Size >= MaxSize. inflight_count(InflightTID) -> - case ets:info(InflightTID, size) of - undefined -> 0; - Size -> max(0, Size - ?INFLIGHT_META_ROWS) - end. + emqx_utils_ets:lookup_value(InflightTID, ?BATCH_COUNT_REF, 0). inflight_num_msgs(InflightTID) -> [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF), @@ -1476,12 +1473,14 @@ update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> inc_inflight(InflightTID, Count) -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), + _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}), ok. dec_inflight(_InflightTID, 0) -> ok; dec_inflight(InflightTID, Count) when Count > 0 -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e098c2e1c..b861e1be2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2327,7 +2327,7 @@ t_expiration_retry(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(single). + do_t_expiration_retry(). t_expiration_retry_batch(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), @@ -2344,9 +2344,9 @@ t_expiration_retry_batch(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(batch). + do_t_expiration_retry(). -do_t_expiration_retry(IsBatch) -> +do_t_expiration_retry() -> ResumeInterval = 300, ?check_trace( begin @@ -2399,15 +2399,10 @@ do_t_expiration_retry(IsBatch) -> ResumeInterval * 10 ), - SuccessEventKind = - case IsBatch of - batch -> buffer_worker_retry_inflight_succeeded; - single -> buffer_worker_flush_ack - end, {ok, {ok, _}} = ?wait_async_action( emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := SuccessEventKind}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, ResumeInterval * 5 ), diff --git a/changes/ce/perf-10573.en.md b/changes/ce/perf-10573.en.md new file mode 100644 index 000000000..d01cb9733 --- /dev/null +++ b/changes/ce/perf-10573.en.md @@ -0,0 +1,2 @@ +Improved performance of Webhook bridge when using synchronous query mode. +This also should improve the performance of other bridges when they are configured with no batching. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl index 2e3510aed..fa696c185 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl @@ -524,7 +524,7 @@ t_write_failure(Config) -> send_message(Config, SentData) end, #{?snk_kind := buffer_worker_flush_nack}, - 1_000 + 10_000 ) end), fun(Trace0) -> diff --git a/mix.exs b/mix.exs index 41a35e0e7..9c07fbbbf 100644 --- a/mix.exs +++ b/mix.exs @@ -71,7 +71,7 @@ defmodule EMQXUmbrella.MixProject do {:telemetry, "1.1.0"}, # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, - {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.7", override: true}, + {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, {:hocon, github: "emqx/hocon", tag: "0.38.2", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, diff --git a/rebar.config b/rebar.config index 267f60ee4..ba91b760e 100644 --- a/rebar.config +++ b/rebar.config @@ -74,7 +74,7 @@ , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.38.2"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}