refactor(buffer_worker): rename `s/queue/buffer/g`

This commit is contained in:
Thales Macedo Garitezi 2023-04-14 11:29:52 -03:00
parent dd38122797
commit e073bc90bc
17 changed files with 59 additions and 58 deletions

View File

@ -220,7 +220,7 @@ info_example_basic(webhook) ->
auto_restart_interval => 15000, auto_restart_interval => 15000,
query_mode => async, query_mode => async,
inflight_window => 100, inflight_window => 100,
max_queue_bytes => 100 * 1024 * 1024 max_buffer_bytes => 100 * 1024 * 1024
} }
}; };
info_example_basic(mqtt) -> info_example_basic(mqtt) ->
@ -245,7 +245,7 @@ mqtt_main_example() ->
health_check_interval => <<"15s">>, health_check_interval => <<"15s">>,
auto_restart_interval => <<"60s">>, auto_restart_interval => <<"60s">>,
query_mode => sync, query_mode => sync,
max_queue_bytes => 100 * 1024 * 1024 max_buffer_bytes => 100 * 1024 * 1024
}, },
ssl => #{ ssl => #{
enable => false enable => false

View File

@ -89,7 +89,7 @@ default_resource_opts() ->
<<"inflight_window">> => 100, <<"inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>, <<"auto_restart_interval">> => <<"60s">>,
<<"health_check_interval">> => <<"15s">>, <<"health_check_interval">> => <<"15s">>,
<<"max_queue_bytes">> => <<"1GB">>, <<"max_buffer_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,
%% there is only one underlying MQTT connection %% there is only one underlying MQTT connection
%% doesn't make a lot of sense to have a large pool %% doesn't make a lot of sense to have a large pool

View File

@ -175,7 +175,7 @@ bridge_async_config(#{port := Port} = Config) ->
" inflight_window = 100\n" " inflight_window = 100\n"
" auto_restart_interval = \"60s\"\n" " auto_restart_interval = \"60s\"\n"
" health_check_interval = \"15s\"\n" " health_check_interval = \"15s\"\n"
" max_queue_bytes = \"1GB\"\n" " max_buffer_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n" " query_mode = \"~s\"\n"
" request_timeout = \"~s\"\n" " request_timeout = \"~s\"\n"
" start_after_created = \"true\"\n" " start_after_created = \"true\"\n"

View File

@ -70,7 +70,7 @@
auto_restart_interval => pos_integer(), auto_restart_interval => pos_integer(),
batch_size => pos_integer(), batch_size => pos_integer(),
batch_time => pos_integer(), batch_time => pos_integer(),
max_queue_bytes => pos_integer(), max_buffer_bytes => pos_integer(),
query_mode => query_mode(), query_mode => query_mode(),
resume_interval => pos_integer(), resume_interval => pos_integer(),
inflight_window => pos_integer() inflight_window => pos_integer()
@ -85,8 +85,8 @@
-define(WORKER_POOL_SIZE, 16). -define(WORKER_POOL_SIZE, 16).
-define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024). -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024).
-define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>). -define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>).
-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). -define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)).

View File

@ -1667,9 +1667,9 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
BatchTime. BatchTime.
replayq_opts(Id, Index, Opts) -> replayq_opts(Id, Index, Opts) ->
QueueMode = maps:get(queue_mode, Opts, memory_only), BufferMode = maps:get(buffer_mode, Opts, memory_only),
TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), TotalBytes = maps:get(max_buffer_bytes, Opts, ?DEFAULT_BUFFER_BYTES),
case QueueMode of case BufferMode of
memory_only -> memory_only ->
#{ #{
mem_only => true, mem_only => true,
@ -1678,7 +1678,7 @@ replayq_opts(Id, Index, Opts) ->
sizer => fun ?MODULE:estimate_size/1 sizer => fun ?MODULE:estimate_size/1
}; };
volatile_offload -> volatile_offload ->
SegBytes0 = maps:get(queue_seg_bytes, Opts, TotalBytes), SegBytes0 = maps:get(buffer_seg_bytes, Opts, TotalBytes),
SegBytes = min(SegBytes0, TotalBytes), SegBytes = min(SegBytes0, TotalBytes),
#{ #{
dir => disk_queue_dir(Id, Index), dir => disk_queue_dir(Id, Index),

View File

