diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index b29cefacd..137cd1241 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -220,7 +220,7 @@ info_example_basic(webhook) -> auto_restart_interval => 15000, query_mode => async, inflight_window => 100, - max_queue_bytes => 100 * 1024 * 1024 + max_buffer_bytes => 100 * 1024 * 1024 } }; info_example_basic(mqtt) -> @@ -245,7 +245,7 @@ mqtt_main_example() -> health_check_interval => <<"15s">>, auto_restart_interval => <<"60s">>, query_mode => sync, - max_queue_bytes => 100 * 1024 * 1024 + max_buffer_bytes => 100 * 1024 * 1024 }, ssl => #{ enable => false diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl index fe173fa89..595b75ecf 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl @@ -89,7 +89,7 @@ default_resource_opts() -> <<"inflight_window">> => 100, <<"auto_restart_interval">> => <<"60s">>, <<"health_check_interval">> => <<"15s">>, - <<"max_queue_bytes">> => <<"1GB">>, + <<"max_buffer_bytes">> => <<"1GB">>, <<"query_mode">> => <<"sync">>, %% there is only one underlying MQTT connection %% doesn't make a lot of sense to have a large pool diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index e222190d2..f08c87b6e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -175,7 +175,7 @@ bridge_async_config(#{port := Port} = Config) -> " inflight_window = 100\n" " auto_restart_interval = \"60s\"\n" " health_check_interval = \"15s\"\n" - " max_queue_bytes = \"1GB\"\n" + " max_buffer_bytes = \"1GB\"\n" " query_mode = \"~s\"\n" " request_timeout = \"~s\"\n" " start_after_created = \"true\"\n" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index f8d671b40..91572eac3 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -70,7 +70,7 @@ auto_restart_interval => pos_integer(), batch_size => pos_integer(), batch_time => pos_integer(), - max_queue_bytes => pos_integer(), + max_buffer_bytes => pos_integer(), query_mode => query_mode(), resume_interval => pos_integer(), inflight_window => pos_integer() @@ -85,8 +85,8 @@ -define(WORKER_POOL_SIZE, 16). --define(DEFAULT_QUEUE_SIZE, 256 * 1024 * 1024). --define(DEFAULT_QUEUE_SIZE_RAW, <<"256MB">>). +-define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). +-define(DEFAULT_BUFFER_BYTES_RAW, <<"256MB">>). -define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index c53627ca4..a77335140 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1667,9 +1667,9 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> BatchTime. replayq_opts(Id, Index, Opts) -> - QueueMode = maps:get(queue_mode, Opts, memory_only), - TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), - case QueueMode of + BufferMode = maps:get(buffer_mode, Opts, memory_only), + TotalBytes = maps:get(max_buffer_bytes, Opts, ?DEFAULT_BUFFER_BYTES), + case BufferMode of memory_only -> #{ mem_only => true, @@ -1678,7 +1678,7 @@ replayq_opts(Id, Index, Opts) -> sizer => fun ?MODULE:estimate_size/1 }; volatile_offload -> - SegBytes0 = maps:get(queue_seg_bytes, Opts, TotalBytes), + SegBytes0 = maps:get(buffer_seg_bytes, Opts, TotalBytes), SegBytes = min(SegBytes0, TotalBytes), #{ dir => disk_queue_dir(Id, Index), diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 9116927fa..3b4fb66e5 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -40,7 +40,7 @@ fields("resource_opts") -> ]; fields("creation_opts") -> [ - {queue_mode, fun queue_mode/1}, + {buffer_mode, fun buffer_mode/1}, {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, {resume_interval, fun resume_interval/1}, @@ -54,8 +54,8 @@ fields("creation_opts") -> {batch_size, fun batch_size/1}, {batch_time, fun batch_time/1}, {enable_queue, fun enable_queue/1}, - {max_queue_bytes, fun max_queue_bytes/1}, - {queue_seg_bytes, fun queue_seg_bytes/1} + {max_buffer_bytes, fun max_buffer_bytes/1}, + {buffer_seg_bytes, fun buffer_seg_bytes/1} ]. resource_opts_meta() -> @@ -145,23 +145,24 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW; batch_time(required) -> false; batch_time(_) -> undefined. -max_queue_bytes(type) -> emqx_schema:bytesize(); -max_queue_bytes(desc) -> ?DESC("max_queue_bytes"); -max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; -max_queue_bytes(required) -> false; -max_queue_bytes(_) -> undefined. +max_buffer_bytes(type) -> emqx_schema:bytesize(); +max_buffer_bytes(aliases) -> [max_queue_bytes]; +max_buffer_bytes(desc) -> ?DESC("max_buffer_bytes"); +max_buffer_bytes(default) -> ?DEFAULT_BUFFER_BYTES_RAW; +max_buffer_bytes(required) -> false; +max_buffer_bytes(_) -> undefined. -queue_mode(type) -> enum([memory_only, volatile_offload]); -queue_mode(desc) -> ?DESC("queue_mode"); -queue_mode(default) -> memory_only; -queue_mode(required) -> false; -queue_mode(importance) -> ?IMPORTANCE_HIDDEN; -queue_mode(_) -> undefined. +buffer_mode(type) -> enum([memory_only, volatile_offload]); +buffer_mode(desc) -> ?DESC("buffer_mode"); +buffer_mode(default) -> memory_only; +buffer_mode(required) -> false; +buffer_mode(importance) -> ?IMPORTANCE_HIDDEN; +buffer_mode(_) -> undefined. -queue_seg_bytes(type) -> emqx_schema:bytesize(); -queue_seg_bytes(desc) -> ?DESC("queue_seg_bytes"); -queue_seg_bytes(required) -> false; -queue_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN; -queue_seg_bytes(_) -> undefined. +buffer_seg_bytes(type) -> emqx_schema:bytesize(); +buffer_seg_bytes(desc) -> ?DESC("buffer_seg_bytes"); +buffer_seg_bytes(required) -> false; +buffer_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN; +buffer_seg_bytes(_) -> undefined. desc("creation_opts") -> ?DESC("creation_opts"). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 087a39eee..385b4cb91 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1314,8 +1314,8 @@ t_delete_and_re_create_with_same_name(_Config) -> query_mode => sync, batch_size => 1, worker_pool_size => NumBufferWorkers, - queue_mode => volatile_offload, - queue_seg_bytes => 100, + buffer_mode => volatile_offload, + buffer_seg_bytes => 100, resume_interval => 1_000 } ), @@ -1374,7 +1374,7 @@ t_delete_and_re_create_with_same_name(_Config) -> query_mode => async, batch_size => 1, worker_pool_size => 2, - queue_seg_bytes => 100, + buffer_seg_bytes => 100, resume_interval => 1_000 } ), @@ -1406,7 +1406,7 @@ t_always_overflow(_Config) -> query_mode => sync, batch_size => 1, worker_pool_size => 1, - max_queue_bytes => 1, + max_buffer_bytes => 1, resume_interval => 1_000 } ), @@ -2642,9 +2642,9 @@ t_call_mode_uncoupled_from_query_mode(_Config) -> %% The default mode is currently `memory_only'. t_volatile_offload_mode(_Config) -> - MaxQueueBytes = 1_000, + MaxBufferBytes = 1_000, DefaultOpts = #{ - max_queue_bytes => MaxQueueBytes, + max_buffer_bytes => MaxBufferBytes, worker_pool_size => 1 }, ?check_trace( @@ -2659,7 +2659,7 @@ t_volatile_offload_mode(_Config) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource}, - DefaultOpts#{queue_mode => volatile_offload} + DefaultOpts#{buffer_mode => volatile_offload} ) ), ?assertEqual(ok, emqx_resource:remove_local(?ID)), @@ -2673,8 +2673,8 @@ t_volatile_offload_mode(_Config) -> ?TEST_RESOURCE, #{name => test_resource}, DefaultOpts#{ - queue_mode => volatile_offload, - queue_seg_bytes => MaxQueueBytes div 2 + buffer_mode => volatile_offload, + buffer_seg_bytes => MaxBufferBytes div 2 } ) ), @@ -2688,8 +2688,8 @@ t_volatile_offload_mode(_Config) -> ?TEST_RESOURCE, #{name => test_resource}, DefaultOpts#{ - queue_mode => volatile_offload, - queue_seg_bytes => MaxQueueBytes + buffer_mode => volatile_offload, + buffer_seg_bytes => MaxBufferBytes } ) ), @@ -2705,8 +2705,8 @@ t_volatile_offload_mode(_Config) -> ?TEST_RESOURCE, #{name => test_resource}, DefaultOpts#{ - queue_mode => volatile_offload, - queue_seg_bytes => 2 * MaxQueueBytes + buffer_mode => volatile_offload, + buffer_seg_bytes => 2 * MaxBufferBytes } ) ), @@ -2715,7 +2715,7 @@ t_volatile_offload_mode(_Config) -> ok end, fun(Trace) -> - HalfMaxQueueBytes = MaxQueueBytes div 2, + HalfMaxBufferBytes = MaxBufferBytes div 2, ?assertMatch( [ #{ @@ -2729,7 +2729,7 @@ t_volatile_offload_mode(_Config) -> max_total_bytes := MaxTotalBytes, %% uses the specified value since it's smaller %% than max bytes. - seg_bytes := HalfMaxQueueBytes, + seg_bytes := HalfMaxBufferBytes, offload := {true, volatile} }, #{ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl index 78db8352a..26c6de04d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_cassa.erl @@ -63,7 +63,7 @@ values(_Method, Type) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl index 0b611c142..56671c586 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl @@ -61,7 +61,7 @@ values(_Method, Type) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl index e6a3d1a58..ba1fd0c70 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl @@ -56,7 +56,7 @@ values(_Method) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index f3ed44247..7914c77e2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -57,7 +57,7 @@ values(_Method) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index 958bc3449..a5dcb19e6 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -59,7 +59,7 @@ values(_Method, Type) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl index 78fd527d3..28b94a1a4 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl @@ -56,7 +56,7 @@ values(post) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }; values(put) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl index 49a5ed0ce..e216299c2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sqlserver.erl @@ -60,7 +60,7 @@ values(post) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }; values(put) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl index 7a958d45f..54406541d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl @@ -58,7 +58,7 @@ values(_Method) -> batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => sync, - max_queue_bytes => ?DEFAULT_QUEUE_SIZE + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }. diff --git a/rel/i18n/emqx_resource_schema.hocon b/rel/i18n/emqx_resource_schema.hocon index 885a6c4dd..031a5b412 100644 --- a/rel/i18n/emqx_resource_schema.hocon +++ b/rel/i18n/emqx_resource_schema.hocon @@ -168,7 +168,7 @@ When disabled the messages are buffered in RAM only.""" } } - max_queue_bytes { + max_buffer_bytes { desc { en: """Maximum number of bytes to buffer for each buffer worker.""" zh: """每个缓存 worker 允许使用的最大字节数。""" @@ -179,7 +179,7 @@ When disabled the messages are buffered in RAM only.""" } } - queue_seg_bytes { + buffer_seg_bytes { desc { en: "Applicable when buffer mode is set to volatile_offload.\n" "This value is to specify the size of each on-disk buffer file." @@ -192,9 +192,9 @@ When disabled the messages are buffered in RAM only.""" } } - queue_mode { + buffer_mode { desc { - en: "Queue operation mode.\n" + en: "Buffer operation mode.\n" "memory_only: Buffer all messages in memory." "volatile_offload: Buffer message in memory first, when up to certain limit" " (see buffer_seg_bytes config for more information), then start offloading messages to disk" diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index 0416e42b6..0574ac38a 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -11,7 +11,7 @@ bridges { batch_size = 100 batch_time = "10ms" health_check_interval = "15s" - max_queue_bytes = "1GB" + max_buffer_bytes = "1GB" query_mode = "sync" request_timeout = "15s" start_after_created = "true"