diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl
index 904eeffa5..f8d671b40 100644
--- a/apps/emqx_resource/include/emqx_resource.hrl
+++ b/apps/emqx_resource/include/emqx_resource.hrl
@@ -85,9 +85,6 @@
-define(WORKER_POOL_SIZE, 16).
--define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024).
--define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>).
-
-define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024).
-define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>).
diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
index 0fa4c0bd8..c53627ca4 100644
--- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
@@ -178,20 +178,7 @@ init({Id, Index, Opts}) ->
process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Id, {Id, Index}),
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
- SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
- TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
- SegBytes = min(SegBytes0, TotalBytes),
- QueueOpts =
- #{
- dir => disk_queue_dir(Id, Index),
- marshaller => fun ?MODULE:queue_item_marshaller/1,
- max_total_bytes => TotalBytes,
- %% we don't want to retain the queue after
- %% resource restarts.
- offload => {true, volatile},
- seg_bytes => SegBytes,
- sizer => fun ?MODULE:estimate_size/1
- },
+ QueueOpts = replayq_opts(Id, Index, Opts),
Queue = replayq:open(QueueOpts),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
emqx_resource_metrics:inflight_set(Id, Index, 0),
@@ -214,7 +201,7 @@ init({Id, Index, Opts}) ->
resume_interval => ResumeInterval,
tref => undefined
},
- ?tp(buffer_worker_init, #{id => Id, index => Index}),
+ ?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}),
{ok, running, Data}.
running(enter, _, #{tref := _Tref} = Data) ->
@@ -1679,6 +1666,32 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
end,
BatchTime.
+%% Builds the `replayq' options for the buffer worker `{Id, Index}'.
+%% `queue_mode' (default: `memory_only') selects RAM-only or disk offload.
+replayq_opts(Id, Index, Opts) ->
+    MaxBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
+    %% Settings shared by both queue modes.
+    Common = #{
+        marshaller => fun ?MODULE:queue_item_marshaller/1,
+        max_total_bytes => MaxBytes,
+        sizer => fun ?MODULE:estimate_size/1
+    },
+    case maps:get(queue_mode, Opts, memory_only) of
+        memory_only ->
+            Common#{mem_only => true};
+        volatile_offload ->
+            %% The segment size defaults to, and is capped at, the
+            %% total queue size.
+            SegBytes = min(maps:get(queue_seg_bytes, Opts, MaxBytes), MaxBytes),
+            Common#{
+                dir => disk_queue_dir(Id, Index),
+                %% we don't want to retain the queue after
+                %% resource restarts.
+                offload => {true, volatile},
+                seg_bytes => SegBytes
+            }
+    end.
+
%% The request timeout should be greater than the resume interval, as
%% it defines how often the buffer worker tries to unblock. If request
%% timeout is <= resume interval and the buffer worker is ever
diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl
index 647a40fed..9116927fa 100644
--- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl
+++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl
@@ -40,6 +40,7 @@ fields("resource_opts") ->
];
fields("creation_opts") ->
[
+ {queue_mode, fun queue_mode/1},
{worker_pool_size, fun worker_pool_size/1},
{health_check_interval, fun health_check_interval/1},
{resume_interval, fun resume_interval/1},
@@ -53,7 +54,8 @@ fields("creation_opts") ->
{batch_size, fun batch_size/1},
{batch_time, fun batch_time/1},
{enable_queue, fun enable_queue/1},
- {max_queue_bytes, fun max_queue_bytes/1}
+ {max_queue_bytes, fun max_queue_bytes/1},
+ {queue_seg_bytes, fun queue_seg_bytes/1}
].
resource_opts_meta() ->
@@ -149,4 +151,17 @@ max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
max_queue_bytes(required) -> false;
max_queue_bytes(_) -> undefined.
+%% Schema field for the buffer worker queue mode.  The value is one of
+%% `memory_only' (the default) or `volatile_offload'.  The field is
+%% optional and marked ?IMPORTANCE_HIDDEN; any other schema attribute
+%% resolves to `undefined'.
+queue_mode(type) -> enum([memory_only, volatile_offload]);
+queue_mode(desc) -> ?DESC("queue_mode");
+queue_mode(default) -> memory_only;
+queue_mode(required) -> false;
+queue_mode(importance) -> ?IMPORTANCE_HIDDEN;
+queue_mode(_) -> undefined.
+
+%% Schema field for the size of each on-disk queue segment file.  The
+%% field is optional and marked ?IMPORTANCE_HIDDEN.  No default is
+%% declared here (`default' hits the catch-all clause and yields
+%% `undefined'); when the option is absent the buffer worker falls back
+%% to `max_queue_bytes'.
+queue_seg_bytes(type) -> emqx_schema:bytesize();
+queue_seg_bytes(desc) -> ?DESC("queue_seg_bytes");
+queue_seg_bytes(required) -> false;
+queue_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN;
+queue_seg_bytes(_) -> undefined.
+
desc("creation_opts") -> ?DESC("creation_opts").
diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl
index 5d8e85697..087a39eee 100644
--- a/apps/emqx_resource/test/emqx_resource_SUITE.erl
+++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl
@@ -1314,6 +1314,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
query_mode => sync,
batch_size => 1,
worker_pool_size => NumBufferWorkers,
+ queue_mode => volatile_offload,
queue_seg_bytes => 100,
resume_interval => 1_000
}
@@ -2639,6 +2640,117 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
end
).
+%% Verifies the `replayq' options built by the buffer worker when
+%% `queue_mode => volatile_offload' is requested explicitly (the
+%% default mode is currently `memory_only').  The segment size must
+%% default to `max_queue_bytes' and must never exceed it.
+t_volatile_offload_mode(_Config) ->
+    MaxQueueBytes = 1_000,
+    HalfMaxQueueBytes = MaxQueueBytes div 2,
+    BaseOpts = #{
+        max_queue_bytes => MaxQueueBytes,
+        worker_pool_size => 1,
+        queue_mode => volatile_offload
+    },
+    %% Creates the resource with the given extra options and removes it
+    %% again.  With a single worker in the pool, each creation emits one
+    %% `buffer_worker_init' trace event.
+    CreateAndRemove = fun(ExtraOpts) ->
+        ?assertMatch(
+            {ok, _},
+            emqx_resource:create(
+                ?ID,
+                ?DEFAULT_RESOURCE_GROUP,
+                ?TEST_RESOURCE,
+                #{name => test_resource},
+                maps:merge(BaseOpts, ExtraOpts)
+            )
+        ),
+        ?assertEqual(ok, emqx_resource:remove_local(?ID))
+    end,
+    ?check_trace(
+        begin
+            emqx_connector_demo:set_callback_mode(async_if_possible),
+            lists:foreach(CreateAndRemove, [
+                %% No segment bytes specified; should default to
+                %% max bytes.
+                #{},
+                %% Segment bytes < max bytes.
+                #{queue_seg_bytes => HalfMaxQueueBytes},
+                %% Segment bytes = max bytes.
+                #{queue_seg_bytes => MaxQueueBytes},
+                %% Segment bytes > max bytes; should normalize to
+                %% max bytes.
+                #{queue_seg_bytes => 2 * MaxQueueBytes}
+            ]),
+            ok
+        end,
+        fun(Trace) ->
+            %% `MaxQueueBytes' and `HalfMaxQueueBytes' are already bound
+            %% here, so the patterns below assert the exact configured
+            %% sizes rather than merely that the fields agree with
+            %% each other.
+            ?assertMatch(
+                [
+                    #{
+                        dir := _,
+                        max_total_bytes := MaxQueueBytes,
+                        seg_bytes := MaxQueueBytes,
+                        offload := {true, volatile}
+                    },
+                    #{
+                        dir := _,
+                        max_total_bytes := MaxQueueBytes,
+                        %% uses the specified value since it's smaller
+                        %% than max bytes.
+                        seg_bytes := HalfMaxQueueBytes,
+                        offload := {true, volatile}
+                    },
+                    #{
+                        dir := _,
+                        max_total_bytes := MaxQueueBytes,
+                        seg_bytes := MaxQueueBytes,
+                        offload := {true, volatile}
+                    },
+                    #{
+                        dir := _,
+                        max_total_bytes := MaxQueueBytes,
+                        seg_bytes := MaxQueueBytes,
+                        offload := {true, volatile}
+                    }
+                ],
+                ?projection(queue_opts, ?of_kind(buffer_worker_init, Trace))
+            ),
+            ok
+        end
+    ).
+
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
diff --git a/changes/ce/feat-10404.en.md b/changes/ce/feat-10404.en.md
new file mode 100644
index 000000000..ad216336e
--- /dev/null
+++ b/changes/ce/feat-10404.en.md
@@ -0,0 +1,2 @@
+Change the default queue mode for buffer workers to `memory_only`.
+Before this change, the default queue mode was `volatile_offload`. When under high message rate pressure and when the resource is not keeping up with such rate, the buffer performance degraded a lot due to the constant disk operations.
diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon
index c73f8b1aa..7b693c256 100644
--- a/rel/i18n/emqx_resource_schema.hocon
+++ b/rel/i18n/emqx_resource_schema.hocon
@@ -179,5 +179,43 @@ When disabled the messages are buffered in RAM only."""
}
}
+ queue_seg_bytes {
+ desc {
+ en: "Applicable when the queue mode is set to <code>volatile_offload</code>.\n"
+ "This value specifies the size of each on-disk buffer file."
+ zh: "当缓存模式是 <code>volatile_offload</code> 时适用。"
+ "该配置用于指定缓存到磁盘上的文件的大小。"
+ }
+ label {
+ en: "Segment File Bytes"
+ zh: "缓存文件大小"
+ }
+ }
+
+ queue_mode {
+ desc {
+ en: "Queue operation mode.\n"
+ "\n"
+ "<code>memory_only</code>: Buffer all messages in memory."
+ " The messages will be lost in case of EMQX node restart.\n"
+ "<code>volatile_offload</code>: Buffer messages in memory first; when a certain limit is reached"
+ " (see the <code>queue_seg_bytes</code> config for more information), start offloading"
+ " messages to disk. Like <code>memory_only</code> mode, the messages will be lost in case of"
+ " EMQX node restart."
+ zh: "队列操作模式。\n"
+ "\n"
+ "<code>memory_only</code>: 所有的消息都缓存在内存里。"
+ " 如果 EMQX 服务重启,缓存的消息会丢失。\n"
+ "<code>volatile_offload</code>: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
+ "(配置项 <code>queue_seg_bytes</code> 描述了该限制)后,"
+ " 后续的消息会缓存到磁盘上。与 <code>memory_only</code> 模式一样,"
+ " 如果 EMQX 服务重启,缓存的消息会丢失。"
+ }
+ label {
+ en: "Queue Mode"
+ zh: "排队模式"
+ }
+ }
+
}