From 2fb5dbd54688e25561b3a25af9230f647a89f943 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 2 Mar 2021 17:35:13 +0800 Subject: [PATCH] revert(bridge-mqtt): remove the mqtt_sub resource We have several reasons to remove this feature: 1. The design does not make sense. A rule engine resource should not have an impact on the system's messages directly after it is created. This mqtt_sub actually conflicts with any design concept of the rules engine. 2. The implementation is incorrect. mqtt_sub uses a client pool to establish a subscription relationship to an MQTT Broker. This causes a message to be sent repeatedly to EMQ X. Unless a shared subscription is used, or a Pool Size of 1 is configured. 3. The emqx-bridge-mqtt supports all the features of mqtt_sub. This feature introduced by https://github.com/emqx/emqx-bridge-mqtt/pull/78. And it released to v4.2.0 (NOT WORK), v4.2.1-v4.2.7 (FIXED) --- .../src/emqx_bridge_mqtt_actions.erl | 221 ++---------------- 1 file changed, 14 insertions(+), 207 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 0dccbd353..cbd7f28ed 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -39,7 +39,6 @@ ]). -define(RESOURCE_TYPE_MQTT, 'bridge_mqtt'). --define(RESOURCE_TYPE_MQTT_SUB, 'bridge_mqtt_sub'). -define(RESOURCE_TYPE_RPC, 'bridge_rpc'). -define(RESOURCE_CONFIG_SPEC_MQTT, #{ @@ -111,7 +110,7 @@ zh => <<"桥接挂载点"/utf8>>}, description => #{ en => <<"MountPoint for bridge topic:
" - "Example: The topic of messages sent to `topic1` on local node" + "Example: The topic of messages sent to `topic1` on local node " "will be transformed to `bridge/aws/${node}/topic1`">>, zh => <<"桥接主题的挂载点:
" "示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题" @@ -126,8 +125,8 @@ enum => [<<"on">>, <<"off">>], title => #{en => <<"Disk Cache">>, zh => <<"磁盘缓存"/utf8>>}, - description => #{en => <<"The flag which determines whether messages" - "can be cached on local disk when bridge is" + description => #{en => <<"The flag which determines whether messages " + "can be cached on local disk when bridge is " "disconnected">>, zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁" "盘队列上"/utf8>>} @@ -244,181 +243,6 @@ } }). --define(RESOURCE_CONFIG_SPEC_MQTT_SUB, #{ - address => #{ - order => 1, - type => string, - required => true, - default => <<"127.0.0.1:1883">>, - title => #{en => <<" Broker Address">>, - zh => <<"远程 broker 地址"/utf8>>}, - description => #{en => <<"The MQTT Remote Address">>, - zh => <<"远程 MQTT Broker 的地址"/utf8>>} - }, - pool_size => #{ - order => 2, - type => number, - required => true, - default => 8, - title => #{en => <<"Pool Size">>, - zh => <<"连接池大小"/utf8>>}, - description => #{en => <<"MQTT Connection Pool Size">>, - zh => <<"连接池大小"/utf8>>} - }, - clientid => #{ - order => 3, - type => string, - required => true, - default => <<"client">>, - title => #{en => <<"ClientId">>, - zh => <<"客户端 Id"/utf8>>}, - description => #{en => <<"ClientId for connecting to remote MQTT broker">>, - zh => <<"连接远程 Broker 的 ClientId"/utf8>>} - }, - append => #{ - order => 4, - type => boolean, - required => true, - default => true, - title => #{en => <<"Append GUID">>, - zh => <<"附加 GUID"/utf8>>}, - description => #{en => <<"Append GUID to MQTT ClientId?">>, - zh => <<"是否将GUID附加到 MQTT ClientId 后"/utf8>>} - }, - username => #{ - order => 5, - type => string, - required => false, - default => <<"">>, - title => #{en => <<"Username">>, zh => <<"用户名"/utf8>>}, - description => #{en => <<"Username for connecting to remote MQTT Broker">>, - zh => <<"连接远程 Broker 的用户名"/utf8>>} - }, - password => #{ - order => 6, - type => password, - required => false, - default => <<"">>, - title => #{en => <<"Password">>, - zh => <<"密码"/utf8>>}, - description => #{en => <<"Password for connecting to remote MQTT Broker">>, - zh => <<"连接远程 Broker 的密码"/utf8>>} - }, - subscription_opts => #{ - order => 7, - type => array, - items => #{ - type => object, - schema => #{ - topic => #{ - order => 1, - type => string, - default => <<>>, - title => #{en => <<"MQTT Topic">>, - zh => <<"MQTT 主题"/utf8>>}, - description => #{en => <<"MQTT Topic">>, - zh => <<"MQTT 主题"/utf8>>} - }, - qos => #{ - order => 2, - type => number, - enum => [0, 1, 2], - default => 0, - title => #{en => <<"MQTT Topic QoS">>, - zh => <<"MQTT 服务质量"/utf8>>}, - description => #{en => <<"MQTT Topic QoS">>, - zh => <<"MQTT 服务质量"/utf8>>} - } - } - }, - default => [], - title => #{en => <<"Subscription Opts">>, - zh => <<"订阅选项"/utf8>>}, - description => #{en => <<"Subscription Opts">>, - zh => <<"订阅选项"/utf8>>} - }, - proto_ver => #{ - order => 8, - type => string, - required => false, - default => <<"mqttv4">>, - enum => [<<"mqttv3">>, <<"mqttv4">>, <<"mqttv5">>], - title => #{en => <<"Protocol Version">>, - zh => <<"协议版本"/utf8>>}, - description => #{en => <<"MQTTT Protocol version">>, - zh => <<"MQTT 协议版本"/utf8>>} - }, - keepalive => #{ - order => 9, - type => string, - required => false, - default => <<"60s">> , - title => #{en => <<"Keepalive">>, - zh => <<"心跳间隔"/utf8>>}, - description => #{en => <<"Keepalive">>, - zh => <<"心跳间隔"/utf8>>} - }, - reconnect_interval => #{ - order => 10, - type => string, - required => false, - default => <<"30s">>, - title => #{en => <<"Reconnect Interval">>, - zh => <<"重连间隔"/utf8>>}, - description => #{en => <<"Reconnect interval of bridge">>, - zh => <<"重连间隔"/utf8>>} - }, - ssl => #{ - order => 11, - type => boolean, - default => false, - title => #{en => <<"Bridge SSL">>, - zh => <<"Bridge SSL"/utf8>>}, - description => #{en => <<"Switch which used to enable ssl connection of the bridge">>, - zh => <<"是否启用 Bridge SSL 连接"/utf8>>} - }, - cacertfile => #{ - order => 12, - type => file, - required => false, - default => <<"etc/certs/cacert.pem">>, - title => #{en => <<"CA certificates">>, - zh => <<"CA 证书"/utf8>>}, - description => #{en => <<"The file path of the CA certificates">>, - zh => <<"CA 证书路径"/utf8>>} - }, - certfile => #{ - order => 13, - type => file, - required => false, - default => <<"etc/certs/client-cert.pem">>, - title => #{en => <<"SSL Certfile">>, - zh => <<"SSL 客户端证书"/utf8>>}, - description => #{en => <<"The file path of the client certfile">>, - zh => <<"客户端证书路径"/utf8>>} - }, - keyfile => #{ - order => 14, - type => file, - required => false, - default => <<"etc/certs/client-key.pem">>, - title => #{en => <<"SSL Keyfile">>, - zh => <<"SSL 密钥文件"/utf8>>}, - description => #{en => <<"The file path of the client keyfile">>, - zh => <<"客户端密钥路径"/utf8>>} - }, - ciphers => #{ - order => 15, - type => string, - required => false, - default => <<"ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384">>, - title => #{en => <<"SSL Ciphers">>, - zh => <<"SSL 加密算法"/utf8>>}, - description => #{en => <<"SSL Ciphers">>, - zh => <<"SSL 加密算法"/utf8>>} - } - }). - -define(RESOURCE_CONFIG_SPEC_RPC, #{ address => #{ order => 1, @@ -438,7 +262,7 @@ title => #{en => <<"Bridge MountPoint">>, zh => <<"桥接挂载点"/utf8>>}, description => #{en => <<"MountPoint for bridge topic
" - "Example: The topic of messages sent to `topic1` on local node" + "Example: The topic of messages sent to `topic1` on local node " "will be transformed to `bridge/aws/${node}/topic1`">>, zh => <<"桥接主题的挂载点
" "示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题" @@ -482,8 +306,8 @@ enum => [<<"on">>, <<"off">>], title => #{en => <<"Disk Cache">>, zh => <<"磁盘缓存"/utf8>>}, - description => #{en => <<"The flag which determines whether messages" - "can be cached on local disk when bridge is" + description => #{en => <<"The flag which determines whether messages " + "can be cached on local disk when bridge is " "disconnected">>, zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁" "盘队列上"/utf8>>} @@ -508,15 +332,6 @@ description => #{en => <<"MQTT Message Bridge">>, zh => <<"MQTT 消息桥接"/utf8>>} }). --resource_type(#{ - name => ?RESOURCE_TYPE_MQTT_SUB, - create => on_resource_create, - status => on_get_resource_status, - destroy => on_resource_destroy, - params => ?RESOURCE_CONFIG_SPEC_MQTT_SUB, - title => #{en => <<"MQTT Subscribe">>, zh => <<"MQTT Subscribe"/utf8>>}, - description => #{en => <<"MQTT Subscribe">>, zh => <<"MQTT 订阅消息"/utf8>>} - }). -resource_type(#{ name => ?RESOURCE_TYPE_RPC, @@ -542,7 +357,8 @@ default => <<"">>, title => #{en => <<"Forward Topic">>, zh => <<"转发消息主题"/utf8>>}, - description => #{en => <<"The topic used when forwarding the message. Defaults to the topic of the bridge message if not provided.">>, + description => #{en => <<"The topic used when forwarding the message. " + "Defaults to the topic of the bridge message if not provided.">>, zh => <<"转发消息时使用的主题。如果未提供,则默认为桥接消息的主题。"/utf8>>} }, payload_tmpl => #{ @@ -553,8 +369,11 @@ default => <<"">>, title => #{en => <<"Payload Template">>, zh => <<"消息内容模板"/utf8>>}, - description => #{en => <<"The payload template, variable interpolation is supported. If using empty template (default), then the payload will be all the available vars in JSON format">>, - zh => <<"消息内容模板,支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>} + description => #{en => <<"The payload template, variable interpolation is supported. " + "If using empty template (default), then the payload will be " + "all the available vars in JSON format">>, + zh => <<"消息内容模板,支持变量。" + "若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>} } }, title => #{en => <<"Data bridge to MQTT Broker">>, @@ -731,12 +550,6 @@ options(Options, PoolName, ResId) -> {connect_module, emqx_bridge_rpc}, {batch_size, Get(<<"batch_size">>)}]; false -> - Subscriptions = format_subscriptions(GetD(<<"subscription_opts">>, [])), - Subscriptions1 = case Get(<<"topic">>) of - undefined -> Subscriptions; - Topic -> - [{subscriptions, [{Topic, Get(<<"qos">>)}]} | Subscriptions] - end, [{address, binary_to_list(Address)}, {bridge_mode, GetD(<<"bridge_mode">>, true)}, {clean_start, true}, @@ -748,8 +561,7 @@ options(Options, PoolName, ResId) -> {password, str(Get(<<"password">>))}, {proto_ver, mqtt_ver(Get(<<"proto_ver">>))}, {retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)} - | maybe_ssl(Options, Get(<<"ssl">>), ResId) - ] ++ Subscriptions1 + | maybe_ssl(Options, Get(<<"ssl">>), ResId)] end. maybe_ssl(_Options, false, _ResId) -> @@ -765,8 +577,3 @@ mqtt_ver(ProtoVer) -> <<"mqttv5">> -> v5; _ -> v4 end. - -format_subscriptions(SubOpts) -> - lists:map(fun(Sub) -> - {maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)} - end, SubOpts).