revert(bridge-mqtt): remove the mqtt_sub resource

We have several reasons to remove this feature:

1. The design does not make sense. A rule engine resource should
not have an impact on the system's messages directly after it is created.
This mqtt_sub actually conflicts with any design concept of the rules engine.

2. The implementation is incorrect. mqtt_sub uses a client pool to establish
a subscription relationship to an MQTT Broker. This causes a message to be
sent repeatedly to EMQ X. Unless a shared subscription is used,
or a Pool Size of 1 is configured.

3. The emqx-bridge-mqtt supports all the features of mqtt_sub.

This feature introduced by https://github.com/emqx/emqx-bridge-mqtt/pull/78.
And it released to v4.2.0 (NOT WORK), v4.2.1-v4.2.7 (FIXED)
This commit is contained in:
JianBo He 2021-03-02 17:35:13 +08:00 committed by turtleDeng
parent 7bf5bb26e7
commit 2fb5dbd546
1 changed files with 14 additions and 207 deletions

View File

@ -39,7 +39,6 @@
]).
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
-define(RESOURCE_TYPE_MQTT_SUB, 'bridge_mqtt_sub').
-define(RESOURCE_TYPE_RPC, 'bridge_rpc').
-define(RESOURCE_CONFIG_SPEC_MQTT, #{
@ -111,7 +110,7 @@
zh => <<"桥接挂载点"/utf8>>},
description => #{
en => <<"MountPoint for bridge topic:<br/>"
"Example: The topic of messages sent to `topic1` on local node"
"Example: The topic of messages sent to `topic1` on local node "
"will be transformed to `bridge/aws/${node}/topic1`">>,
zh => <<"桥接主题的挂载点:<br/>"
"示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题"
@ -126,8 +125,8 @@
enum => [<<"on">>, <<"off">>],
title => #{en => <<"Disk Cache">>,
zh => <<"磁盘缓存"/utf8>>},
description => #{en => <<"The flag which determines whether messages"
"can be cached on local disk when bridge is"
description => #{en => <<"The flag which determines whether messages "
"can be cached on local disk when bridge is "
"disconnected">>,
zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁"
"盘队列上"/utf8>>}
@ -244,181 +243,6 @@
}
}).
-define(RESOURCE_CONFIG_SPEC_MQTT_SUB, #{
address => #{
order => 1,
type => string,
required => true,
default => <<"127.0.0.1:1883">>,
title => #{en => <<" Broker Address">>,
zh => <<"远程 broker 地址"/utf8>>},
description => #{en => <<"The MQTT Remote Address">>,
zh => <<"远程 MQTT Broker 的地址"/utf8>>}
},
pool_size => #{
order => 2,
type => number,
required => true,
default => 8,
title => #{en => <<"Pool Size">>,
zh => <<"连接池大小"/utf8>>},
description => #{en => <<"MQTT Connection Pool Size">>,
zh => <<"连接池大小"/utf8>>}
},
clientid => #{
order => 3,
type => string,
required => true,
default => <<"client">>,
title => #{en => <<"ClientId">>,
zh => <<"客户端 Id"/utf8>>},
description => #{en => <<"ClientId for connecting to remote MQTT broker">>,
zh => <<"连接远程 Broker 的 ClientId"/utf8>>}
},
append => #{
order => 4,
type => boolean,
required => true,
default => true,
title => #{en => <<"Append GUID">>,
zh => <<"附加 GUID"/utf8>>},
description => #{en => <<"Append GUID to MQTT ClientId?">>,
zh => <<"是否将GUID附加到 MQTT ClientId 后"/utf8>>}
},
username => #{
order => 5,
type => string,
required => false,
default => <<"">>,
title => #{en => <<"Username">>, zh => <<"用户名"/utf8>>},
description => #{en => <<"Username for connecting to remote MQTT Broker">>,
zh => <<"连接远程 Broker 的用户名"/utf8>>}
},
password => #{
order => 6,
type => password,
required => false,
default => <<"">>,
title => #{en => <<"Password">>,
zh => <<"密码"/utf8>>},
description => #{en => <<"Password for connecting to remote MQTT Broker">>,
zh => <<"连接远程 Broker 的密码"/utf8>>}
},
subscription_opts => #{
order => 7,
type => array,
items => #{
type => object,
schema => #{
topic => #{
order => 1,
type => string,
default => <<>>,
title => #{en => <<"MQTT Topic">>,
zh => <<"MQTT 主题"/utf8>>},
description => #{en => <<"MQTT Topic">>,
zh => <<"MQTT 主题"/utf8>>}
},
qos => #{
order => 2,
type => number,
enum => [0, 1, 2],
default => 0,
title => #{en => <<"MQTT Topic QoS">>,
zh => <<"MQTT 服务质量"/utf8>>},
description => #{en => <<"MQTT Topic QoS">>,
zh => <<"MQTT 服务质量"/utf8>>}
}
}
},
default => [],
title => #{en => <<"Subscription Opts">>,
zh => <<"订阅选项"/utf8>>},
description => #{en => <<"Subscription Opts">>,
zh => <<"订阅选项"/utf8>>}
},
proto_ver => #{
order => 8,
type => string,
required => false,
default => <<"mqttv4">>,
enum => [<<"mqttv3">>, <<"mqttv4">>, <<"mqttv5">>],
title => #{en => <<"Protocol Version">>,
zh => <<"协议版本"/utf8>>},
description => #{en => <<"MQTTT Protocol version">>,
zh => <<"MQTT 协议版本"/utf8>>}
},
keepalive => #{
order => 9,
type => string,
required => false,
default => <<"60s">> ,
title => #{en => <<"Keepalive">>,
zh => <<"心跳间隔"/utf8>>},
description => #{en => <<"Keepalive">>,
zh => <<"心跳间隔"/utf8>>}
},
reconnect_interval => #{
order => 10,
type => string,
required => false,
default => <<"30s">>,
title => #{en => <<"Reconnect Interval">>,
zh => <<"重连间隔"/utf8>>},
description => #{en => <<"Reconnect interval of bridge">>,
zh => <<"重连间隔"/utf8>>}
},
ssl => #{
order => 11,
type => boolean,
default => false,
title => #{en => <<"Bridge SSL">>,
zh => <<"Bridge SSL"/utf8>>},
description => #{en => <<"Switch which used to enable ssl connection of the bridge">>,
zh => <<"是否启用 Bridge SSL 连接"/utf8>>}
},
cacertfile => #{
order => 12,
type => file,
required => false,
default => <<"etc/certs/cacert.pem">>,
title => #{en => <<"CA certificates">>,
zh => <<"CA 证书"/utf8>>},
description => #{en => <<"The file path of the CA certificates">>,
zh => <<"CA 证书路径"/utf8>>}
},
certfile => #{
order => 13,
type => file,
required => false,
default => <<"etc/certs/client-cert.pem">>,
title => #{en => <<"SSL Certfile">>,
zh => <<"SSL 客户端证书"/utf8>>},
description => #{en => <<"The file path of the client certfile">>,
zh => <<"客户端证书路径"/utf8>>}
},
keyfile => #{
order => 14,
type => file,
required => false,
default => <<"etc/certs/client-key.pem">>,
title => #{en => <<"SSL Keyfile">>,
zh => <<"SSL 密钥文件"/utf8>>},
description => #{en => <<"The file path of the client keyfile">>,
zh => <<"客户端密钥路径"/utf8>>}
},
ciphers => #{
order => 15,
type => string,
required => false,
default => <<"ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384">>,
title => #{en => <<"SSL Ciphers">>,
zh => <<"SSL 加密算法"/utf8>>},
description => #{en => <<"SSL Ciphers">>,
zh => <<"SSL 加密算法"/utf8>>}
}
}).
-define(RESOURCE_CONFIG_SPEC_RPC, #{
address => #{
order => 1,
@ -438,7 +262,7 @@
title => #{en => <<"Bridge MountPoint">>,
zh => <<"桥接挂载点"/utf8>>},
description => #{en => <<"MountPoint for bridge topic<br/>"
"Example: The topic of messages sent to `topic1` on local node"
"Example: The topic of messages sent to `topic1` on local node "
"will be transformed to `bridge/aws/${node}/topic1`">>,
zh => <<"桥接主题的挂载点<br/>"
"示例: 本地节点向 `topic1` 发消息,远程桥接节点的主题"
@ -482,8 +306,8 @@
enum => [<<"on">>, <<"off">>],
title => #{en => <<"Disk Cache">>,
zh => <<"磁盘缓存"/utf8>>},
description => #{en => <<"The flag which determines whether messages"
"can be cached on local disk when bridge is"
description => #{en => <<"The flag which determines whether messages "
"can be cached on local disk when bridge is "
"disconnected">>,
zh => <<"当桥接断开时用于控制是否将消息缓存到本地磁"
"盘队列上"/utf8>>}
@ -508,15 +332,6 @@
description => #{en => <<"MQTT Message Bridge">>, zh => <<"MQTT 消息桥接"/utf8>>}
}).
-resource_type(#{
name => ?RESOURCE_TYPE_MQTT_SUB,
create => on_resource_create,
status => on_get_resource_status,
destroy => on_resource_destroy,
params => ?RESOURCE_CONFIG_SPEC_MQTT_SUB,
title => #{en => <<"MQTT Subscribe">>, zh => <<"MQTT Subscribe"/utf8>>},
description => #{en => <<"MQTT Subscribe">>, zh => <<"MQTT 订阅消息"/utf8>>}
}).
-resource_type(#{
name => ?RESOURCE_TYPE_RPC,
@ -542,7 +357,8 @@
default => <<"">>,
title => #{en => <<"Forward Topic">>,
zh => <<"转发消息主题"/utf8>>},
description => #{en => <<"The topic used when forwarding the message. Defaults to the topic of the bridge message if not provided.">>,
description => #{en => <<"The topic used when forwarding the message. "
"Defaults to the topic of the bridge message if not provided.">>,
zh => <<"转发消息时使用的主题。如果未提供,则默认为桥接消息的主题。"/utf8>>}
},
payload_tmpl => #{
@ -553,8 +369,11 @@
default => <<"">>,
title => #{en => <<"Payload Template">>,
zh => <<"消息内容模板"/utf8>>},
description => #{en => <<"The payload template, variable interpolation is supported. If using empty template (default), then the payload will be all the available vars in JSON format">>,
zh => <<"消息内容模板,支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>}
description => #{en => <<"The payload template, variable interpolation is supported. "
"If using empty template (default), then the payload will be "
"all the available vars in JSON format">>,
zh => <<"消息内容模板,支持变量。"
"若使用空模板(默认),消息内容为 JSON 格式的所有字段"/utf8>>}
}
},
title => #{en => <<"Data bridge to MQTT Broker">>,
@ -731,12 +550,6 @@ options(Options, PoolName, ResId) ->
{connect_module, emqx_bridge_rpc},
{batch_size, Get(<<"batch_size">>)}];
false ->
Subscriptions = format_subscriptions(GetD(<<"subscription_opts">>, [])),
Subscriptions1 = case Get(<<"topic">>) of
undefined -> Subscriptions;
Topic ->
[{subscriptions, [{Topic, Get(<<"qos">>)}]} | Subscriptions]
end,
[{address, binary_to_list(Address)},
{bridge_mode, GetD(<<"bridge_mode">>, true)},
{clean_start, true},
@ -748,8 +561,7 @@ options(Options, PoolName, ResId) ->
{password, str(Get(<<"password">>))},
{proto_ver, mqtt_ver(Get(<<"proto_ver">>))},
{retry_interval, cuttlefish_duration:parse(str(GetD(<<"retry_interval">>, "30s")), s)}
| maybe_ssl(Options, Get(<<"ssl">>), ResId)
] ++ Subscriptions1
| maybe_ssl(Options, Get(<<"ssl">>), ResId)]
end.
maybe_ssl(_Options, false, _ResId) ->
@ -765,8 +577,3 @@ mqtt_ver(ProtoVer) ->
<<"mqttv5">> -> v5;
_ -> v4
end.
format_subscriptions(SubOpts) ->
lists:map(fun(Sub) ->
{maps:get(<<"topic">>, Sub), maps:get(<<"qos">>, Sub)}
end, SubOpts).