@ -40,7 +40,7 @@ fields("resource_opts") ->
]; ];
fields("creation_opts") -> fields("creation_opts") ->
[ [
{queue_mode, fun queue_mode/1}, {buffer_mode, fun buffer_mode/1},
{worker_pool_size, fun worker_pool_size/1}, {worker_pool_size, fun worker_pool_size/1},
{health_check_interval, fun health_check_interval/1}, {health_check_interval, fun health_check_interval/1},
{resume_interval, fun resume_interval/1}, {resume_interval, fun resume_interval/1},
@ -54,8 +54,8 @@ fields("creation_opts") ->
{batch_size, fun batch_size/1}, {batch_size, fun batch_size/1},
{batch_time, fun batch_time/1}, {batch_time, fun batch_time/1},
{enable_queue, fun enable_queue/1}, {enable_queue, fun enable_queue/1},
{max_queue_bytes, fun max_queue_bytes/1}, {max_buffer_bytes, fun max_buffer_bytes/1},
{queue_seg_bytes, fun queue_seg_bytes/1} {buffer_seg_bytes, fun buffer_seg_bytes/1}
]. ].
resource_opts_meta() -> resource_opts_meta() ->
@ -145,23 +145,24 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW;
batch_time(required) -> false; batch_time(required) -> false;
batch_time(_) -> undefined. batch_time(_) -> undefined.
max_queue_bytes(type) -> emqx_schema:bytesize(); max_buffer_bytes(type) -> emqx_schema:bytesize();
max_queue_bytes(desc) -> ?DESC("max_queue_bytes"); max_buffer_bytes(aliases) -> [max_queue_bytes];
max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; max_buffer_bytes(desc) -> ?DESC("max_buffer_bytes");
max_queue_bytes(required) -> false; max_buffer_bytes(default) -> ?DEFAULT_BUFFER_BYTES_RAW;
max_queue_bytes(_) -> undefined. max_buffer_bytes(required) -> false;
max_buffer_bytes(_) -> undefined.
queue_mode(type) -> enum([memory_only, volatile_offload]); buffer_mode(type) -> enum([memory_only, volatile_offload]);
queue_mode(desc) -> ?DESC("queue_mode"); buffer_mode(desc) -> ?DESC("buffer_mode");
queue_mode(default) -> memory_only; buffer_mode(default) -> memory_only;
queue_mode(required) -> false; buffer_mode(required) -> false;
queue_mode(importance) -> ?IMPORTANCE_HIDDEN; buffer_mode(importance) -> ?IMPORTANCE_HIDDEN;
queue_mode(_) -> undefined. buffer_mode(_) -> undefined.
queue_seg_bytes(type) -> emqx_schema:bytesize(); buffer_seg_bytes(type) -> emqx_schema:bytesize();
queue_seg_bytes(desc) -> ?DESC("queue_seg_bytes"); buffer_seg_bytes(desc) -> ?DESC("buffer_seg_bytes");
queue_seg_bytes(required) -> false; buffer_seg_bytes(required) -> false;
queue_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN; buffer_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN;
queue_seg_bytes(_) -> undefined. buffer_seg_bytes(_) -> undefined.
desc("creation_opts") -> ?DESC("creation_opts"). desc("creation_opts") -> ?DESC("creation_opts").

View File

