feat(connector): add new option 'mode' to mqtt connectors

This commit is contained in:
Shawn 2021-12-06 18:44:27 +08:00
parent 9b4fe87ed0
commit 6903997b94
4 changed files with 22 additions and 11 deletions

View File

@ -1,4 +1,5 @@
#connectors.mqtt.my_mqtt_connector { #connectors.mqtt.my_mqtt_connector {
# mode = cluster_shareload
# server = "127.0.0.1:1883" # server = "127.0.0.1:1883"
# proto_ver = "v4" # proto_ver = "v4"
# username = "username1" # username = "username1"
@ -8,7 +9,6 @@
# retry_interval = "30s" # retry_interval = "30s"
# max_inflight = 32 # max_inflight = 32
# reconnect_interval = "30s" # reconnect_interval = "30s"
# bridge_mode = true
# replayq { # replayq {
# dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/" # dir = "{{ platform_data_dir }}/replayq/bridge_mqtt/"
# seg_bytes = "100MB" # seg_bytes = "100MB"

View File

@ -105,10 +105,10 @@ method_example(_Type, put) ->
info_example_basic(mqtt) -> info_example_basic(mqtt) ->
#{ #{
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>, server => <<"127.0.0.1:1883">>,
reconnect_interval => <<"30s">>, reconnect_interval => <<"30s">>,
proto_ver => <<"v4">>, proto_ver => <<"v4">>,
bridge_mode => true,
username => <<"foo">>, username => <<"foo">>,
password => <<"bar">>, password => <<"bar">>,
clientid => <<"foo">>, clientid => <<"foo">>,

View File

@ -182,7 +182,6 @@ basic_config(#{
server := Server, server := Server,
reconnect_interval := ReconnIntv, reconnect_interval := ReconnIntv,
proto_ver := ProtoVer, proto_ver := ProtoVer,
bridge_mode := BridgeMod,
username := User, username := User,
password := Password, password := Password,
clean_start := CleanStart, clean_start := CleanStart,
@ -197,7 +196,7 @@ basic_config(#{
server => Server, server => Server,
reconnect_interval => ReconnIntv, reconnect_interval => ReconnIntv,
proto_ver => ProtoVer, proto_ver => ProtoVer,
bridge_mode => BridgeMod, bridge_mode => true,
username => User, username => User,
password => Password, password => Password,
clean_start => CleanStart, clean_start => CleanStart,

View File

@ -8,7 +8,7 @@
%% http://www.apache.org/licenses/LICENSE-2.0 %% http://www.apache.org/licenses/LICENSE-2.0
%% %%
%% Unless required by applicable law or agreed to in writing, software %% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, %% cluster_shareload under the License is cluster_shareload on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
@ -38,7 +38,24 @@ fields("config") ->
topic_mappings(); topic_mappings();
fields("connector") -> fields("connector") ->
[ {server, [ {mode,
sc(hoconsc:enum([cluster_singleton, cluster_shareload]),
#{ default => cluster_shareload
, desc => """
The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'<br>
- cluster_singleton: create an unique MQTT connection within the emqx cluster.<br>
In 'cluster_singleton' node, all messages toward the remote broker go through the same
MQTT connection.<br>
- cluster_shareload: create an MQTT connection on each node in the emqx cluster.<br>
In 'cluster_shareload' mode, the incomming load from the remote broker is shared by
using shared subscription.<br>
Note that the 'clientid' is suffixed by the node name, this is to avoid
clientid conflicts between different nodes. And we can only use shared subscription
topic filters for 'from_remote_topic'.
"""
})}
, {server,
sc(emqx_schema:ip_port(), sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883" #{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker" , desc => "The host and port of the remote MQTT broker"
@ -49,11 +66,6 @@ fields("connector") ->
#{ default => v4 #{ default => v4
, desc => "The MQTT protocol version" , desc => "The MQTT protocol version"
})} })}
, {bridge_mode,
sc(boolean(),
#{ default => true
, desc => "The bridge mode of the MQTT protocol"
})}
, {username, , {username,
sc(binary(), sc(binary(),
#{ default => "emqx" #{ default => "emqx"