diff --git a/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf b/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf index 93f0f5579..023f986fc 100644 --- a/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf +++ b/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf @@ -149,9 +149,10 @@ bridge.mqtt.aws.retry_interval = 20s bridge.mqtt.aws.batch_size = 32 ## Inflight size. +## 0 means infinity (no limit on the inflight window) ## ## Value: Integer -bridge.mqtt.aws.max_inflight_size = 32 +bridge.mqtt.aws.max_inflight = 32 ## Base directory for replayq to store messages on disk ## If this config entry is missing or set to undefined, diff --git a/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema b/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema index 9d5f3fe29..05c048d0a 100644 --- a/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema +++ b/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema @@ -103,7 +103,7 @@ {datatype, {duration, ms}} ]}. -{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [ +{mapping, "bridge.mqtt.$name.max_inflight", "emqx_bridge_mqtt.bridges", [ {default, 0}, {datatype, integer} ]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index c5f688753..3f63cdb46 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -92,7 +92,8 @@ ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> Error -> Error end; ensure_subscribed(_Conn, _Topic, _QoS) -> - %% return ok for now, next re-connect should should call start with new topic added to config + %% return ok for now + %% next re-connect should should call start with new topic added to config ok. ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> @@ -101,7 +102,8 @@ ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> Error -> Error end; ensure_unsubscribed(_, _) -> - %% return ok for now, next re-connect should should call start with this topic deleted from config + %% return ok for now + %% next re-connect should should call start with this topic deleted from config ok. safe_stop(Pid, StopF, Timeout) -> @@ -172,7 +174,7 @@ subscribe_remote_topics(ClientPid, Subscriptions) -> %%-------------------------------------------------------------------- replvar(Options) -> - replvar([clientid], Options). + replvar([clientid, max_inflight], Options). replvar([], Options) -> Options; @@ -186,5 +188,11 @@ replvar([Key|More], Options) -> %% ${node} => node() feedvar(clientid, ClientId, _) -> - iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))). + iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))); + +feedvar(max_inflight, 0, _) -> + infinity; + +feedvar(max_inflight, Size, _) -> + Size. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 81954c62b..29f7d31ef 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -251,7 +251,7 @@ init_opts(Config) -> BridgeHandler = maps:get(bridge_handler, Config, ?NO_BRIDGE_HANDLER), Mountpoint = maps:get(forward_mountpoint, Config, undefined), ReceiveMountpoint = maps:get(receive_mountpoint, Config, undefined), - MaxInflightSize = maps:get(max_inflight_batches, Config, ?DEFAULT_BATCH_SIZE), + MaxInflightSize = maps:get(max_inflight, Config, ?DEFAULT_BATCH_SIZE), BatchSize = maps:get(batch_size, Config, ?DEFAULT_BATCH_SIZE), Name = maps:get(name, Config, undefined), #{start_type => StartType,