feat(buffer_worker): set default queue mode to `memory_only`
Fixes https://emqx.atlassian.net/browse/EMQX-9367 For better user experience and performance for the average bridge, we should change the default queue mode to `memory_only`, as was the behavior of most bridges in e4.x. This leads to better performance when message rate is high enough and the remote resource is not keeping up with EMQX. Also, we set the default segment size to equal max queue bytes.
This commit is contained in:
parent
4de13d2800
commit
14ed4a7ada
|
@ -85,9 +85,6 @@
|
||||||
|
|
||||||
-define(WORKER_POOL_SIZE, 16).
|
-define(WORKER_POOL_SIZE, 16).
|
||||||
|
|
||||||
-define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024).
|
|
||||||
-define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>).
|
|
||||||
|
|
||||||
-define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024).
|
-define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024).
|
||||||
-define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>).
|
-define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>).
|
||||||
|
|
||||||
|
|
|
@ -178,20 +178,7 @@ init({Id, Index, Opts}) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
true = gproc_pool:connect_worker(Id, {Id, Index}),
|
true = gproc_pool:connect_worker(Id, {Id, Index}),
|
||||||
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
|
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
|
||||||
SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
|
QueueOpts = replayq_opts(Id, Index, Opts),
|
||||||
TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
|
|
||||||
SegBytes = min(SegBytes0, TotalBytes),
|
|
||||||
QueueOpts =
|
|
||||||
#{
|
|
||||||
dir => disk_queue_dir(Id, Index),
|
|
||||||
marshaller => fun ?MODULE:queue_item_marshaller/1,
|
|
||||||
max_total_bytes => TotalBytes,
|
|
||||||
%% we don't want to retain the queue after
|
|
||||||
%% resource restarts.
|
|
||||||
offload => {true, volatile},
|
|
||||||
seg_bytes => SegBytes,
|
|
||||||
sizer => fun ?MODULE:estimate_size/1
|
|
||||||
},
|
|
||||||
Queue = replayq:open(QueueOpts),
|
Queue = replayq:open(QueueOpts),
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
|
||||||
emqx_resource_metrics:inflight_set(Id, Index, 0),
|
emqx_resource_metrics:inflight_set(Id, Index, 0),
|
||||||
|
@ -214,7 +201,7 @@ init({Id, Index, Opts}) ->
|
||||||
resume_interval => ResumeInterval,
|
resume_interval => ResumeInterval,
|
||||||
tref => undefined
|
tref => undefined
|
||||||
},
|
},
|
||||||
?tp(buffer_worker_init, #{id => Id, index => Index}),
|
?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}),
|
||||||
{ok, running, Data}.
|
{ok, running, Data}.
|
||||||
|
|
||||||
running(enter, _, #{tref := _Tref} = Data) ->
|
running(enter, _, #{tref := _Tref} = Data) ->
|
||||||
|
@ -1679,6 +1666,32 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
|
||||||
end,
|
end,
|
||||||
BatchTime.
|
BatchTime.
|
||||||
|
|
||||||
|
replayq_opts(Id, Index, Opts) ->
|
||||||
|
QueueMode = maps:get(queue_mode, Opts, memory_only),
|
||||||
|
TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
|
||||||
|
case QueueMode of
|
||||||
|
memory_only ->
|
||||||
|
#{
|
||||||
|
mem_only => true,
|
||||||
|
marshaller => fun ?MODULE:queue_item_marshaller/1,
|
||||||
|
max_total_bytes => TotalBytes,
|
||||||
|
sizer => fun ?MODULE:estimate_size/1
|
||||||
|
};
|
||||||
|
volatile_offload ->
|
||||||
|
SegBytes0 = maps:get(queue_seg_bytes, Opts, TotalBytes),
|
||||||
|
SegBytes = min(SegBytes0, TotalBytes),
|
||||||
|
#{
|
||||||
|
dir => disk_queue_dir(Id, Index),
|
||||||
|
marshaller => fun ?MODULE:queue_item_marshaller/1,
|
||||||
|
max_total_bytes => TotalBytes,
|
||||||
|
%% we don't want to retain the queue after
|
||||||
|
%% resource restarts.
|
||||||
|
offload => {true, volatile},
|
||||||
|
seg_bytes => SegBytes,
|
||||||
|
sizer => fun ?MODULE:estimate_size/1
|
||||||
|
}
|
||||||
|
end.
|
||||||
|
|
||||||
%% The request timeout should be greater than the resume interval, as
|
%% The request timeout should be greater than the resume interval, as
|
||||||
%% it defines how often the buffer worker tries to unblock. If request
|
%% it defines how often the buffer worker tries to unblock. If request
|
||||||
%% timeout is <= resume interval and the buffer worker is ever
|
%% timeout is <= resume interval and the buffer worker is ever
|
||||||
|
|
|
@ -40,6 +40,7 @@ fields("resource_opts") ->
|
||||||
];
|
];
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
[
|
[
|
||||||
|
{queue_mode, fun queue_mode/1},
|
||||||
{worker_pool_size, fun worker_pool_size/1},
|
{worker_pool_size, fun worker_pool_size/1},
|
||||||
{health_check_interval, fun health_check_interval/1},
|
{health_check_interval, fun health_check_interval/1},
|
||||||
{resume_interval, fun resume_interval/1},
|
{resume_interval, fun resume_interval/1},
|
||||||
|
@ -53,7 +54,8 @@ fields("creation_opts") ->
|
||||||
{batch_size, fun batch_size/1},
|
{batch_size, fun batch_size/1},
|
||||||
{batch_time, fun batch_time/1},
|
{batch_time, fun batch_time/1},
|
||||||
{enable_queue, fun enable_queue/1},
|
{enable_queue, fun enable_queue/1},
|
||||||
{max_queue_bytes, fun max_queue_bytes/1}
|
{max_queue_bytes, fun max_queue_bytes/1},
|
||||||
|
{queue_seg_bytes, fun queue_seg_bytes/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
resource_opts_meta() ->
|
resource_opts_meta() ->
|
||||||
|
@ -149,4 +151,17 @@ max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
|
||||||
max_queue_bytes(required) -> false;
|
max_queue_bytes(required) -> false;
|
||||||
max_queue_bytes(_) -> undefined.
|
max_queue_bytes(_) -> undefined.
|
||||||
|
|
||||||
|
queue_mode(type) -> enum([memory_only, volatile_offload]);
|
||||||
|
queue_mode(desc) -> ?DESC("queue_mode");
|
||||||
|
queue_mode(default) -> memory_only;
|
||||||
|
queue_mode(required) -> false;
|
||||||
|
queue_mode(importance) -> ?IMPORTANCE_HIDDEN;
|
||||||
|
queue_mode(_) -> undefined.
|
||||||
|
|
||||||
|
queue_seg_bytes(type) -> emqx_schema:bytesize();
|
||||||
|
queue_seg_bytes(desc) -> ?DESC("queue_seg_bytes");
|
||||||
|
queue_seg_bytes(required) -> false;
|
||||||
|
queue_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN;
|
||||||
|
queue_seg_bytes(_) -> undefined.
|
||||||
|
|
||||||
desc("creation_opts") -> ?DESC("creation_opts").
|
desc("creation_opts") -> ?DESC("creation_opts").
|
||||||
|
|
|
@ -1314,6 +1314,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
batch_size => 1,
|
batch_size => 1,
|
||||||
worker_pool_size => NumBufferWorkers,
|
worker_pool_size => NumBufferWorkers,
|
||||||
|
queue_mode => volatile_offload,
|
||||||
queue_seg_bytes => 100,
|
queue_seg_bytes => 100,
|
||||||
resume_interval => 1_000
|
resume_interval => 1_000
|
||||||
}
|
}
|
||||||
|
@ -2639,6 +2640,117 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% The default mode is currently `memory_only'.
|
||||||
|
t_volatile_offload_mode(_Config) ->
|
||||||
|
MaxQueueBytes = 1_000,
|
||||||
|
DefaultOpts = #{
|
||||||
|
max_queue_bytes => MaxQueueBytes,
|
||||||
|
worker_pool_size => 1
|
||||||
|
},
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
emqx_connector_demo:set_callback_mode(async_if_possible),
|
||||||
|
%% Create without any specified segment bytes; should
|
||||||
|
%% default to equal max bytes.
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{queue_mode => volatile_offload}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
|
||||||
|
%% Create with segment bytes < max bytes
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{
|
||||||
|
queue_mode => volatile_offload,
|
||||||
|
queue_seg_bytes => MaxQueueBytes div 2
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
%% Create with segment bytes = max bytes
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{
|
||||||
|
queue_mode => volatile_offload,
|
||||||
|
queue_seg_bytes => MaxQueueBytes
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
|
||||||
|
%% Create with segment bytes > max bytes; should normalize
|
||||||
|
%% to max bytes.
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{
|
||||||
|
queue_mode => volatile_offload,
|
||||||
|
queue_seg_bytes => 2 * MaxQueueBytes
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
HalfMaxQueueBytes = MaxQueueBytes div 2,
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
seg_bytes := MaxTotalBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
%% uses the specified value since it's smaller
|
||||||
|
%% than max bytes.
|
||||||
|
seg_bytes := HalfMaxQueueBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
seg_bytes := MaxTotalBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
seg_bytes := MaxTotalBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
?projection(queue_opts, ?of_kind(buffer_worker_init, Trace))
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Change the default queue mode for buffer workers to `memory_only`.
|
||||||
|
Before this change, the default queue mode was `volatile_offload`. When under high message rate pressure and when the resource is not keeping up with such rate, the buffer performance degraded a lot due to the constant disk operations.
|
|
@ -179,5 +179,43 @@ When disabled the messages are buffered in RAM only."""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queue_seg_bytes {
|
||||||
|
desc {
|
||||||
|
en: "Applicable when buffer mode is set to <code>volatile_offload</code>.\n"
|
||||||
|
"This value is to specify the size of each on-disk buffer file."
|
||||||
|
zh: "当缓存模式是 <code>volatile_offload</code> 时适用。"
|
||||||
|
"该配置用于指定缓存到磁盘上的文件的大小。"
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: "Segment File Bytes"
|
||||||
|
zh: "缓存文件大小"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
queue_mode {
|
||||||
|
desc {
|
||||||
|
en: "Queue operation mode.\n"
|
||||||
|
"\n"
|
||||||
|
"<code>memory_only</mode>: Buffer all messages in memory."
|
||||||
|
" The messages will be lost in case of EMQX node restart.\n"
|
||||||
|
"<code>volatile_offload</code>: Buffer message in memory first, when up to certain limit"
|
||||||
|
" (see <code>queue_seg_bytes</code> config for more information), then start offloading"
|
||||||
|
" messages to disk, Like <code>memory_only</code> mode, the messages will be lost in case of"
|
||||||
|
" EMQX node restart."
|
||||||
|
zh: "队列操作模式。\n"
|
||||||
|
"\n"
|
||||||
|
"<code>memory_only</code>: 所有的消息都缓存在内存里。"
|
||||||
|
" 如果 EMQX 服务重启,缓存的消息会丢失。\n"
|
||||||
|
"<code>hybrid</code>: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
|
||||||
|
"(配置项 <code>queue_seg_bytes</code> 描述了该限制)后,"
|
||||||
|
" 后续的消息会缓存到磁盘上。与 <code>memory_only</code> 模式一样,"
|
||||||
|
" 如果 EMQX 服务重启,缓存的消息会丢失。"
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: "Queue Mode"
|
||||||
|
zh: "排队模式"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue