From 8aa7c014e750e9a594cd9fc4aeaf9de8c605b603 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 2 May 2023 11:16:48 -0300 Subject: [PATCH] perf(buffer_worker): avoid calling `ets:info/2` (Almost?) fixes https://emqx.atlassian.net/browse/EMQX-9637 During the course of performance tests comparing the performance of e5.0.3 and e4.4.16 regarding the webhook bridge in sync mode, we observed that the throughput in e5.0.3 (sync) was much lower than in e4.4.16: ~ 9 k msgs / s vs. ~ 50 k msgs / s, respectively. Analyzing `observer_cli` output, we noticed that a lot of the time of both buffer workers and ehttpc processes was spent in `ets:info/2`. That function was called to check the size of the inflight table when updating metrics and checking if the inflight table was full. Other uses of `ets:info/2` were contained inside the arguments to some `?tp/2` macro usages (https://github.com/kafka4beam/snabbkaffe/pull/60). By using a specific record to track the size of the table, we managed to improve the bridge performance to ~ 45 k msgs / s in sync mode. 
--- apps/emqx/rebar.config | 2 +- .../src/emqx_resource_buffer_worker.erl | 23 +++++++++---------- .../test/emqx_resource_SUITE.erl | 13 ++++------- changes/ce/perf-10573.en.md | 2 ++ .../test/emqx_ee_bridge_cassa_SUITE.erl | 2 +- mix.exs | 2 +- rebar.config | 2 +- 7 files changed, 21 insertions(+), 25 deletions(-) create mode 100644 changes/ce/perf-10573.en.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 26f58ff69..f7e861f6b 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -33,7 +33,7 @@ {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, - {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}} + {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} ]}. {plugins, [{rebar3_proper, "0.12.1"}, rebar3_path_deps]}. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2e2cd5631..266b8df69 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -482,14 +482,16 @@ flush(Data0) -> Data1 = cancel_flush_timer(Data0), CurrentCount = queue_count(Q0), IsFull = is_inflight_full(InflightTID), - ?tp(buffer_worker_flush, #{ + ?tp_ignore_side_effects_in_prod(buffer_worker_flush, #{ queued => CurrentCount, is_inflight_full => IsFull, inflight => inflight_count(InflightTID) }), case {CurrentCount, IsFull} of {0, _} -> - ?tp(buffer_worker_queue_drained, #{inflight => inflight_count(InflightTID)}), + ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{ + inflight => inflight_count(InflightTID) + }), {keep_state, Data1}; {_, true} -> ?tp(buffer_worker_flush_but_inflight_full, #{}), @@ -620,7 +622,7 @@ do_flush( }), flush_worker(self()); false -> - 
?tp(buffer_worker_queue_drained, #{ + ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{ inflight => inflight_count(InflightTID) }), ok @@ -701,7 +703,7 @@ do_flush(#{queue := Q1} = Data0, #{ Data2 = case {CurrentCount > 0, CurrentCount >= BatchSize} of {false, _} -> - ?tp(buffer_worker_queue_drained, #{ + ?tp_ignore_side_effects_in_prod(buffer_worker_queue_drained, #{ inflight => inflight_count(InflightTID) }), Data1; @@ -1279,13 +1281,10 @@ append_queue(Id, Index, Q, Queries) -> %% the inflight queue for async query -define(MAX_SIZE_REF, max_size). -define(SIZE_REF, size). +-define(BATCH_COUNT_REF, batch_count). -define(INITIAL_TIME_REF, initial_time). -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time). -%% NOTE -%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝ --define(INFLIGHT_META_ROWS, 4). - inflight_new(InfltWinSZ, Id, Index) -> TableId = ets:new( emqx_resource_buffer_worker_inflight_tab, @@ -1295,6 +1294,7 @@ inflight_new(InfltWinSZ, Id, Index) -> %% we use this counter because we might deal with batches as %% elements. inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), + inflight_append(TableId, {?BATCH_COUNT_REF, 0}, Id, Index), inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index), inflight_append( TableId, {?INITIAL_MONOTONIC_TIME_REF, make_request_ref()}, Id, Index @@ -1344,10 +1344,7 @@ is_inflight_full(InflightTID) -> Size >= MaxSize. inflight_count(InflightTID) -> - case ets:info(InflightTID, size) of - undefined -> 0; - Size -> max(0, Size - ?INFLIGHT_META_ROWS) - end. + emqx_utils_ets:lookup_value(InflightTID, ?BATCH_COUNT_REF, 0). inflight_num_msgs(InflightTID) -> [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF), @@ -1476,12 +1473,14 @@ update_inflight_item(InflightTID, Ref, NewBatch, NumExpired) -> inc_inflight(InflightTID, Count) -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, Count}), + _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, 1}), ok. 
dec_inflight(_InflightTID, 0) -> ok; dec_inflight(InflightTID, Count) when Count > 0 -> _ = ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + _ = ets:update_counter(InflightTID, ?BATCH_COUNT_REF, {2, -1, 0, 0}), ok. %%============================================================================== diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e098c2e1c..b861e1be2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2327,7 +2327,7 @@ t_expiration_retry(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(single). + do_t_expiration_retry(). t_expiration_retry_batch(_Config) -> emqx_connector_demo:set_callback_mode(always_sync), @@ -2344,9 +2344,9 @@ t_expiration_retry_batch(_Config) -> resume_interval => 300 } ), - do_t_expiration_retry(batch). + do_t_expiration_retry(). -do_t_expiration_retry(IsBatch) -> +do_t_expiration_retry() -> ResumeInterval = 300, ?check_trace( begin @@ -2399,15 +2399,10 @@ do_t_expiration_retry(IsBatch) -> ResumeInterval * 10 ), - SuccessEventKind = - case IsBatch of - batch -> buffer_worker_retry_inflight_succeeded; - single -> buffer_worker_flush_ack - end, {ok, {ok, _}} = ?wait_async_action( emqx_resource:simple_sync_query(?ID, resume), - #{?snk_kind := SuccessEventKind}, + #{?snk_kind := buffer_worker_retry_inflight_succeeded}, ResumeInterval * 5 ), diff --git a/changes/ce/perf-10573.en.md b/changes/ce/perf-10573.en.md new file mode 100644 index 000000000..d01cb9733 --- /dev/null +++ b/changes/ce/perf-10573.en.md @@ -0,0 +1,2 @@ +Improved performance of Webhook bridge when using synchronous query mode. +This also should improve the performance of other bridges when they are configured with no batching. 
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl index 2e3510aed..fa696c185 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl @@ -524,7 +524,7 @@ t_write_failure(Config) -> send_message(Config, SentData) end, #{?snk_kind := buffer_worker_flush_nack}, - 1_000 + 10_000 ) end), fun(Trace0) -> diff --git a/mix.exs b/mix.exs index 41a35e0e7..9c07fbbbf 100644 --- a/mix.exs +++ b/mix.exs @@ -71,7 +71,7 @@ defmodule EMQXUmbrella.MixProject do {:telemetry, "1.1.0"}, # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, - {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.7", override: true}, + {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, {:hocon, github: "emqx/hocon", tag: "0.38.2", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, diff --git a/rebar.config b/rebar.config index 267f60ee4..ba91b760e 100644 --- a/rebar.config +++ b/rebar.config @@ -74,7 +74,7 @@ , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} - , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.7"}}} + , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.38.2"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}