Merge pull request #9581 from zmstone/1219-fix-mqtt-bridge-config

1219 fix mqtt bridge config
This commit is contained in:
Zaiming (Stone) Shi 2022-12-20 14:47:02 +01:00 committed by GitHub
commit 516147ad00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 31 additions and 21 deletions

View File

@ -86,7 +86,9 @@ default_resource_opts() ->
<<"health_check_interval">> => <<"15s">>, <<"health_check_interval">> => <<"15s">>,
<<"max_queue_bytes">> => <<"1GB">>, <<"max_queue_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,
<<"worker_pool_size">> => 16 %% there is only one underlying MQTT connection
%% doesn't make a lot of sense to have a large pool
<<"worker_pool_size">> => 4
}. }.
egress(Config) -> egress(Config) ->

View File

@ -255,7 +255,8 @@ fields("egress_remote") ->
mk( mk(
qos(), qos(),
#{ #{
required => true, required => false,
default => 1,
desc => ?DESC("egress_remote_qos") desc => ?DESC("egress_remote_qos")
} }
)}, )},
@ -263,7 +264,8 @@ fields("egress_remote") ->
mk( mk(
hoconsc:union([boolean(), binary()]), hoconsc:union([boolean(), binary()]),
#{ #{
required => true, required => false,
default => false,
desc => ?DESC("retain") desc => ?DESC("retain")
} }
)}, )},

View File

@ -24,12 +24,13 @@ emqx_resource_schema {
worker_pool_size { worker_pool_size {
desc { desc {
en: """Resource worker pool size.""" en: """The number of buffer workers. Only applicable for egress type bridges.
zh: """资源连接池大小。""" For bridges only have ingress direction data flow, it can be set to 0 otherwise must be greater than 0."""
zh: """缓存队列 worker 数量。仅对 egress 类型的桥接有意义。当桥接仅有 ingress 方向时,可设置为 0否则必须大于 0。"""
} }
label { label {
en: """Worker Pool Size""" en: """Buffer Pool Size"""
zh: """资源连接池大小""" zh: """缓存池大小"""
} }
} }
@ -101,12 +102,14 @@ emqx_resource_schema {
enable_queue { enable_queue {
desc { desc {
en: """Queue mode enabled.""" en: """Enable disk buffer queue (only applicable for egress bridges).
zh: """启用队列模式。""" When Enabled, messages will be buffered on disk when the bridge connection is down.
When disabled the messages are buffered in RAM only."""
zh: """启用磁盘缓存队列(仅对 egress 方向桥接有用)。"""
} }
label { label {
en: """Enable queue""" en: """Enable disk buffer queue"""
zh: """启用队列模式""" zh: """启用磁盘缓存队列"""
} }
} }
@ -145,12 +148,12 @@ emqx_resource_schema {
max_queue_bytes { max_queue_bytes {
desc { desc {
en: """Maximum queue storage.""" en: """Maximum number of bytes to buffer for each buffer worker."""
zh: """消息队列的最大长度。""" zh: """每个缓存 worker 允许使用的最大字节数。"""
} }
label { label {
en: """Queue max bytes""" en: """Max buffer queue size"""
zh: """队列最大长度""" zh: """缓存队列最大长度"""
} }
} }

View File

@ -43,15 +43,15 @@
}. }.
-type resource_group() :: binary(). -type resource_group() :: binary().
-type creation_opts() :: #{ -type creation_opts() :: #{
%%======================================= Deprecated Opts: %%======================================= Deprecated Opts BEGIN
%% use health_check_interval instead %% use health_check_interval instead
health_check_timeout => integer(), health_check_timeout => integer(),
%% use start_timeout instead %% use start_timeout instead
wait_for_resource_ready => integer(), wait_for_resource_ready => integer(),
%% use auto_restart_interval instead %% use auto_restart_interval instead
auto_retry_interval => integer(), auto_retry_interval => integer(),
%%======================================= Deprecated Opts End %%======================================= Deprecated Opts END
worker_pool_size => pos_integer(), worker_pool_size => non_neg_integer(),
%% use `integer()` compatibility to release 5.0.0 bpapi %% use `integer()` compatibility to release 5.0.0 bpapi
health_check_interval => integer(), health_check_interval => integer(),
%% We can choose to block the return of emqx_resource:start until %% We can choose to block the return of emqx_resource:start until

View File

@ -117,13 +117,16 @@ init({Id, Index, Opts}) ->
true = gproc_pool:connect_worker(Id, {Id, Index}), true = gproc_pool:connect_worker(Id, {Id, Index}),
Name = name(Id, Index), Name = name(Id, Index),
BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
SegBytes = min(SegBytes0, TotalBytes),
Queue = Queue =
case maps:get(enable_queue, Opts, false) of case maps:get(enable_queue, Opts, false) of
true -> true ->
replayq:open(#{ replayq:open(#{
dir => disk_queue_dir(Id, Index), dir => disk_queue_dir(Id, Index),
seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), seg_bytes => SegBytes,
max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), max_total_bytes => TotalBytes,
sizer => fun ?MODULE:estimate_size/1, sizer => fun ?MODULE:estimate_size/1,
marshaller => fun ?MODULE:queue_item_marshaller/1 marshaller => fun ?MODULE:queue_item_marshaller/1
}); });

View File

@ -56,7 +56,7 @@ fields("creation_opts") ->
{max_queue_bytes, fun max_queue_bytes/1} {max_queue_bytes, fun max_queue_bytes/1}
]. ].
worker_pool_size(type) -> pos_integer(); worker_pool_size(type) -> non_neg_integer();
worker_pool_size(desc) -> ?DESC("worker_pool_size"); worker_pool_size(desc) -> ?DESC("worker_pool_size");
worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(default) -> ?WORKER_POOL_SIZE;
worker_pool_size(required) -> false; worker_pool_size(required) -> false;