Merge pull request #10404 from thalesmg/buffer-mem-only-v50
feat(buffer_worker): set default queue mode to `memory_only`
This commit is contained in:
commit
0e3a6d7f22
|
@ -220,7 +220,7 @@ info_example_basic(webhook) ->
|
||||||
auto_restart_interval => 15000,
|
auto_restart_interval => 15000,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
inflight_window => 100,
|
inflight_window => 100,
|
||||||
max_queue_bytes => 100 * 1024 * 1024
|
max_buffer_bytes => 100 * 1024 * 1024
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
info_example_basic(mqtt) ->
|
info_example_basic(mqtt) ->
|
||||||
|
@ -245,7 +245,7 @@ mqtt_main_example() ->
|
||||||
health_check_interval => <<"15s">>,
|
health_check_interval => <<"15s">>,
|
||||||
auto_restart_interval => <<"60s">>,
|
auto_restart_interval => <<"60s">>,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
max_queue_bytes => 100 * 1024 * 1024
|
max_buffer_bytes => 100 * 1024 * 1024
|
||||||
},
|
},
|
||||||
ssl => #{
|
ssl => #{
|
||||||
enable => false
|
enable => false
|
||||||
|
|
|
@ -89,7 +89,7 @@ default_resource_opts() ->
|
||||||
<<"inflight_window">> => 100,
|
<<"inflight_window">> => 100,
|
||||||
<<"auto_restart_interval">> => <<"60s">>,
|
<<"auto_restart_interval">> => <<"60s">>,
|
||||||
<<"health_check_interval">> => <<"15s">>,
|
<<"health_check_interval">> => <<"15s">>,
|
||||||
<<"max_queue_bytes">> => <<"1GB">>,
|
<<"max_buffer_bytes">> => <<"1GB">>,
|
||||||
<<"query_mode">> => <<"sync">>,
|
<<"query_mode">> => <<"sync">>,
|
||||||
%% there is only one underlying MQTT connection
|
%% there is only one underlying MQTT connection
|
||||||
%% doesn't make a lot of sense to have a large pool
|
%% doesn't make a lot of sense to have a large pool
|
||||||
|
|
|
@ -175,7 +175,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
" inflight_window = 100\n"
|
" inflight_window = 100\n"
|
||||||
" auto_restart_interval = \"60s\"\n"
|
" auto_restart_interval = \"60s\"\n"
|
||||||
" health_check_interval = \"15s\"\n"
|
" health_check_interval = \"15s\"\n"
|
||||||
" max_queue_bytes = \"1GB\"\n"
|
" max_buffer_bytes = \"1GB\"\n"
|
||||||
" query_mode = \"~s\"\n"
|
" query_mode = \"~s\"\n"
|
||||||
" request_timeout = \"~s\"\n"
|
" request_timeout = \"~s\"\n"
|
||||||
" start_after_created = \"true\"\n"
|
" start_after_created = \"true\"\n"
|
||||||
|
|
|
@ -70,7 +70,7 @@
|
||||||
auto_restart_interval => pos_integer(),
|
auto_restart_interval => pos_integer(),
|
||||||
batch_size => pos_integer(),
|
batch_size => pos_integer(),
|
||||||
batch_time => pos_integer(),
|
batch_time => pos_integer(),
|
||||||
max_queue_bytes => pos_integer(),
|
max_buffer_bytes => pos_integer(),
|
||||||
query_mode => query_mode(),
|
query_mode => query_mode(),
|
||||||
resume_interval => pos_integer(),
|
resume_interval => pos_integer(),
|
||||||
inflight_window => pos_integer()
|
inflight_window => pos_integer()
|
||||||
|
@ -85,11 +85,8 @@
|
||||||
|
|
||||||
-define(WORKER_POOL_SIZE, 16).
|
-define(WORKER_POOL_SIZE, 16).
|
||||||
|
|
||||||
-define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024).
|
-define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024).
|
||||||
-define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>).
|
-define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>).
|
||||||
|
|
||||||
-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024).
|
|
||||||
-define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>).
|
|
||||||
|
|
||||||
-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)).
|
-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)).
|
||||||
|
|
||||||
|
|
|
@ -178,20 +178,7 @@ init({Id, Index, Opts}) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
true = gproc_pool:connect_worker(Id, {Id, Index}),
|
true = gproc_pool:connect_worker(Id, {Id, Index}),
|
||||||
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
|
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
|
||||||
SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
|
QueueOpts = replayq_opts(Id, Index, Opts),
|
||||||
TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
|
|
||||||
SegBytes = min(SegBytes0, TotalBytes),
|
|
||||||
QueueOpts =
|
|
||||||
#{
|
|
||||||
dir => disk_queue_dir(Id, Index),
|
|
||||||
marshaller => fun ?MODULE:queue_item_marshaller/1,
|
|
||||||
max_total_bytes => TotalBytes,
|
|
||||||
%% we don't want to retain the queue after
|
|
||||||
%% resource restarts.
|
|
||||||
offload => {true, volatile},
|
|
||||||
seg_bytes => SegBytes,
|
|
||||||
sizer => fun ?MODULE:estimate_size/1
|
|
||||||
},
|
|
||||||
Queue = replayq:open(QueueOpts),
|
Queue = replayq:open(QueueOpts),
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
|
||||||
emqx_resource_metrics:inflight_set(Id, Index, 0),
|
emqx_resource_metrics:inflight_set(Id, Index, 0),
|
||||||
|
@ -214,7 +201,7 @@ init({Id, Index, Opts}) ->
|
||||||
resume_interval => ResumeInterval,
|
resume_interval => ResumeInterval,
|
||||||
tref => undefined
|
tref => undefined
|
||||||
},
|
},
|
||||||
?tp(buffer_worker_init, #{id => Id, index => Index}),
|
?tp(buffer_worker_init, #{id => Id, index => Index, queue_opts => QueueOpts}),
|
||||||
{ok, running, Data}.
|
{ok, running, Data}.
|
||||||
|
|
||||||
running(enter, _, #{tref := _Tref} = Data) ->
|
running(enter, _, #{tref := _Tref} = Data) ->
|
||||||
|
@ -1679,6 +1666,32 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
|
||||||
end,
|
end,
|
||||||
BatchTime.
|
BatchTime.
|
||||||
|
|
||||||
|
replayq_opts(Id, Index, Opts) ->
|
||||||
|
BufferMode = maps:get(buffer_mode, Opts, memory_only),
|
||||||
|
TotalBytes = maps:get(max_buffer_bytes, Opts, ?DEFAULT_BUFFER_BYTES),
|
||||||
|
case BufferMode of
|
||||||
|
memory_only ->
|
||||||
|
#{
|
||||||
|
mem_only => true,
|
||||||
|
marshaller => fun ?MODULE:queue_item_marshaller/1,
|
||||||
|
max_total_bytes => TotalBytes,
|
||||||
|
sizer => fun ?MODULE:estimate_size/1
|
||||||
|
};
|
||||||
|
volatile_offload ->
|
||||||
|
SegBytes0 = maps:get(buffer_seg_bytes, Opts, TotalBytes),
|
||||||
|
SegBytes = min(SegBytes0, TotalBytes),
|
||||||
|
#{
|
||||||
|
dir => disk_queue_dir(Id, Index),
|
||||||
|
marshaller => fun ?MODULE:queue_item_marshaller/1,
|
||||||
|
max_total_bytes => TotalBytes,
|
||||||
|
%% we don't want to retain the queue after
|
||||||
|
%% resource restarts.
|
||||||
|
offload => {true, volatile},
|
||||||
|
seg_bytes => SegBytes,
|
||||||
|
sizer => fun ?MODULE:estimate_size/1
|
||||||
|
}
|
||||||
|
end.
|
||||||
|
|
||||||
%% The request timeout should be greater than the resume interval, as
|
%% The request timeout should be greater than the resume interval, as
|
||||||
%% it defines how often the buffer worker tries to unblock. If request
|
%% it defines how often the buffer worker tries to unblock. If request
|
||||||
%% timeout is <= resume interval and the buffer worker is ever
|
%% timeout is <= resume interval and the buffer worker is ever
|
||||||
|
|
|
@ -40,6 +40,7 @@ fields("resource_opts") ->
|
||||||
];
|
];
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
[
|
[
|
||||||
|
{buffer_mode, fun buffer_mode/1},
|
||||||
{worker_pool_size, fun worker_pool_size/1},
|
{worker_pool_size, fun worker_pool_size/1},
|
||||||
{health_check_interval, fun health_check_interval/1},
|
{health_check_interval, fun health_check_interval/1},
|
||||||
{resume_interval, fun resume_interval/1},
|
{resume_interval, fun resume_interval/1},
|
||||||
|
@ -53,7 +54,8 @@ fields("creation_opts") ->
|
||||||
{batch_size, fun batch_size/1},
|
{batch_size, fun batch_size/1},
|
||||||
{batch_time, fun batch_time/1},
|
{batch_time, fun batch_time/1},
|
||||||
{enable_queue, fun enable_queue/1},
|
{enable_queue, fun enable_queue/1},
|
||||||
{max_queue_bytes, fun max_queue_bytes/1}
|
{max_buffer_bytes, fun max_buffer_bytes/1},
|
||||||
|
{buffer_seg_bytes, fun buffer_seg_bytes/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
resource_opts_meta() ->
|
resource_opts_meta() ->
|
||||||
|
@ -143,10 +145,24 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW;
|
||||||
batch_time(required) -> false;
|
batch_time(required) -> false;
|
||||||
batch_time(_) -> undefined.
|
batch_time(_) -> undefined.
|
||||||
|
|
||||||
max_queue_bytes(type) -> emqx_schema:bytesize();
|
max_buffer_bytes(type) -> emqx_schema:bytesize();
|
||||||
max_queue_bytes(desc) -> ?DESC("max_queue_bytes");
|
max_buffer_bytes(aliases) -> [max_queue_bytes];
|
||||||
max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
|
max_buffer_bytes(desc) -> ?DESC("max_buffer_bytes");
|
||||||
max_queue_bytes(required) -> false;
|
max_buffer_bytes(default) -> ?DEFAULT_BUFFER_BYTES_RAW;
|
||||||
max_queue_bytes(_) -> undefined.
|
max_buffer_bytes(required) -> false;
|
||||||
|
max_buffer_bytes(_) -> undefined.
|
||||||
|
|
||||||
|
buffer_mode(type) -> enum([memory_only, volatile_offload]);
|
||||||
|
buffer_mode(desc) -> ?DESC("buffer_mode");
|
||||||
|
buffer_mode(default) -> memory_only;
|
||||||
|
buffer_mode(required) -> false;
|
||||||
|
buffer_mode(importance) -> ?IMPORTANCE_HIDDEN;
|
||||||
|
buffer_mode(_) -> undefined.
|
||||||
|
|
||||||
|
buffer_seg_bytes(type) -> emqx_schema:bytesize();
|
||||||
|
buffer_seg_bytes(desc) -> ?DESC("buffer_seg_bytes");
|
||||||
|
buffer_seg_bytes(required) -> false;
|
||||||
|
buffer_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN;
|
||||||
|
buffer_seg_bytes(_) -> undefined.
|
||||||
|
|
||||||
desc("creation_opts") -> ?DESC("creation_opts").
|
desc("creation_opts") -> ?DESC("creation_opts").
|
||||||
|
|
|
@ -1314,7 +1314,8 @@ t_delete_and_re_create_with_same_name(_Config) ->
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
batch_size => 1,
|
batch_size => 1,
|
||||||
worker_pool_size => NumBufferWorkers,
|
worker_pool_size => NumBufferWorkers,
|
||||||
queue_seg_bytes => 100,
|
buffer_mode => volatile_offload,
|
||||||
|
buffer_seg_bytes => 100,
|
||||||
resume_interval => 1_000
|
resume_interval => 1_000
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -1373,7 +1374,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
batch_size => 1,
|
batch_size => 1,
|
||||||
worker_pool_size => 2,
|
worker_pool_size => 2,
|
||||||
queue_seg_bytes => 100,
|
buffer_seg_bytes => 100,
|
||||||
resume_interval => 1_000
|
resume_interval => 1_000
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -1405,7 +1406,7 @@ t_always_overflow(_Config) ->
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
batch_size => 1,
|
batch_size => 1,
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
max_queue_bytes => 1,
|
max_buffer_bytes => 1,
|
||||||
resume_interval => 1_000
|
resume_interval => 1_000
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
|
@ -2639,6 +2640,117 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% The default mode is currently `memory_only'.
|
||||||
|
t_volatile_offload_mode(_Config) ->
|
||||||
|
MaxBufferBytes = 1_000,
|
||||||
|
DefaultOpts = #{
|
||||||
|
max_buffer_bytes => MaxBufferBytes,
|
||||||
|
worker_pool_size => 1
|
||||||
|
},
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
emqx_connector_demo:set_callback_mode(async_if_possible),
|
||||||
|
%% Create without any specified segment bytes; should
|
||||||
|
%% default to equal max bytes.
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{buffer_mode => volatile_offload}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
|
||||||
|
%% Create with segment bytes < max bytes
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{
|
||||||
|
buffer_mode => volatile_offload,
|
||||||
|
buffer_seg_bytes => MaxBufferBytes div 2
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
%% Create with segment bytes = max bytes
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{
|
||||||
|
buffer_mode => volatile_offload,
|
||||||
|
buffer_seg_bytes => MaxBufferBytes
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
|
||||||
|
%% Create with segment bytes > max bytes; should normalize
|
||||||
|
%% to max bytes.
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
emqx_resource:create(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
#{name => test_resource},
|
||||||
|
DefaultOpts#{
|
||||||
|
buffer_mode => volatile_offload,
|
||||||
|
buffer_seg_bytes => 2 * MaxBufferBytes
|
||||||
|
}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
HalfMaxBufferBytes = MaxBufferBytes div 2,
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
seg_bytes := MaxTotalBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
%% uses the specified value since it's smaller
|
||||||
|
%% than max bytes.
|
||||||
|
seg_bytes := HalfMaxBufferBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
seg_bytes := MaxTotalBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
},
|
||||||
|
#{
|
||||||
|
dir := _,
|
||||||
|
max_total_bytes := MaxTotalBytes,
|
||||||
|
seg_bytes := MaxTotalBytes,
|
||||||
|
offload := {true, volatile}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
?projection(queue_opts, ?of_kind(buffer_worker_init, Trace))
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Change the default queue mode for buffer workers to `memory_only`.
|
||||||
|
Before this change, the default queue mode was `volatile_offload`. When under high message rate pressure and when the resource is not keeping up with such rate, the buffer performance degraded a lot due to the constant disk operations.
|
|
@ -63,7 +63,7 @@ values(_Method, Type) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,7 @@ values(_Method, Type) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ values(_Method) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ values(_Method) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ values(_Method, Type) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ values(post) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
values(put) ->
|
values(put) ->
|
||||||
|
|
|
@ -60,7 +60,7 @@ values(post) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
values(put) ->
|
values(put) ->
|
||||||
|
|
|
@ -58,7 +58,7 @@ values(_Method) ->
|
||||||
batch_size => ?DEFAULT_BATCH_SIZE,
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
batch_time => ?DEFAULT_BATCH_TIME,
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
query_mode => sync,
|
query_mode => sync,
|
||||||
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
|
|
@ -168,7 +168,7 @@ When disabled the messages are buffered in RAM only."""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
max_queue_bytes {
|
max_buffer_bytes {
|
||||||
desc {
|
desc {
|
||||||
en: """Maximum number of bytes to buffer for each buffer worker."""
|
en: """Maximum number of bytes to buffer for each buffer worker."""
|
||||||
zh: """每个缓存 worker 允许使用的最大字节数。"""
|
zh: """每个缓存 worker 允许使用的最大字节数。"""
|
||||||
|
@ -179,5 +179,36 @@ When disabled the messages are buffered in RAM only."""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
buffer_seg_bytes {
|
||||||
|
desc {
|
||||||
|
en: "Applicable when buffer mode is set to <code>volatile_offload</code>.\n"
|
||||||
|
"This value is to specify the size of each on-disk buffer file."
|
||||||
|
zh: "当缓存模式是 <code>volatile_offload</code> 时适用。"
|
||||||
|
"该配置用于指定缓存到磁盘上的文件的大小。"
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: "Segment File Bytes"
|
||||||
|
zh: "缓存文件大小"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer_mode {
|
||||||
|
desc {
|
||||||
|
en: "Buffer operation mode.\n"
|
||||||
|
"<code>memory_only</mode>: Buffer all messages in memory."
|
||||||
|
"<code>volatile_offload</code>: Buffer message in memory first, when up to certain limit"
|
||||||
|
" (see <code>buffer_seg_bytes</code> config for more information), then start offloading messages to disk"
|
||||||
|
zh: "队列操作模式。\n"
|
||||||
|
"<code>memory_only</code>: 所有的消息都缓存在内存里。"
|
||||||
|
"<code>volatile_offload</code>: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
|
||||||
|
"(配置项 <code>buffer_seg_bytes</code> 指定该限制)后,"
|
||||||
|
" 消息会开始缓存到磁盘上。"
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: "Buffer Mode"
|
||||||
|
zh: "缓存模式"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ bridges {
|
||||||
batch_size = 100
|
batch_size = 100
|
||||||
batch_time = "10ms"
|
batch_time = "10ms"
|
||||||
health_check_interval = "15s"
|
health_check_interval = "15s"
|
||||||
max_queue_bytes = "1GB"
|
max_buffer_bytes = "1GB"
|
||||||
query_mode = "sync"
|
query_mode = "sync"
|
||||||
request_timeout = "15s"
|
request_timeout = "15s"
|
||||||
start_after_created = "true"
|
start_after_created = "true"
|
||||||
|
|
Loading…
Reference in New Issue