From f611cbab45245e1756fc000a11ceecacae5a649a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 23:16:05 +0100 Subject: [PATCH 1/4] chore: cap replayq seg size under total size --- apps/emqx_resource/src/emqx_resource_worker.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 6722e1b43..482c82f6a 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -117,13 +117,16 @@ init({Id, Index, Opts}) -> true = gproc_pool:connect_worker(Id, {Id, Index}), Name = name(Id, Index), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), + SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), + TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), + SegBytes = min(SegBytes0, TotalBytes), Queue = case maps:get(enable_queue, Opts, false) of true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), - seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), - max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), + seg_bytes => SegBytes, + max_total_bytes => TotalBytes, sizer => fun ?MODULE:estimate_size/1, marshaller => fun ?MODULE:queue_item_marshaller/1 }); From c085ffa0fe959723182ddf95dcecee5de250872f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 23:59:46 +0100 Subject: [PATCH 2/4] refactor: default mqtt bridgge buffer pool size down to 4 --- apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl index 997337c9d..41601ee1e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl @@ -86,7 +86,9 @@ default_resource_opts() -> <<"health_check_interval">> => <<"15s">>, <<"max_queue_bytes">> => <<"1GB">>, <<"query_mode">> => <<"sync">>, - <<"worker_pool_size">> => 16 + %% there is only one underlying MQTT connection + %% doesn't make a lot of sense to have a large pool + <<"worker_pool_size">> => 4 }. egress(Config) -> From da51433dc3b200616ff016aa01a79855e5a69e9f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 20 Dec 2022 00:00:31 +0100 Subject: [PATCH 3/4] refactor: add default value for eggress remote 'qos' and 'retain' otherwise when updating from dashboard, there is no way to set 'false' for 'retain' because it's a checkbox, it's either 'true' or 'undefined' --- apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 93bd846e4..4c6d9cb84 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -255,7 +255,8 @@ fields("egress_remote") -> mk( qos(), #{ - required => true, + required => false, + default => 1, desc => ?DESC("egress_remote_qos") } )}, @@ -263,7 +264,8 @@ fields("egress_remote") -> mk( hoconsc:union([boolean(), binary()]), #{ - required => true, + required => false, + default => false, desc => ?DESC("retain") } )}, From 479e191dcf0fb605c9750c3d402de83e851b6c4d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 20 Dec 2022 00:02:36 +0100 Subject: [PATCH 4/4] refactor: refine worker pool config and doc worker pool is a buffer pool the description hinted connection pool which is wrong. --- .../i18n/emqx_resource_schema_i18n.conf | 27 ++++++++++--------- apps/emqx_resource/include/emqx_resource.hrl | 6 ++--- .../src/schema/emqx_resource_schema.erl | 2 +- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index d7953ac3b..332dfdd8c 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -24,12 +24,13 @@ emqx_resource_schema { worker_pool_size { desc { - en: """Resource worker pool size.""" - zh: """资源连接池大小。""" + en: """The number of buffer workers. Only applicable for egress type bridges. +For bridges only have ingress direction data flow, it can be set to 0 otherwise must be greater than 0.""" + zh: """缓存队列 worker 数量。仅对 egress 类型的桥接有意义。当桥接仅有 ingress 方向时,可设置为 0,否则必须大于 0)。""" } label { - en: """Worker Pool Size""" - zh: """资源连接池大小""" + en: """Buffer Pool Size""" + zh: """缓存池大小""" } } @@ -101,12 +102,14 @@ emqx_resource_schema { enable_queue { desc { - en: """Queue mode enabled.""" - zh: """启用队列模式。""" + en: """Enable disk buffer queue (only applicable for egress bridges). +When Enabled, messages will be buffered on disk when the bridge connection is down. +When disabled the messages are buffered in RAM only.""" + zh: """启用磁盘缓存队列(仅对 egress 方向桥接有用)。""" } label { - en: """Enable queue""" - zh: """启用队列模式""" + en: """Enable disk buffer queue""" + zh: """启用磁盘缓存队列""" } } @@ -145,12 +148,12 @@ emqx_resource_schema { max_queue_bytes { desc { - en: """Maximum queue storage.""" - zh: """消息队列的最大长度。""" + en: """Maximum number of bytes to buffer for each buffer worker.""" + zh: """每个缓存 worker 允许使用的最大字节数。""" } label { - en: """Queue max bytes""" - zh: """队列最大长度""" + en: """Max buffer queue size""" + zh: """缓存队列最大长度""" } } diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 02b1208b7..051b57e31 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -43,15 +43,15 @@ }. -type resource_group() :: binary(). -type creation_opts() :: #{ - %%======================================= Deprecated Opts: + %%======================================= Deprecated Opts BEGIN %% use health_check_interval instead health_check_timeout => integer(), %% use start_timeout instead wait_for_resource_ready => integer(), %% use auto_restart_interval instead auto_retry_interval => integer(), - %%======================================= Deprecated Opts End - worker_pool_size => pos_integer(), + %%======================================= Deprecated Opts END + worker_pool_size => non_neg_integer(), %% use `integer()` compatibility to release 5.0.0 bpapi health_check_interval => integer(), %% We can choose to block the return of emqx_resource:start until diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index c666974b1..465a935c3 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -56,7 +56,7 @@ fields("creation_opts") -> {max_queue_bytes, fun max_queue_bytes/1} ]. -worker_pool_size(type) -> pos_integer(); +worker_pool_size(type) -> non_neg_integer(); worker_pool_size(desc) -> ?DESC("worker_pool_size"); worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false;