@ -1314,8 +1314,8 @@ t_delete_and_re_create_with_same_name(_Config) ->
query_mode => sync, query_mode => sync,
batch_size => 1, batch_size => 1,
worker_pool_size => NumBufferWorkers, worker_pool_size => NumBufferWorkers,
queue_mode => volatile_offload, buffer_mode => volatile_offload,
queue_seg_bytes => 100, buffer_seg_bytes => 100,
resume_interval => 1_000 resume_interval => 1_000
} }
), ),
@ -1374,7 +1374,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
query_mode => async, query_mode => async,
batch_size => 1, batch_size => 1,
worker_pool_size => 2, worker_pool_size => 2,
queue_seg_bytes => 100, buffer_seg_bytes => 100,
resume_interval => 1_000 resume_interval => 1_000
} }
), ),
@ -1406,7 +1406,7 @@ t_always_overflow(_Config) ->
query_mode => sync, query_mode => sync,
batch_size => 1, batch_size => 1,
worker_pool_size => 1, worker_pool_size => 1,
max_queue_bytes => 1, max_buffer_bytes => 1,
resume_interval => 1_000 resume_interval => 1_000
} }
), ),
@ -2642,9 +2642,9 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
%% The default mode is currently `memory_only'. %% The default mode is currently `memory_only'.
t_volatile_offload_mode(_Config) -> t_volatile_offload_mode(_Config) ->
MaxQueueBytes = 1_000, MaxBufferBytes = 1_000,
DefaultOpts = #{ DefaultOpts = #{
max_queue_bytes => MaxQueueBytes, max_buffer_bytes => MaxBufferBytes,
worker_pool_size => 1 worker_pool_size => 1
}, },
?check_trace( ?check_trace(
@ -2659,7 +2659,7 @@ t_volatile_offload_mode(_Config) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource}, #{name => test_resource},
DefaultOpts#{queue_mode => volatile_offload} DefaultOpts#{buffer_mode => volatile_offload}
) )
), ),
?assertEqual(ok, emqx_resource:remove_local(?ID)), ?assertEqual(ok, emqx_resource:remove_local(?ID)),
@ -2673,8 +2673,8 @@ t_volatile_offload_mode(_Config) ->
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource}, #{name => test_resource},
DefaultOpts#{ DefaultOpts#{
queue_mode => volatile_offload, buffer_mode => volatile_offload,
queue_seg_bytes => MaxQueueBytes div 2 buffer_seg_bytes => MaxBufferBytes div 2
} }
) )
), ),
@ -2688,8 +2688,8 @@ t_volatile_offload_mode(_Config) ->
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource}, #{name => test_resource},
DefaultOpts#{ DefaultOpts#{
queue_mode => volatile_offload, buffer_mode => volatile_offload,
queue_seg_bytes => MaxQueueBytes buffer_seg_bytes => MaxBufferBytes
} }
) )
), ),
@ -2705,8 +2705,8 @@ t_volatile_offload_mode(_Config) ->
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource}, #{name => test_resource},
DefaultOpts#{ DefaultOpts#{
queue_mode => volatile_offload, buffer_mode => volatile_offload,
queue_seg_bytes => 2 * MaxQueueBytes buffer_seg_bytes => 2 * MaxBufferBytes
} }
) )
), ),
@ -2715,7 +2715,7 @@ t_volatile_offload_mode(_Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
HalfMaxQueueBytes = MaxQueueBytes div 2, HalfMaxBufferBytes = MaxBufferBytes div 2,
?assertMatch( ?assertMatch(
[ [
#{ #{
@ -2729,7 +2729,7 @@ t_volatile_offload_mode(_Config) ->
max_total_bytes := MaxTotalBytes, max_total_bytes := MaxTotalBytes,
%% uses the specified value since it's smaller %% uses the specified value since it's smaller
%% than max bytes. %% than max bytes.
seg_bytes := HalfMaxQueueBytes, seg_bytes := HalfMaxBufferBytes,
offload := {true, volatile} offload := {true, volatile}
}, },
#{ #{

View File

@ -63,7 +63,7 @@ values(_Method, Type) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}. }.

View File

@ -61,7 +61,7 @@ values(_Method, Type) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}. }.

View File

@ -56,7 +56,7 @@ values(_Method) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}. }.

View File

@ -57,7 +57,7 @@ values(_Method) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}. }.

View File

@ -59,7 +59,7 @@ values(_Method, Type) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}. }.

View File

@ -56,7 +56,7 @@ values(post) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}; };
values(put) -> values(put) ->

View File

@ -60,7 +60,7 @@ values(post) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async, query_mode => async,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}; };
values(put) -> values(put) ->

View File

@ -58,7 +58,7 @@ values(_Method) ->
batch_size => ?DEFAULT_BATCH_SIZE, batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
query_mode => sync, query_mode => sync,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
} }
}. }.

View File

@ -168,7 +168,7 @@ When disabled the messages are buffered in RAM only."""
} }
} }
max_queue_bytes { max_buffer_bytes {
desc { desc {
en: """Maximum number of bytes to buffer for each buffer worker.""" en: """Maximum number of bytes to buffer for each buffer worker."""
zh: """每个缓存 worker 允许使用的最大字节数。""" zh: """每个缓存 worker 允许使用的最大字节数。"""
@ -179,7 +179,7 @@ When disabled the messages are buffered in RAM only."""
} }
} }
queue_seg_bytes { buffer_seg_bytes {
desc { desc {
en: "Applicable when buffer mode is set to <code>volatile_offload</code>.\n" en: "Applicable when buffer mode is set to <code>volatile_offload</code>.\n"
"This value is to specify the size of each on-disk buffer file." "This value is to specify the size of each on-disk buffer file."
@ -192,9 +192,9 @@ When disabled the messages are buffered in RAM only."""
} }
} }
queue_mode { buffer_mode {
desc { desc {
en: "Queue operation mode.\n" en: "Buffer operation mode.\n"
"<code>memory_only</mode>: Buffer all messages in memory." "<code>memory_only</mode>: Buffer all messages in memory."
"<code>volatile_offload</code>: Buffer message in memory first, when up to certain limit" "<code>volatile_offload</code>: Buffer message in memory first, when up to certain limit"
" (see <code>buffer_seg_bytes</code> config for more information), then start offloading messages to disk" " (see <code>buffer_seg_bytes</code> config for more information), then start offloading messages to disk"

View File

@ -11,7 +11,7 @@ bridges {
batch_size = 100 batch_size = 100
batch_time = "10ms" batch_time = "10ms"
health_check_interval = "15s" health_check_interval = "15s"
max_queue_bytes = "1GB" max_buffer_bytes = "1GB"
query_mode = "sync" query_mode = "sync"
request_timeout = "15s" request_timeout = "15s"
start_after_created = "true" start_after_created = "true"