From 6903997b9440d8429306e0a97e1fa84a1b2b7838 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Mon, 6 Dec 2021 18:44:27 +0800
Subject: [PATCH] feat(connector): add new option 'mode' to mqtt connectors
---
apps/emqx_connector/etc/emqx_connector.conf | 2 +-
.../emqx_connector/src/emqx_connector_api.erl | 2 +-
.../src/emqx_connector_mqtt.erl | 3 +--
.../src/mqtt/emqx_connector_mqtt_schema.erl | 26 ++++++++++++++-----
4 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/apps/emqx_connector/etc/emqx_connector.conf b/apps/emqx_connector/etc/emqx_connector.conf
index 06395ac94..8929598be 100644
--- a/apps/emqx_connector/etc/emqx_connector.conf
+++ b/apps/emqx_connector/etc/emqx_connector.conf
@@ -1,4 +1,5 @@
#connectors.mqtt.my_mqtt_connector {
+# mode = cluster_shareload
# server = "127.0.0.1:1883"
# proto_ver = "v4"
# username = "username1"
@@ -8,7 +9,6 @@
# retry_interval = "30s"
# max_inflight = 32
# reconnect_interval = "30s"
-# bridge_mode = true
# replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB"
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index fe28dde9d..cfe52d279 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -105,10 +105,10 @@ method_example(_Type, put) ->
info_example_basic(mqtt) ->
#{
+ mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"30s">>,
proto_ver => <<"v4">>,
- bridge_mode => true,
username => <<"foo">>,
password => <<"bar">>,
clientid => <<"foo">>,
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 2cce0d195..6bc609fa8 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -182,7 +182,6 @@ basic_config(#{
server := Server,
reconnect_interval := ReconnIntv,
proto_ver := ProtoVer,
- bridge_mode := BridgeMod,
username := User,
password := Password,
clean_start := CleanStart,
@@ -197,7 +196,7 @@ basic_config(#{
server => Server,
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
- bridge_mode => BridgeMod,
+ bridge_mode => true,
username => User,
password => Password,
clean_start => CleanStart,
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 415c6fa1a..2338129d1 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -8,7 +8,7 @@
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
+%% cluster_shareload under the License is cluster_shareload on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
@@ -38,7 +38,24 @@ fields("config") ->
topic_mappings();
fields("connector") ->
- [ {server,
+ [ {mode,
+ sc(hoconsc:enum([cluster_singleton, cluster_shareload]),
+ #{ default => cluster_shareload
+ , desc => """
+The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'
+
+- cluster_singleton: create an unique MQTT connection within the emqx cluster.
+In 'cluster_singleton' node, all messages toward the remote broker go through the same
+MQTT connection.
+- cluster_shareload: create an MQTT connection on each node in the emqx cluster.
+In 'cluster_shareload' mode, the incomming load from the remote broker is shared by
+using shared subscription.
+Note that the 'clientid' is suffixed by the node name, this is to avoid
+clientid conflicts between different nodes. And we can only use shared subscription
+topic filters for 'from_remote_topic'.
+"""
+ })}
+ , {server,
sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker"
@@ -49,11 +66,6 @@ fields("connector") ->
#{ default => v4
, desc => "The MQTT protocol version"
})}
- , {bridge_mode,
- sc(boolean(),
- #{ default => true
- , desc => "The bridge mode of the MQTT protocol"
- })}
, {username,
sc(binary(),
#{ default => "emqx"