From 4de13d280079725d3822883b55f5eaff83ea6a1f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 14 Apr 2023 09:31:33 -0300 Subject: [PATCH 1/5] feat(buffer_worker): change default max queue bytes to 256 MB --- apps/emqx_resource/include/emqx_resource.hrl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 7f88e1440..904eeffa5 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -88,8 +88,8 @@ -define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024). -define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>). --define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024). --define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). +-define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024). +-define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>). -define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). From 14ed4a7adae564497719e1fc3bc66ec504beeb82 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 14 Apr 2023 10:05:20 -0300 Subject: [PATCH 2/5] feat(buffer_worker): set default queue mode to `memory_only` Fixes https://emqx.atlassian.net/browse/EMQX-9367 For better user experience and performance for the average bridge, we should change the default queue mode to `memory_only`, as was the behavior of most bridges in e4.x. This leads to better performance when message rate is high enough and the remote resource is not keeping up with EMQX. Also, we set the default segment size to equal max queue bytes. --- apps/emqx_resource/include/emqx_resource.hrl | 3 - .../src/emqx_resource_buffer_worker.erl | 43 ++++--- .../src/schema/emqx_resource_schema.erl | 17 ++- .../test/emqx_resource_SUITE.erl | 112 ++++++++++++++++++ changes/ce/feat-10404.en.md | 2 + rel/i18n/emqx_resource_schema.hocon | 38 ++++++ 6 files changed, 196 insertions(+), 19 deletions(-) create mode 100644 changes/ce/feat-10404.en.md diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 904eeffa5..f8d671b40 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -85,9 +85,6 @@ -define(WORKER_POOL_SIZE, 16). --define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024). --define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>). - -define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0fa4c0bd8..c53627ca4 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -178,20 +178,7 @@ init({Id, Index, Opts}) -> process_flag(trap_exit, true), true = gproc_pool:connect_worker(Id, {Id, Index}), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), - SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), - TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), - SegBytes = min(SegBytes0, TotalBytes), - QueueOpts = - #{ - dir => disk_queue_dir(Id, Index), - marshaller => fun ?MODULE:queue_item_marshaller/1, - max_total_bytes => TotalBytes, - %% we don't want to retain the queue after - %% resource restarts. - offload => {true, volatile}, - seg_bytes => SegBytes, - sizer => fun ?MODULE:estimate_size/1 - }, + QueueOpts = replayq_opts(Id, Index, Opts), Queue = replayq:open(QueueOpts), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), emqx_resource_metrics:inflight_set(Id, Index, 0), @@ -214,7 +201,7 @@ init({Id, Index, Opts}) -> resume_interval => ResumeInterval, tref => undefined }, - ?tp(buffer_worker_init, #{id => Id, index => Index}), + ?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}), {ok, running, Data}. running(enter, _, #{tref := _Tref} = Data) -> @@ -1679,6 +1666,32 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> end, BatchTime. +replayq_opts(Id, Index, Opts) -> + QueueMode = maps:get(queue_mode, Opts, memory_only), + TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), + case QueueMode of + memory_only -> + #{ + mem_only => true, + marshaller => fun ?MODULE:queue_item_marshaller/1, + max_total_bytes => TotalBytes, + sizer => fun ?MODULE:estimate_size/1 + }; + volatile_offload -> + SegBytes0 = maps:get(queue_seg_bytes, Opts, TotalBytes), + SegBytes = min(SegBytes0, TotalBytes), + #{ + dir => disk_queue_dir(Id, Index), + marshaller => fun ?MODULE:queue_item_marshaller/1, + max_total_bytes => TotalBytes, + %% we don't want to retain the queue after + %% resource restarts. + offload => {true, volatile}, + seg_bytes => SegBytes, + sizer => fun ?MODULE:estimate_size/1 + } + end. + %% The request timeout should be greater than the resume interval, as %% it defines how often the buffer worker tries to unblock. If request %% timeout is <= resume interval and the buffer worker is ever diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 647a40fed..9116927fa 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -40,6 +40,7 @@ fields("resource_opts") -> ]; fields("creation_opts") -> [ + {queue_mode, fun queue_mode/1}, {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, {resume_interval, fun resume_interval/1}, @@ -53,7 +54,8 @@ fields("creation_opts") -> {batch_size, fun batch_size/1}, {batch_time, fun batch_time/1}, {enable_queue, fun enable_queue/1}, - {max_queue_bytes, fun max_queue_bytes/1} + {max_queue_bytes, fun max_queue_bytes/1}, + {queue_seg_bytes, fun queue_seg_bytes/1} ]. resource_opts_meta() -> @@ -149,4 +151,17 @@ max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; max_queue_bytes(required) -> false; max_queue_bytes(_) -> undefined. +queue_mode(type) -> enum([memory_only, volatile_offload]); +queue_mode(desc) -> ?DESC("queue_mode"); +queue_mode(default) -> memory_only; +queue_mode(required) -> false; +queue_mode(importance) -> ?IMPORTANCE_HIDDEN; +queue_mode(_) -> undefined. + +queue_seg_bytes(type) -> emqx_schema:bytesize(); +queue_seg_bytes(desc) -> ?DESC("queue_seg_bytes"); +queue_seg_bytes(required) -> false; +queue_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN; +queue_seg_bytes(_) -> undefined. + desc("creation_opts") -> ?DESC("creation_opts"). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5d8e85697..087a39eee 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1314,6 +1314,7 @@ t_delete_and_re_create_with_same_name(_Config) -> query_mode => sync, batch_size => 1, worker_pool_size => NumBufferWorkers, + queue_mode => volatile_offload, queue_seg_bytes => 100, resume_interval => 1_000 } @@ -2639,6 +2640,117 @@ t_call_mode_uncoupled_from_query_mode(_Config) -> end ). +%% The default mode is currently `memory_only'. +t_volatile_offload_mode(_Config) -> + MaxQueueBytes = 1_000, + DefaultOpts = #{ + max_queue_bytes => MaxQueueBytes, + worker_pool_size => 1 + }, + ?check_trace( + begin + emqx_connector_demo:set_callback_mode(async_if_possible), + %% Create without any specified segment bytes; should + %% default to equal max bytes. + ?assertMatch( + {ok, _}, + emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{queue_mode => volatile_offload} + ) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + + %% Create with segment bytes < max bytes + ?assertMatch( + {ok, _}, + emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{ + queue_mode => volatile_offload, + queue_seg_bytes => MaxQueueBytes div 2 + } + ) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + %% Create with segment bytes = max bytes + ?assertMatch( + {ok, _}, + emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{ + queue_mode => volatile_offload, + queue_seg_bytes => MaxQueueBytes + } + ) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + + %% Create with segment bytes > max bytes; should normalize + %% to max bytes. + ?assertMatch( + {ok, _}, + emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + DefaultOpts#{ + queue_mode => volatile_offload, + queue_seg_bytes => 2 * MaxQueueBytes + } + ) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + + ok + end, + fun(Trace) -> + HalfMaxQueueBytes = MaxQueueBytes div 2, + ?assertMatch( + [ + #{ + dir := _, + max_total_bytes := MaxTotalBytes, + seg_bytes := MaxTotalBytes, + offload := {true, volatile} + }, + #{ + dir := _, + max_total_bytes := MaxTotalBytes, + %% uses the specified value since it's smaller + %% than max bytes. + seg_bytes := HalfMaxQueueBytes, + offload := {true, volatile} + }, + #{ + dir := _, + max_total_bytes := MaxTotalBytes, + seg_bytes := MaxTotalBytes, + offload := {true, volatile} + }, + #{ + dir := _, + max_total_bytes := MaxTotalBytes, + seg_bytes := MaxTotalBytes, + offload := {true, volatile} + } + ], + ?projection(queue_opts, ?of_kind(buffer_worker_init, Trace)) + ), + ok + end + ). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/feat-10404.en.md b/changes/ce/feat-10404.en.md new file mode 100644 index 000000000..ad216336e --- /dev/null +++ b/changes/ce/feat-10404.en.md @@ -0,0 +1,2 @@ +Change the default queue mode for buffer workers to `memory_only`. +Before this change, the default queue mode was `volatile_offload`. When under high message rate pressure and when the resource is not keeping up with such rate, the buffer performance degraded a lot due to the constant disk operations. diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index c73f8b1aa..7b693c256 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -179,5 +179,43 @@ When disabled the messages are buffered in RAM only.""" } } + queue_seg_bytes { + desc { + en: "Applicable when buffer mode is set to volatile_offload.\n" + "This value is to specify the size of each on-disk buffer file." + zh: "当缓存模式是 volatile_offload 时适用。" + "该配置用于指定缓存到磁盘上的文件的大小。" + } + label { + en: "Segment File Bytes" + zh: "缓存文件大小" + } + } + + queue_mode { + desc { + en: "Queue operation mode.\n" + "\n" + "memory_only: Buffer all messages in memory." + " The messages will be lost in case of EMQX node restart.\n" + "volatile_offload: Buffer message in memory first, when up to certain limit" + " (see queue_seg_bytes config for more information), then start offloading" + " messages to disk, Like memory_only mode, the messages will be lost in case of" + " EMQX node restart." + zh: "队列操作模式。\n" + "\n" + "memory_only: 所有的消息都缓存在内存里。" + " 如果 EMQX 服务重启,缓存的消息会丢失。\n" + "hybrid: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制" + "(配置项 queue_seg_bytes 描述了该限制)后," + " 后续的消息会缓存到磁盘上。与 memory_only 模式一样," + " 如果 EMQX 服务重启,缓存的消息会丢失。" + } + label { + en: "Queue Mode" + zh: "排队模式" + } + } + } From dd381227972f0965d680bd4d8872f6f15765f70a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 14 Apr 2023 11:22:04 -0300 Subject: [PATCH 3/5] docs: improve descriptions Co-authored-by: Zaiming (Stone) Shi --- rel/i18n/emqx_resource_schema.hocon | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index 7b693c256..885a6c4dd 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -195,25 +195,18 @@ When disabled the messages are buffered in RAM only.""" queue_mode { desc { en: "Queue operation mode.\n" - "\n" "memory_only: Buffer all messages in memory." - " The messages will be lost in case of EMQX node restart.\n" "volatile_offload: Buffer message in memory first, when up to certain limit" - " (see queue_seg_bytes config for more information), then start offloading" - " messages to disk, Like memory_only mode, the messages will be lost in case of" - " EMQX node restart." + " (see buffer_seg_bytes config for more information), then start offloading messages to disk" zh: "队列操作模式。\n" - "\n" "memory_only: 所有的消息都缓存在内存里。" - " 如果 EMQX 服务重启,缓存的消息会丢失。\n" - "hybrid: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制" - "(配置项 queue_seg_bytes 描述了该限制)后," - " 后续的消息会缓存到磁盘上。与 memory_only 模式一样," - " 如果 EMQX 服务重启,缓存的消息会丢失。" + "volatile_offload: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制" + "(配置项 buffer_seg_bytes 该限制)后," + " 消息会缓存到磁盘上" } label { - en: "Queue Mode" - zh: "排队模式" + en: "Buffer Mode" + zh: "缓存模式" } } From e073bc90bc76d3bea1387a0f4196a3fe6b23ac9a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 14 Apr 2023 11:29:52 -0300 Subject: [PATCH 4/5] refactor(buffer_worker): rename `s/queue/buffer/g` --- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 +- .../schema/emqx_bridge_compatible_config.erl | 2 +- .../test/emqx_bridge_webhook_SUITE.erl | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 6 +-- .../src/emqx_resource_buffer_worker.erl | 8 ++-- .../src/schema/emqx_resource_schema.erl | 39 ++++++++++--------- .../test/emqx_resource_SUITE.erl | 30 +++++++------- .../src/emqx_ee_bridge_cassa.erl | 2 +- .../src/emqx_ee_bridge_clickhouse.erl | 2 +- .../src/emqx_ee_bridge_dynamo.erl | 2 +- .../src/emqx_ee_bridge_mysql.erl | 2 +- .../src/emqx_ee_bridge_pgsql.erl | 2 +- .../src/emqx_ee_bridge_rocketmq.erl | 2 +- .../src/emqx_ee_bridge_sqlserver.erl | 2 +- .../src/emqx_ee_bridge_tdengine.erl | 2 +- rel/i18n/emqx_resource_schema.hocon | 8 ++-- scripts/test/influx/influx-bridge.conf | 2 +- 17 files changed, 59 insertions(+), 58 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index b29cefacd..137cd1241 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -220,7 +220,7 @@ info_example_basic(webhook) -> auto_restart_interval => 15000, query_mode => async, inflight_window => 100, - max_queue_bytes => 100 * 1024 * 1024 + max_buffer_bytes => 100 * 1024 * 1024 } }; info_example_basic(mqtt) -> @@ -245,7 +245,7 @@ mqtt_main_example() -> health_check_interval => <<"15s">>, auto_restart_interval => <<"60s">>, query_mode => sync, - max_queue_bytes => 100 * 1024 * 1024 + max_buffer_bytes => 100 * 1024 * 1024 }, ssl => #{ enable => false diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl index fe173fa89..595b75ecf 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl @@ -89,7 +89,7 @@ default_resource_opts() -> <<"inflight_window">> => 100, <<"auto_restart_interval">> => <<"60s">>, <<"health_check_interval">> => <<"15s">>, - <<"max_queue_bytes">> => <<"1GB">>, + <<"max_buffer_bytes">> => <<"1GB">>, <<"query_mode">> => <<"sync">>, %% there is only one underlying MQTT connection %% doesn't make a lot of sense to have a large pool diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index e222190d2..f08c87b6e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -175,7 +175,7 @@ bridge_async_config(#{port := Port} = Config) -> " inflight_window = 100\n" " auto_restart_interval = \"60s\"\n" " health_check_interval = \"15s\"\n" - " max_queue_bytes = \"1GB\"\n" + " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" " request_timeout = \"~s\"\n" " start_after_created = \"true\"\n" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index f8d671b40..91572eac3 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -70,7 +70,7 @@ auto_restart_interval => pos_integer(), batch_size => pos_integer(), batch_time => pos_integer(), - max_queue_bytes => pos_integer(), + max_buffer_bytes => pos_integer(), query_mode => query_mode(), resume_interval => pos_integer(), inflight_window => pos_integer() @@ -85,8 +85,8 @@ -define(WORKER_POOL_SIZE, 16). --define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024). --define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>). +-define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). +-define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>). -define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index c53627ca4..a77335140 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1667,9 +1667,9 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> BatchTime. replayq_opts(Id, Index, Opts) -> - QueueMode = maps:get(queue_mode, Opts, memory_only), - TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), - case QueueMode of + BufferMode = maps:get(buffer_mode, Opts, memory_only), + TotalBytes = maps:get(max_buffer_bytes, Opts, ?DEFAULT_BUFFER_BYTES), + case BufferMode of memory_only -> #{ mem_only => true, @@ -1678,7 +1678,7 @@ replayq_opts(Id, Index, Opts) -> sizer => fun ?MODULE:estimate_size/1 }; volatile_offload -> - SegBytes0 = maps:get(queue_seg_bytes, Opts, TotalBytes), + SegBytes0 = maps:get(buffer_seg_bytes, Opts, TotalBytes), SegBytes = min(SegBytes0, TotalBytes), #{ dir => disk_queue_dir(Id, Index), diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 9116927fa..3b4fb66e5 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -40,7 +40,7 @@ fields("resource_opts") -> ]; fields("creation_opts") -> [ - {queue_mode, fun queue_mode/1}, + {buffer_mode, fun buffer_mode/1}, {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, {resume_interval, fun resume_interval/1}, @@ -54,8 +54,8 @@ fields("creation_opts") -> {batch_size, fun batch_size/1}, {batch_time, fun batch_time/1}, {enable_queue, fun enable_queue/1}, - {max_queue_bytes, fun max_queue_bytes/1}, - {queue_seg_bytes, fun queue_seg_bytes/1} + {max_buffer_bytes, fun max_buffer_bytes/1}, + {buffer_seg_bytes, fun buffer_seg_bytes/1} ]. resource_opts_meta() -> @@ -145,23 +145,24 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW; batch_time(required) -> false; batch_time(_) -> undefined. -max_queue_bytes(type) -> emqx_schema:bytesize(); -max_queue_bytes(desc) -> ?DESC("max_queue_bytes"); -max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; -max_queue_bytes(required) -> false; -max_queue_bytes(_) -> undefined. +max_buffer_bytes(type) -> emqx_schema:bytesize(); +max_buffer_bytes(aliases) -> [max_queue_bytes]; +max_buffer_bytes(desc) -> ?DESC("max_buffer_bytes"); +max_buffer_bytes(default) -> ?DEFAULT_BUFFER_BYTES_RAW; +max_buffer_bytes(required) -> false; +max_buffer_bytes(_) -> undefined. -queue_mode(type) -> enum([memory_only, volatile_offload]); -queue_mode(desc) -> ?DESC("queue_mode"); -queue_mode(default) -> memory_only; -queue_mode(required) -> false; -queue_mode(importance) -> ?IMPORTANCE_HIDDEN; -queue_mode(_) -> undefined. +buffer_mode(type) -> enum([memory_only, volatile_offload]); +buffer_mode(desc) -> ?DESC("buffer_mode"); +buffer_mode(default) -> memory_only; +buffer_mode(required) -> false; +buffer_mode(importance) -> ?IMPORTANCE_HIDDEN; +buffer_mode(_) -> undefined. -queue_seg_bytes(type) -> emqx_schema:bytesize(); -queue_seg_bytes(desc) -> ?DESC("queue_seg_bytes"); -queue_seg_bytes(required) -> false; -queue_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN; -queue_seg_bytes(_) -> undefined. +buffer_seg_bytes(type) -> emqx_schema:bytesize(); +buffer_seg_bytes(desc) -> ?DESC("buffer_seg_bytes"); +buffer_seg_bytes(required) -> false; +buffer_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN; +buffer_seg_bytes(_) -> undefined. desc("creation_opts") -> ?DESC("creation_opts"). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 087a39eee..385b4cb91 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1314,8 +1314,8 @@ t_delete_and_re_create_with_same_name(_Config) -> query_mode => sync, batch_size => 1, worker_pool_size => NumBufferWorkers, - queue_mode => volatile_offload, - queue_seg_bytes => 100, + buffer_mode => volatile_offload, + buffer_seg_bytes => 100, resume_interval => 1_000 } ), @@ -1374,7 +1374,7 @@ t_delete_and_re_create_with_same_name(_Config) -> query_mode => async, batch_size => 1, worker_pool_size => 2, - queue_seg_bytes => 100, + buffer_seg_bytes => 100, resume_interval => 1_000 } ), @@ -1406,7 +1406,7 @@ t_always_overflow(_Config) -> query_mode => sync, batch_size => 1, worker_pool_size => 1, - max_queue_bytes => 1, + max_buffer_bytes => 1, resume_interval => 1_000 } ), @@ -2642,9 +2642,9 @@ t_call_mode_uncoupled_from_query_mode(_Config) -> %% The default mode is currently `memory_only'. t_volatile_offload_mode(_Config) -> - MaxQueueBytes = 1_000, + MaxBufferBytes = 1_000, DefaultOpts = #{ - max_queue_bytes => MaxQueueBytes, + max_buffer_bytes => MaxBufferBytes, worker_pool_size => 1 }, ?check_trace( @@ -2659,7 +2659,7 @@ t_volatile_offload_mode(_Config) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource}, - DefaultOpts#{queue_mode => volatile_offload} + DefaultOpts#{buffer_mode => volatile_offload} ) ), ?assertEqual(ok, emqx_resource:remove_local(?ID)), @@ -2673,8 +2673,8 @@ t_volatile_offload_mode(_Config) -> ?TEST_RESOURCE, #{name => test_resource}, DefaultOpts#{ - queue_mode => volatile_offload, - queue_seg_bytes => MaxQueueBytes div 2 + buffer_mode => volatile_offload, + buffer_seg_bytes => MaxBufferBytes div 2 } ) ), @@ -2688,8 +2688,8 @@ t_volatile_offload_mode(_Config) -> ?TEST_RESOURCE, #{name => test_resource}, DefaultOpts#{ - queue_mode => volatile_offload, - queue_seg_bytes => MaxQueueBytes + buffer_mode => volatile_offload, + buffer_seg_bytes => MaxBufferBytes } ) ), @@ -2705,8 +2705,8 @@ t_volatile_offload_mode(_Config) -> ?TEST_RESOURCE, #{name => test_resource}, DefaultOpts#{ - queue_mode => volatile_offload, - queue_seg_bytes => 2 * MaxQueueBytes + buffer_mode => volatile_offload, + buffer_seg_bytes => 2 * MaxBufferBytes } ) ), @@ -2715,7 +2715,7 @@ t_volatile_offload_mode(_Config) -> ok end, fun(Trace) -> - HalfMaxQueueBytes = MaxQueueBytes div 2, + HalfMaxBufferBytes = MaxBufferBytes div 2, ?assertMatch( [ #{ @@ -2729,7 +2729,7 @@ t_volatile_offload_mode(_Config) -> max_total_bytes := MaxTotalBytes, %% uses the specified value since it's smaller %% than max bytes. - seg_bytes := HalfMaxQueueBytes, + seg_bytes := HalfMaxBufferBytes, offload := {true, volatile} }, #{ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl index 78db8352a..26c6de04d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl @@ -63,7 +63,7 @@ values(_Method, Type) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl index 0b611c142..56671c586 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl @@ -61,7 +61,7 @@ values(_Method, Type) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl index e6a3d1a58..ba1fd0c70 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl @@ -56,7 +56,7 @@ values(_Method) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index f3ed44247..7914c77e2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -57,7 +57,7 @@ values(_Method) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index 958bc3449..a5dcb19e6 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -59,7 +59,7 @@ values(_Method, Type) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl index 78fd527d3..28b94a1a4 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl @@ -56,7 +56,7 @@ values(post) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }; values(put) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl index 49a5ed0ce..e216299c2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl @@ -60,7 +60,7 @@ values(post) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }; values(put) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl index 7a958d45f..54406541d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl @@ -58,7 +58,7 @@ values(_Method) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index 885a6c4dd..031a5b412 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -168,7 +168,7 @@ When disabled the messages are buffered in RAM only.""" } } - max_queue_bytes { + max_buffer_bytes { desc { en: """Maximum number of bytes to buffer for each buffer worker.""" zh: """每个缓存 worker 允许使用的最大字节数。""" @@ -179,7 +179,7 @@ When disabled the messages are buffered in RAM only.""" } } - queue_seg_bytes { + buffer_seg_bytes { desc { en: "Applicable when buffer mode is set to volatile_offload.\n" "This value is to specify the size of each on-disk buffer file." @@ -192,9 +192,9 @@ When disabled the messages are buffered in RAM only.""" } } - queue_mode { + buffer_mode { desc { - en: "Queue operation mode.\n" + en: "Buffer operation mode.\n" "memory_only: Buffer all messages in memory." "volatile_offload: Buffer message in memory first, when up to certain limit" " (see buffer_seg_bytes config for more information), then start offloading messages to disk" diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index 0416e42b6..0574ac38a 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -11,7 +11,7 @@ bridges { batch_size = 100 batch_time = "10ms" health_check_interval = "15s" - max_queue_bytes = "1GB" + max_buffer_bytes = "1GB" query_mode = "sync" request_timeout = "15s" start_after_created = "true" From 3373a63137a7c0443831985919fef131c2d7d6b4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 14 Apr 2023 13:21:30 -0300 Subject: [PATCH 5/5] docs: improve descriptions Co-authored-by: Zaiming (Stone) Shi --- rel/i18n/emqx_resource_schema.hocon | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index 031a5b412..f4a9982bc 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -201,8 +201,8 @@ When disabled the messages are buffered in RAM only.""" zh: "队列操作模式。\n" "memory_only: 所有的消息都缓存在内存里。" "volatile_offload: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制" - "(配置项 buffer_seg_bytes 该限制)后," - " 消息会缓存到磁盘上" + "(配置项 buffer_seg_bytes 指定该限制)后," + " 消息会开始缓存到磁盘上。" } label { en: "Buffer Mode"