diff --git a/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf b/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf index 3cc719e40..4593e04f0 100644 --- a/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf +++ b/apps/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf @@ -2,173 +2,55 @@ ## Configuration for EMQ X MQTT Broker Bridge ##==================================================================== -##-------------------------------------------------------------------- -## Bridges to aws -##-------------------------------------------------------------------- - -## Bridge address: node name for local bridge, host:port for remote. -## -## Value: String -## Example: emqx@127.0.0.1, "127.0.0.1:1883" -bridge.mqtt.aws.address = "127.0.0.1:1883" - -## Protocol version of the bridge. -## -## Value: Enum -## - mqttv5 -## - mqttv4 -## - mqttv3 -bridge.mqtt.aws.proto_ver = mqttv4 - -## Start type of the bridge. -## -## Value: enum -## manual -## auto -bridge.mqtt.aws.start_type = manual - -## Whether to enable bridge mode for mqtt bridge -## -## This option is prepared for the mqtt broker which does not -## support bridge_mode such as the mqtt-plugin of the rabbitmq -## -## Value: boolean -#bridge.mqtt.aws.bridge_mode = false - -## The ClientId of a remote bridge. -## -## Placeholders: -## ${node}: Node name -## -## Value: String -bridge.mqtt.aws.clientid = bridge_aws - -## The Clean start flag of a remote bridge. -## -## Value: boolean -## Default: true -## -## NOTE: Some IoT platforms require clean_start -## must be set to 'true' -bridge.mqtt.aws.clean_start = true - -## The username for a remote bridge. -## -## Value: String -bridge.mqtt.aws.username = user - -## The password for a remote bridge. -## -## Value: String -bridge.mqtt.aws.password = passwd - -## Topics that need to be forward to AWS IoTHUB -## -## Value: String -## Example: "topic1/#,topic2/#" -bridge.mqtt.aws.forwards = "topic1/#,topic2/#" - -## Forward messages to the mountpoint of an AWS IoTHUB -## -## Value: String -bridge.mqtt.aws.forward_mountpoint = "bridge/aws/${node}/" - -## Need to subscribe to AWS topics -## -## Value: String -## bridge.mqtt.aws.subscription.1.topic = "cmd/topic1" - -## Need to subscribe to AWS topics QoS. -## -## Value: Number -## bridge.mqtt.aws.subscription.1.qos = 1 - -## A mountpoint that receives messages from AWS IoTHUB -## -## Value: String -## bridge.mqtt.aws.receive_mountpoint = "receive/aws/" - - -## Bribge to remote server via SSL. -## -## Value: on | off -bridge.mqtt.aws.ssl = off - -## PEM-encoded CA certificates of the bridge. -## -## Value: File -bridge.mqtt.aws.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" - -## Client SSL Certfile of the bridge. -## -## Value: File -bridge.mqtt.aws.certfile = "{{ platform_etc_dir }}/certs/client-cert.pem" - -## Client SSL Keyfile of the bridge. -## -## Value: File -bridge.mqtt.aws.keyfile = "{{ platform_etc_dir }}/certs/client-key.pem" - -## SSL Ciphers used by the bridge. -## -## Value: String -bridge.mqtt.aws.ciphers = "TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_CCM_SHA256,TLS_AES_128_CCM_8_SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA" - -## Ciphers for TLS PSK. -## Note that 'bridge.${BridgeName}.ciphers' and 'bridge.${BridgeName}.psk_ciphers' cannot -## be configured at the same time. -## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#bridge.mqtt.aws.psk_ciphers = "PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA" - -## Ping interval of a down bridge. -## -## Value: Duration -## Default: 10 seconds -bridge.mqtt.aws.keepalive = 60s - -## TLS versions used by the bridge. -## -## NOTE: Do not use tlsv1.3 if emqx is running on OTP-22 or earlier -## Value: String -bridge.mqtt.aws.tls_versions = "tlsv1.3,tlsv1.2,tlsv1.1,tlsv1" - -## Bridge reconnect time. -## -## Value: Duration -## Default: 30 seconds -bridge.mqtt.aws.reconnect_interval = 30s - -## Retry interval for bridge QoS1 message delivering. -## -## Value: Duration -bridge.mqtt.aws.retry_interval = 20s - -## Publish messages in batches, only RPC Bridge supports -## -## Value: Integer -## default: 32 -bridge.mqtt.aws.batch_size = 32 - -## Inflight size. -## 0 means infinity (no limit on the inflight window) -## -## Value: Integer -bridge.mqtt.aws.max_inflight_size = 32 - -## Base directory for replayq to store messages on disk -## If this config entry is missing or set to undefined, -## replayq works in a mem-only manner. -## -## Value: String -bridge.mqtt.aws.queue.replayq_dir = "{{ platform_data_dir }}/replayq/emqx_aws_bridge/" - -## Replayq segment size -## -## Value: Bytesize -bridge.mqtt.aws.queue.replayq_seg_bytes = 10MB - -## Replayq max total size -## -## Value: Bytesize -bridge.mqtt.aws.queue.max_total_size = 5GB - +emqx_bridge_mqtt:{ + bridges:[{ + name: "mqtt1" + start_type: auto + forwards: ["test/#"], + forward_mountpoint: "" + reconnect_interval: "30s" + batch_size: 100 + queue:{ + replayq_dir: false + # replayq_seg_bytes: "100MB" + # replayq_offload_mode: false + # replayq_max_total_bytes: "1GB" + }, + config:{ + conn_type: mqtt + address: "127.0.0.1:1883" + proto_ver: v4 + bridge_mode: true + clientid: "client1" + clean_start: true + username: "username1" + password: "" + keepalive: 300 + subscriptions: [{ + topic: "t/#" + qos: 1 + }] + receive_mountpoint: "" + retry_interval: "30s" + max_inflight: 32 + } + }, + { + name: "rpc1" + start_type: auto + forwards: ["test/#"], + forward_mountpoint: "" + reconnect_interval: "30s" + batch_size: 100 + queue:{ + replayq_dir: "{{ platform_data_dir }}/replayq/bridge_mqtt/" + replayq_seg_bytes: "100MB" + replayq_offload_mode: false + replayq_max_total_bytes: "1GB" + }, + config:{ + conn_type: rpc + node: "emqx@127.0.0.1" + } + }] +} diff --git a/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema b/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema deleted file mode 100644 index 3168bfc14..000000000 --- a/apps/emqx_bridge_mqtt/priv/emqx_bridge_mqtt.schema +++ /dev/null @@ -1,244 +0,0 @@ -%%-*- mode: erlang -*- -%%-------------------------------------------------------------------- -%% Bridges -%%-------------------------------------------------------------------- -{mapping, "bridge.mqtt.$name.address", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.proto_ver", "emqx_bridge_mqtt.bridges", [ - {datatype, {enum, [mqttv3, mqttv4, mqttv5]}} -]}. - -{mapping, "bridge.mqtt.$name.bridge_mode", "emqx_bridge_mqtt.bridges", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "bridge.mqtt.$name.start_type", "emqx_bridge_mqtt.bridges", [ - {datatype, {enum, [manual, auto]}}, - {default, auto} -]}. - -{mapping, "bridge.mqtt.$name.clientid", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.clean_start", "emqx_bridge_mqtt.bridges", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "bridge.mqtt.$name.username", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.password", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.forwards", "emqx_bridge_mqtt.bridges", [ - {datatype, string}, - {default, ""} -]}. - -{mapping, "bridge.mqtt.$name.forward_mountpoint", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.subscription.$id.topic", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.subscription.$id.qos", "emqx_bridge_mqtt.bridges", [ - {datatype, integer} -]}. - -{mapping, "bridge.mqtt.$name.receive_mountpoint", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.ssl", "emqx_bridge_mqtt.bridges", [ - {datatype, flag}, - {default, off} -]}. - -{mapping, "bridge.mqtt.$name.cacertfile", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.certfile", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.keyfile", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.ciphers", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.psk_ciphers", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.keepalive", "emqx_bridge_mqtt.bridges", [ - {default, "10s"}, - {datatype, {duration, s}} -]}. - -{mapping, "bridge.mqtt.$name.tls_versions", "emqx_bridge_mqtt.bridges", [ - {datatype, string}, - {default, "tlsv1.3,tlsv1.2,tlsv1.1,tlsv1"} -]}. - -{mapping, "bridge.mqtt.$name.reconnect_interval", "emqx_bridge_mqtt.bridges", [ - {default, "30s"}, - {datatype, {duration, ms}} -]}. - -{mapping, "bridge.mqtt.$name.retry_interval", "emqx_bridge_mqtt.bridges", [ - {default, "20s"}, - {datatype, {duration, s}} -]}. - -{mapping, "bridge.mqtt.$name.max_inflight_size", "emqx_bridge_mqtt.bridges", [ - {default, 0}, - {datatype, integer} - ]}. - -{mapping, "bridge.mqtt.$name.batch_size", "emqx_bridge_mqtt.bridges", [ - {default, 0}, - {datatype, integer} -]}. - -{mapping, "bridge.mqtt.$name.queue.replayq_dir", "emqx_bridge_mqtt.bridges", [ - {datatype, string} -]}. - -{mapping, "bridge.mqtt.$name.queue.replayq_seg_bytes", "emqx_bridge_mqtt.bridges", [ - {datatype, bytesize} -]}. - -{mapping, "bridge.mqtt.$name.queue.max_total_size", "emqx_bridge_mqtt.bridges", [ - {datatype, bytesize} -]}. - -{translation, "emqx_bridge_mqtt.bridges", fun(Conf) -> - - MapPSKCiphers = fun(PSKCiphers) -> - lists:map( - fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; - ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; - ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; - ("PSK-RC4-SHA") -> {psk, rc4_128, sha} - end, PSKCiphers) - end, - - Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, - - IsSsl = fun(cacertfile) -> true; - (certfile) -> true; - (keyfile) -> true; - (ciphers) -> true; - (psk_ciphers) -> true; - (tls_versions) -> true; - (_Opt) -> false - end, - - Parse = fun(tls_versions, Vers) -> - [{versions, [list_to_atom(S) || S <- Split(Vers)]}]; - (ciphers, Ciphers) -> - [{ciphers, Split(Ciphers)}]; - (psk_ciphers, Ciphers) -> - [{ciphers, MapPSKCiphers(Split(Ciphers))}, {user_lookup_fun, {fun emqx_psk:lookup/3, <<>>}}]; - (Opt, Val) -> - [{Opt, Val}] - end, - - Merge = fun(forwards, Val, Opts) -> - [{forwards, string:tokens(Val, ",")}|Opts]; - (Opt, Val, Opts) -> - case IsSsl(Opt) of - true -> - SslOpts = Parse(Opt, Val) ++ proplists:get_value(ssl_opts, Opts, []), - lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); - false -> - [{Opt, Val}|Opts] - end - end, - Queue = fun(Name) -> - Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".queue", Conf), - - QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, _, "queue", QOpt], QValue} <- Configs], - maps:from_list(QOpts) - end, - Subscriptions = fun(Name) -> - Configs = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".subscription", Conf), - lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, _, "subscription", I, "topic"], Topic} <- Configs])], - [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, _, "subscription", I, "qos"], QoS} <- Configs])]) - end, - IsNodeAddr = fun(Addr) -> - case string:tokens(Addr, "@") of - [_NodeName, _Hostname] -> true; - _ -> false - end - end, - ConnMod = fun(Name) -> - - [AddrConfig] = cuttlefish_variable:filter_by_prefix("bridge.mqtt." ++ Name ++ ".address", Conf), - {_, Addr} = AddrConfig, - - Subs = Subscriptions(Name), - case IsNodeAddr(Addr) of - true when Subs =/= [] -> - error({"subscriptions are not supported when bridging between emqx nodes", Name, Subs}); - true -> - emqx_bridge_rpc; - false -> - emqx_bridge_mqtt - end - end, - - %% to be backward compatible - Translate = - fun Tr(queue, Q, Cfg) -> - NewQ = maps:fold(Tr, #{}, Q), - Cfg#{queue => NewQ}; - Tr(address, Addr0, Cfg) -> - Addr = case IsNodeAddr(Addr0) of - true -> list_to_atom(Addr0); - false -> Addr0 - end, - Cfg#{address => Addr}; - Tr(reconnect_interval, Ms, Cfg) -> - Cfg#{reconnect_delay_ms => Ms}; - Tr(proto_ver, Ver, Cfg) -> - Cfg#{proto_ver => - case Ver of - mqttv3 -> v3; - mqttv4 -> v4; - mqttv5 -> v5; - _ -> v4 - end}; - Tr(max_inflight_size, Size, Cfg) -> - Cfg#{max_inflight => Size}; - Tr(Key, Value, Cfg) -> - Cfg#{Key => Value} - end, - C = lists:foldl( - fun({["bridge", "mqtt", Name, Opt], Val}, Acc) -> - %% e.g #{aws => [{OptKey, OptVal}]} - Init = [{list_to_atom(Opt), Val}, - {connect_module, ConnMod(Name)}, - {subscriptions, Subscriptions(Name)}, - {queue, Queue(Name)}], - maps:update_with(list_to_atom(Name), fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); - (_, Acc) -> Acc - end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.mqtt", Conf))), - C1 = maps:map(fun(Bn, Bc) -> - maps:to_list(maps:fold(Translate, #{}, maps:from_list(Bc))) - end, C), - maps:to_list(C1) -end}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl deleted file mode 100644 index ece6002a7..000000000 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_connect.erl +++ /dev/null @@ -1,74 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_bridge_connect). - --export([start/2]). - --export_type([config/0, connection/0]). - --optional_callbacks([ensure_subscribed/3, ensure_unsubscribed/2]). - -%% map fields depend on implementation --type(config() :: map()). --type(connection() :: term()). --type(batch() :: emqx_protal:batch()). --type(ack_ref() :: emqx_bridge_worker:ack_ref()). --type(topic() :: emqx_topic:topic()). --type(qos() :: emqx_mqtt_types:qos()). - --include_lib("emqx/include/logger.hrl"). - --logger_header("[Bridge Connect]"). - -%% establish the connection to remote node/cluster -%% protal worker (the caller process) should be expecting -%% a message {disconnected, conn_ref()} when disconnected. --callback start(config()) -> {ok, connection()} | {error, any()}. - -%% send to remote node/cluster -%% bridge worker (the caller process) should be expecting -%% a message {batch_ack, reference()} when batch is acknowledged by remote node/cluster --callback send(connection(), batch()) -> {ok, ack_ref()} | {ok, integer()} | {error, any()}. - -%% called when owner is shutting down. --callback stop(connection()) -> ok. - --callback ensure_subscribed(connection(), topic(), qos()) -> ok. - --callback ensure_unsubscribed(connection(), topic()) -> ok. - -start(Module, Config) -> - case Module:start(Config) of - {ok, Conn} -> - {ok, Conn}; - {error, Reason} -> - Config1 = obfuscate(Config), - ?LOG(error, "Failed to connect with module=~p\n" - "config=~p\nreason:~p", [Module, Config1, Reason]), - {error, Reason} - end. - -obfuscate(Map) -> - maps:fold(fun(K, V, Acc) -> - case is_sensitive(K) of - true -> [{K, '***'} | Acc]; - false -> [{K, V} | Acc] - end - end, [], Map). - -is_sensitive(password) -> true; -is_sensitive(_) -> false. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index d612af668..8d442463b 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -18,15 +18,11 @@ -module(emqx_bridge_mqtt). --behaviour(emqx_bridge_connect). - -%% behaviour callbacks -export([ start/1 , send/2 , stop/1 ]). -%% optional behaviour callbacks -export([ ensure_subscribed/3 , ensure_unsubscribed/2 ]). @@ -37,6 +33,9 @@ , handle_disconnected/2 ]). +-export([ check_subscriptions/1 + ]). + -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -49,26 +48,31 @@ %% emqx_bridge_connect callbacks %%-------------------------------------------------------------------- -start(Config = #{address := Address}) -> +start(Config) -> Parent = self(), + Address = maps:get(address, Config), Mountpoint = maps:get(receive_mountpoint, Config, undefined), + Subscriptions = maps:get(subscriptions, Config, []), + Subscriptions1 = check_subscriptions(Subscriptions), Handlers = make_hdlr(Parent, Mountpoint), {Host, Port} = case string:tokens(Address, ":") of [H] -> {H, 1883}; [H, P] -> {H, list_to_integer(P)} end, - ClientConfig = Config#{msg_handler => Handlers, - host => Host, - port => Port, - force_ping => true - }, - case emqtt:start_link(replvar(ClientConfig)) of + Config1 = Config#{ + msg_handler => Handlers, + host => Host, + port => Port, + force_ping => true, + proto_ver => maps:get(proto_ver, Config, v4) + }, + case emqtt:start_link(without_config(Config1)) of {ok, Pid} -> case emqtt:connect(Pid) of {ok, _} -> try - subscribe_remote_topics(Pid, maps:get(subscriptions, Config, [])), - {ok, #{client_pid => Pid}} + Subscriptions2 = subscribe_remote_topics(Pid, Subscriptions1), + {ok, #{client_pid => Pid, subscriptions => Subscriptions2}} catch throw : Reason -> ok = stop(#{client_pid => Pid}), @@ -86,25 +90,25 @@ stop(#{client_pid := Pid}) -> safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000), ok. -ensure_subscribed(#{client_pid := Pid}, Topic, QoS) when is_pid(Pid) -> +ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when is_pid(Pid) -> case emqtt:subscribe(Pid, Topic, QoS) of - {ok, _, _} -> ok; - Error -> Error + {ok, _, _} -> Conn#{subscriptions => [{Topic, QoS}|Subs]}; + Error -> {error, Error} end; ensure_subscribed(_Conn, _Topic, _QoS) -> %% return ok for now %% next re-connect should should call start with new topic added to config ok. -ensure_unsubscribed(#{client_pid := Pid}, Topic) when is_pid(Pid) -> +ensure_unsubscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic) when is_pid(Pid) -> case emqtt:unsubscribe(Pid, Topic) of - {ok, _, _} -> ok; - Error -> Error + {ok, _, _} -> Conn#{subscriptions => lists:keydelete(Topic, 1, Subs)}; + Error -> {error, Error} end; -ensure_unsubscribed(_, _) -> +ensure_unsubscribed(Conn, _) -> %% return ok for now %% next re-connect should should call start with this topic deleted from config - ok. + Conn. safe_stop(Pid, StopF, Timeout) -> MRef = monitor(process, Pid), @@ -169,36 +173,18 @@ make_hdlr(Parent, Mountpoint) -> }. subscribe_remote_topics(ClientPid, Subscriptions) -> - lists:foreach(fun({Topic, Qos}) -> - case emqtt:subscribe(ClientPid, Topic, Qos) of - {ok, _, _} -> ok; - Error -> throw(Error) - end - end, Subscriptions). + lists:map(fun({Topic, Qos}) -> + case emqtt:subscribe(ClientPid, Topic, Qos) of + {ok, _, _} -> {Topic, Qos}; + Error -> throw(Error) + end + end, Subscriptions). -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- +without_config(Config) -> + maps:without([conn_type, address, receive_mountpoint, subscriptions], Config). -replvar(Options) -> - replvar([clientid, max_inflight], Options). - -replvar([], Options) -> - Options; -replvar([Key|More], Options) -> - case maps:get(Key, Options, undefined) of - undefined -> - replvar(More, Options); - Val -> - replvar(More, maps:put(Key, feedvar(Key, Val, Options), Options)) - end. - -%% ${node} => node() -feedvar(clientid, ClientId, _) -> - iolist_to_binary(re:replace(ClientId, "\\${node}", atom_to_list(node()))); - -feedvar(max_inflight, 0, _) -> - infinity; - -feedvar(max_inflight, Size, _) -> - Size. +check_subscriptions(Subscriptions) -> + lists:map(fun(#{qos := QoS, topic := Topic}) -> + true = emqx_topic:validate({filter, Topic}), + {Topic, QoS} + end, Subscriptions). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl new file mode 100644 index 000000000..fcd2f2c1d --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl @@ -0,0 +1,89 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_schema). + +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + +-export([ structs/0 + , fields/1]). + +structs() -> ["emqx_bridge_mqtt"]. + +fields("emqx_bridge_mqtt") -> + [ {bridges, hoconsc:array("bridges")} + ]; + +fields("bridges") -> + [ {name, emqx_schema:t(string(), undefined, true)} + , {start_type, fun start_type/1} + , {forwards, fun forwards/1} + , {forward_mountpoint, emqx_schema:t(string())} + , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} + , {batch_size, emqx_schema:t(integer(), undefined, 100)} + , {queue, emqx_schema:t(hoconsc:ref("queue"))} + , {config, hoconsc:union([hoconsc:ref("mqtt"), hoconsc:ref("rpc")])} + ]; + +fields("mqtt") -> + [ {conn_type, fun conn_type/1} + , {address, emqx_schema:t(string(), undefined, "127.0.0.1:1883")} + , {proto_ver, fun proto_ver/1} + , {bridge_mode, emqx_schema:t(boolean(), undefined, true)} + , {clientid, emqx_schema:t(string())} + , {username, emqx_schema:t(string())} + , {password, emqx_schema:t(string())} + , {clean_start, emqx_schema:t(boolean(), undefined, true)} + , {keepalive, emqx_schema:t(integer(), undefined, 300)} + , {subscriptions, hoconsc:array("subscriptions")} + , {receive_mountpoint, emqx_schema:t(string())} + , {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} + , {max_inflight, emqx_schema:t(integer(), undefined, 32)} + ]; + +fields("rpc") -> + [ {conn_type, fun conn_type/1} + , {node, emqx_schema:t(atom(), undefined, 'emqx@127.0.0.1')} + ]; + +fields("subscriptions") -> + [ {topic, #{type => binary(), nullable => false}} + , {qos, emqx_schema:t(integer(), undefined, 1)} + ]; + +fields("queue") -> + [ {replayq_dir, hoconsc:union([boolean(), string()])} + , {replayq_seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")} + , {replayq_offload_mode, emqx_schema:t(boolean(), undefined, false)} + , {replayq_max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")} + ]. + +conn_type(type) -> hoconsc:enum([mqtt, rpc]); +conn_type(_) -> undefined. + +proto_ver(type) -> hoconsc:enum([v3, v4, v5]); +proto_ver(default) -> v4; +proto_ver(_) -> undefined. + +start_type(type) -> hoconsc:enum([auto, manual]); +start_type(default) -> auto; +start_type(_) -> undefined. + +forwards(type) -> hoconsc:array(binary()); +forwards(default) -> []; +forwards(_) -> undefined. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl index 80a11c1c0..0075b4a1d 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_sup.erl @@ -24,37 +24,33 @@ %% APIs -export([ start_link/0 - , start_link/1 ]). --export([ create_bridge/2 +-export([ create_bridge/1 , drop_bridge/1 , bridges/0 - , is_bridge_exist/1 ]). %% supervisor callbacks -export([init/1]). --define(SUP, ?MODULE). -define(WORKER_SUP, emqx_bridge_worker_sup). -start_link() -> start_link(?SUP). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). -start_link(Name) -> - supervisor:start_link({local, Name}, ?MODULE, Name). - -init(?SUP) -> - BridgesConf = application:get_env(?APP, bridges, []), +init([]) -> + BridgesConf = emqx_config:get([?APP, bridges], []), BridgeSpec = lists:map(fun bridge_spec/1, BridgesConf), SupFlag = #{strategy => one_for_one, intensity => 100, period => 10}, {ok, {SupFlag, BridgeSpec}}. -bridge_spec({Name, Config}) -> +bridge_spec(Config) -> + Name = list_to_atom(maps:get(name, Config)), #{id => Name, - start => {emqx_bridge_worker, start_link, [Name, Config]}, + start => {emqx_bridge_worker, start_link, [Config]}, restart => permanent, shutdown => 5000, type => worker, @@ -62,22 +58,15 @@ bridge_spec({Name, Config}) -> -spec(bridges() -> [{node(), map()}]). bridges() -> - [{Name, emqx_bridge_worker:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?SUP)]. + [{Name, emqx_bridge_worker:status(Name)} || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)]. --spec(is_bridge_exist(atom() | pid()) -> boolean()). -is_bridge_exist(Id) -> - case supervisor:get_childspec(?SUP, Id) of - {ok, _ChildSpec} -> true; - {error, _Error} -> false - end. +create_bridge(Config) -> + supervisor:start_child(?MODULE, bridge_spec(Config)). -create_bridge(Id, Config) -> - supervisor:start_child(?SUP, bridge_spec({Id, Config})). - -drop_bridge(Id) -> - case supervisor:terminate_child(?SUP, Id) of +drop_bridge(Name) -> + case supervisor:terminate_child(?MODULE, Name) of ok -> - supervisor:delete_child(?SUP, Id); + supervisor:delete_child(?MODULE, Name); {error, Error} -> ?LOG(error, "Delete bridge failed, error : ~p", [Error]), {error, Error} diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl index 0cf4b5bc5..33511cc03 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_rpc.erl @@ -18,9 +18,6 @@ -module(emqx_bridge_rpc). --behaviour(emqx_bridge_connect). - -%% behaviour callbacks -export([ start/1 , send/2 , stop/1 @@ -33,17 +30,15 @@ -type ack_ref() :: emqx_bridge_worker:ack_ref(). -type batch() :: emqx_bridge_worker:batch(). --type node_or_tuple() :: atom() | {atom(), term()}. - -define(HEARTBEAT_INTERVAL, timer:seconds(1)). -define(RPC, emqx_rpc). -start(#{address := Remote}) -> - case poke(Remote) of +start(#{node := RemoteNode}) -> + case poke(RemoteNode) of ok -> - Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), Remote]), - {ok, #{client_pid => Pid, address => Remote}}; + Pid = proc_lib:spawn_link(?MODULE, heartbeat, [self(), RemoteNode]), + {ok, #{client_pid => Pid, remote_node => RemoteNode}}; Error -> Error end. @@ -62,9 +57,9 @@ stop(#{client_pid := Pid}) when is_pid(Pid) -> ok. %% @doc Callback for `emqx_bridge_connect' behaviour --spec send(#{address := node_or_tuple(), _ => _}, batch()) -> {ok, ack_ref()} | {error, any()}. -send(#{address := Remote}, Batch) -> - case ?RPC:call(Remote, ?MODULE, handle_send, [Batch]) of +-spec send(#{remote_node := atom(), _ => _}, batch()) -> {ok, ack_ref()} | {error, any()}. +send(#{remote_node := RemoteNode}, Batch) -> + case ?RPC:call(RemoteNode, ?MODULE, handle_send, [Batch]) of ok -> Ref = make_ref(), self() ! {batch_ack, Ref}, @@ -93,8 +88,8 @@ heartbeat(Parent, RemoteNode) -> end end. -poke(Node) -> - case ?RPC:call(Node, erlang, node, []) of - Node -> ok; +poke(RemoteNode) -> + case ?RPC:call(RemoteNode, erlang, node, []) of + RemoteNode -> ok; {badrpc, Reason} -> {error, Reason} end. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index e7414683c..dfef6973e 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -66,7 +66,6 @@ %% APIs -export([ start_link/1 - , start_link/2 , register_metrics/0 , stop/1 ]). @@ -86,7 +85,6 @@ %% management APIs -export([ ensure_started/1 , ensure_stopped/1 - , ensure_stopped/2 , status/1 ]). @@ -125,14 +123,13 @@ -define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). -define(DEFAULT_SEG_BYTES, (1 bsl 20)). -define(DEFAULT_MAX_TOTAL_SIZE, (1 bsl 31)). --define(NO_BRIDGE_HANDLER, undefined). %% @doc Start a bridge worker. Supported configs: %% start_type: 'manual' (default) or 'auto', when manual, bridge will stay %% at 'idle' state until a manual call to start it. %% connect_module: The module which implements emqx_bridge_connect behaviour %% and work as message batch transport layer -%% reconnect_delay_ms: Delay in milli-seconds for the bridge worker to retry +%% reconnect_interval: Delay in milli-seconds for the bridge worker to retry %% in case of transportation failure. %% max_inflight: Max number of batches allowed to send-ahead before receiving %% confirmation from remote node/cluster @@ -148,128 +145,98 @@ %% %% Find more connection specific configs in the callback modules %% of emqx_bridge_connect behaviour. -start_link(Config) when is_list(Config) -> - start_link(maps:from_list(Config)); -start_link(Config) -> - gen_statem:start_link(?MODULE, Config, []). - -start_link(Name, Config) when is_list(Config) -> - start_link(Name, maps:from_list(Config)); -start_link(Name, Config) -> - Name1 = name(Name), - gen_statem:start_link({local, Name1}, ?MODULE, Config#{name => Name1}, []). +start_link(Opts) when is_list(Opts) -> + start_link(maps:from_list(Opts)); +start_link(Opts) -> + case maps:get(name, Opts, undefined) of + undefined -> + gen_statem:start_link(?MODULE, Opts, []); + Name -> + Name1 = name(Name), + gen_statem:start_link({local, Name1}, ?MODULE, Opts#{name => Name1}, []) + end. ensure_started(Name) -> gen_statem:call(name(Name), ensure_started). %% @doc Manually stop bridge worker. State idempotency ensured. -ensure_stopped(Id) -> - ensure_stopped(Id, 1000). - -ensure_stopped(Id, Timeout) -> - Pid = case id(Id) of - P when is_pid(P) -> P; - N -> whereis(N) - end, - case Pid of - undefined -> - ok; - _ -> - MRef = monitor(process, Pid), - unlink(Pid), - _ = gen_statem:call(id(Id), ensure_stopped, Timeout), - receive - {'DOWN', MRef, _, _, _} -> - ok - after - Timeout -> - exit(Pid, kill) - end - end. +ensure_stopped(Name) -> + gen_statem:call(name(Name), ensure_stopped, 5000). stop(Pid) -> gen_statem:stop(Pid). status(Pid) when is_pid(Pid) -> gen_statem:call(Pid, status); -status(Id) -> - gen_statem:call(name(Id), status). +status(Name) -> + gen_statem:call(name(Name), status). %% @doc Return all forwards (local subscriptions). -spec get_forwards(id()) -> [topic()]. -get_forwards(Id) -> gen_statem:call(id(Id), get_forwards, timer:seconds(1000)). +get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)). %% @doc Return all subscriptions (subscription over mqtt connection to remote broker). -spec get_subscriptions(id()) -> [{emqx_topic:topic(), qos()}]. -get_subscriptions(Id) -> gen_statem:call(id(Id), get_subscriptions). +get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions). %% @doc Add a new forward (local topic subscription). -spec ensure_forward_present(id(), topic()) -> ok. -ensure_forward_present(Id, Topic) -> - gen_statem:call(id(Id), {ensure_present, forwards, topic(Topic)}). +ensure_forward_present(Name, Topic) -> + gen_statem:call(name(Name), {ensure_forward_present, topic(Topic)}). %% @doc Ensure a forward topic is deleted. -spec ensure_forward_absent(id(), topic()) -> ok. -ensure_forward_absent(Id, Topic) -> - gen_statem:call(id(Id), {ensure_absent, forwards, topic(Topic)}). +ensure_forward_absent(Name, Topic) -> + gen_statem:call(name(Name), {ensure_forward_absent, topic(Topic)}). %% @doc Ensure subscribed to remote topic. %% NOTE: only applicable when connection module is emqx_bridge_mqtt %% return `{error, no_remote_subscription_support}' otherwise. -spec ensure_subscription_present(id(), topic(), qos()) -> ok | {error, any()}. -ensure_subscription_present(Id, Topic, QoS) -> - gen_statem:call(id(Id), {ensure_present, subscriptions, {topic(Topic), QoS}}). +ensure_subscription_present(Name, Topic, QoS) -> + gen_statem:call(name(Name), {ensure_subscription_present, topic(Topic), QoS}). %% @doc Ensure unsubscribed from remote topic. %% NOTE: only applicable when connection module is emqx_bridge_mqtt -spec ensure_subscription_absent(id(), topic()) -> ok. -ensure_subscription_absent(Id, Topic) -> - gen_statem:call(id(Id), {ensure_absent, subscriptions, topic(Topic)}). +ensure_subscription_absent(Name, Topic) -> + gen_statem:call(name(Name), {ensure_subscription_absent, topic(Topic)}). callback_mode() -> [state_functions]. %% @doc Config should be a map(). -init(Config) -> +init(Opts) -> erlang:process_flag(trap_exit, true), - ConnectModule = maps:get(connect_module, Config), - Subscriptions = maps:get(subscriptions, Config, []), - Forwards = maps:get(forwards, Config, []), - Queue = open_replayq(Config), - State = init_opts(Config), - Topics = [iolist_to_binary(T) || T <- Forwards], - Subs = check_subscriptions(Subscriptions), - ConnectCfg = get_conn_cfg(Config), + ConnectOpts = maps:get(config, Opts), + ConnectModule = conn_type(maps:get(conn_type, ConnectOpts)), + Forwards = maps:get(forwards, Opts, []), + Queue = open_replayq(maps:get(queue, Opts, #{})), + State = init_opts(Opts), self() ! idle, {ok, idle, State#{connect_module => ConnectModule, - connect_cfg => ConnectCfg, - forwards => Topics, - subscriptions => Subs, + connect_opts => ConnectOpts, + forwards => Forwards, replayq => Queue }}. -init_opts(Config) -> - IfRecordMetrics = maps:get(if_record_metrics, Config, true), - ReconnDelayMs = maps:get(reconnect_delay_ms, Config, ?DEFAULT_RECONNECT_DELAY_MS), - StartType = maps:get(start_type, Config, manual), - BridgeHandler = maps:get(bridge_handler, Config, ?NO_BRIDGE_HANDLER), - Mountpoint = maps:get(forward_mountpoint, Config, undefined), - ReceiveMountpoint = maps:get(receive_mountpoint, Config, undefined), - MaxInflightSize = maps:get(max_inflight, Config, ?DEFAULT_BATCH_SIZE), - BatchSize = maps:get(batch_size, Config, ?DEFAULT_BATCH_SIZE), - Name = maps:get(name, Config, undefined), +init_opts(Opts) -> + IfRecordMetrics = maps:get(if_record_metrics, Opts, true), + ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS), + StartType = maps:get(start_type, Opts, manual), + Mountpoint = maps:get(forward_mountpoint, Opts, undefined), + MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_BATCH_SIZE), + BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), + Name = maps:get(name, Opts, undefined), #{start_type => StartType, - reconnect_delay_ms => ReconnDelayMs, + reconnect_interval => ReconnDelayMs, batch_size => BatchSize, mountpoint => format_mountpoint(Mountpoint), - receive_mountpoint => ReceiveMountpoint, inflight => [], max_inflight => MaxInflightSize, connection => undefined, - bridge_handler => BridgeHandler, if_record_metrics => IfRecordMetrics, name => Name}. -open_replayq(Config) -> - QCfg = maps:get(queue, Config, #{}), +open_replayq(QCfg) -> Dir = maps:get(replayq_dir, QCfg, undefined), SegBytes = maps:get(replayq_seg_bytes, QCfg, ?DEFAULT_SEG_BYTES), MaxTotalSize = maps:get(max_total_size, QCfg, ?DEFAULT_MAX_TOTAL_SIZE), @@ -280,22 +247,6 @@ open_replayq(Config) -> replayq:open(QueueConfig#{sizer => fun emqx_bridge_msg:estimate_size/1, marshaller => fun ?MODULE:msg_marshaller/1}). -check_subscriptions(Subscriptions) -> - lists:map(fun({Topic, QoS}) -> - Topic1 = iolist_to_binary(Topic), - true = emqx_topic:validate({filter, Topic1}), - {Topic1, QoS} - end, Subscriptions). - -get_conn_cfg(Config) -> - maps:without([connect_module, - queue, - reconnect_delay_ms, - forwards, - mountpoint, - name - ], Config). - code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. @@ -321,14 +272,10 @@ idle(info, idle, #{start_type := auto} = State) -> idle(state_timeout, reconnect, State) -> connecting(State); -idle(info, {batch_ack, Ref}, State) -> - NewState = handle_batch_ack(State, Ref), - {keep_state, NewState}; - idle(Type, Content, State) -> common(idle, Type, Content, State). -connecting(#{reconnect_delay_ms := ReconnectDelayMs} = State) -> +connecting(#{reconnect_interval := ReconnectDelayMs} = State) -> case do_connect(State) of {ok, State1} -> {next_state, connected, State1, {state_timeout, 0, connected}}; @@ -348,7 +295,7 @@ connected(internal, maybe_send, State) -> {keep_state, NewState}; connected(info, {disconnected, Conn, Reason}, - #{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) -> + #{connection := Connection, name := Name, reconnect_interval := ReconnectDelayMs} = State) -> ?tp(info, disconnected, #{name => Name, reason => Reason}), case Conn =:= maps:get(client_pid, Connection, undefined) of true -> @@ -365,19 +312,27 @@ connected(Type, Content, State) -> %% Common handlers common(StateName, {call, From}, status, _State) -> {keep_state_and_data, [{reply, From, StateName}]}; -common(_StateName, {call, From}, ensure_started, _State) -> - {keep_state_and_data, [{reply, From, connected}]}; -common(_StateName, {call, From}, ensure_stopped, _State) -> - {stop_and_reply, {shutdown, manual}, [{reply, From, ok}]}; +common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) -> + {keep_state_and_data, [{reply, From, ok}]}; +common(_StateName, {call, From}, ensure_stopped, #{connection := Conn, + connect_module := ConnectModule} = State) -> + Reply = ConnectModule:stop(Conn), + {next_state, idle, State#{connection => undefined}, [{reply, From, Reply}]}; common(_StateName, {call, From}, get_forwards, #{forwards := Forwards}) -> {keep_state_and_data, [{reply, From, Forwards}]}; -common(_StateName, {call, From}, get_subscriptions, #{subscriptions := Subs}) -> - {keep_state_and_data, [{reply, From, Subs}]}; -common(_StateName, {call, From}, {ensure_present, What, Topic}, State) -> - {Result, NewState} = ensure_present(What, Topic, State), +common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> + {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, [])}]}; +common(_StateName, {call, From}, {ensure_forward_present, Topic}, State) -> + {Result, NewState} = do_ensure_forward_present(Topic, State), {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> - {Result, NewState} = ensure_absent(What, Topic, State), +common(_StateName, {call, From}, {ensure_subscription_present, Topic, QoS}, State) -> + {Result, NewState} = do_ensure_subscription_present(Topic, QoS, State), + {keep_state, NewState, [{reply, From, Result}]}; +common(_StateName, {call, From}, {ensure_forward_absent, Topic}, State) -> + {Result, NewState} = do_ensure_forward_absent(Topic, State), + {keep_state, NewState, [{reply, From, Result}]}; +common(_StateName, {call, From}, {ensure_subscription_absent, Topic}, State) -> + {Result, NewState} = do_ensure_subscription_absent(Topic, State), {keep_state, NewState, [{reply, From, Result}]}; common(_StateName, info, {deliver, _, Msg}, State = #{replayq := Q, if_record_metrics := IfRecordMetric}) -> @@ -395,77 +350,79 @@ common(StateName, Type, Content, #{name := Name} = State) -> [Name, Type, StateName, Content]), {keep_state, State}. -eval_bridge_handler(State = #{bridge_handler := ?NO_BRIDGE_HANDLER}, _Msg) -> - State; -eval_bridge_handler(State = #{bridge_handler := Handler}, Msg) -> - Handler(Msg), - State. - -ensure_present(Key, Topic, State) -> - Topics = maps:get(Key, State), - case is_topic_present(Topic, Topics) of +do_ensure_forward_present(Topic, #{forwards := Forwards, name := Name} = State) -> + case is_topic_present(Topic, Forwards) of true -> {ok, State}; false -> - R = do_ensure_present(Key, Topic, State), - {R, State#{Key := lists:usort([Topic | Topics])}} + R = subscribe_local_topic(Topic, Name), + {R, State#{forwards => [Topic | Forwards]}} end. -ensure_absent(Key, Topic, State) -> - Topics = maps:get(Key, State), - case is_topic_present(Topic, Topics) of +do_ensure_subscription_present(_Topic, _QoS, #{connection := undefined} = State) -> + {{error, no_connection}, State}; +do_ensure_subscription_present(_Topic, _QoS, #{connect_module := emqx_bridge_rpc} = State) -> + {{error, no_remote_subscription_support}, State}; +do_ensure_subscription_present(Topic, QoS, #{connect_module := ConnectModule, + connection := Conn} = State) -> + case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of true -> - R = do_ensure_absent(Key, Topic, State), - {R, State#{Key := ensure_topic_absent(Topic, Topics)}}; + {ok, State}; + false -> + case ConnectModule:ensure_subscribed(Conn, Topic, QoS) of + {error, Error} -> + {{error, Error}, State}; + Conn1 -> + {ok, State#{connection => Conn1}} + end + end. + +do_ensure_forward_absent(Topic, #{forwards := Forwards} = State) -> + case is_topic_present(Topic, Forwards) of + true -> + R = do_unsubscribe(Topic), + {R, State#{forwards => lists:delete(Topic, Forwards)}}; + false -> + {ok, State} + end. +do_ensure_subscription_absent(_Topic, #{connection := undefined} = State) -> + {{error, no_connection}, State}; +do_ensure_subscription_absent(_Topic, #{connect_module := emqx_bridge_rpc} = State) -> + {{error, no_remote_subscription_support}, State}; +do_ensure_subscription_absent(Topic, #{connect_module := ConnectModule, + connection := Conn} = State) -> + case is_topic_present(Topic, maps:get(subscriptions, Conn, [])) of + true -> + case ConnectModule:ensure_unsubscribed(Conn, Topic) of + {error, Error} -> + {{error, Error}, State}; + Conn1 -> + {ok, State#{connection => Conn1}} + end; false -> {ok, State} end. -ensure_topic_absent(_Topic, []) -> []; -ensure_topic_absent(Topic, [{_, _} | _] = L) -> lists:keydelete(Topic, 1, L); -ensure_topic_absent(Topic, L) -> lists:delete(Topic, L). - -is_topic_present({Topic, _QoS}, Topics) -> - is_topic_present(Topic, Topics); is_topic_present(Topic, Topics) -> lists:member(Topic, Topics) orelse false =/= lists:keyfind(Topic, 1, Topics). do_connect(#{forwards := Forwards, - subscriptions := Subs, connect_module := ConnectModule, - connect_cfg := ConnectCfg, + connect_opts := ConnectOpts, inflight := Inflight, name := Name} = State) -> ok = subscribe_local_topics(Forwards, Name), - case emqx_bridge_connect:start(ConnectModule, ConnectCfg#{subscriptions => Subs}) of + case ConnectModule:start(ConnectOpts) of {ok, Conn} -> - Res = eval_bridge_handler(State#{connection => Conn}, connected), ?tp(info, connected, #{name => Name, inflight => length(Inflight)}), - {ok, Res}; + {ok, State#{connection => Conn}}; {error, Reason} -> + ConnectOpts1 = obfuscate(ConnectOpts), + ?LOG(error, "Failed to connect with module=~p\n" + "config=~p\nreason:~p", [ConnectModule, ConnectOpts1, Reason]), {error, Reason, State} end. -do_ensure_present(forwards, Topic, #{name := Name}) -> - subscribe_local_topic(Topic, Name); -do_ensure_present(subscriptions, _Topic, #{connection := undefined}) -> - {error, no_connection}; -do_ensure_present(subscriptions, _Topic, #{connect_module := emqx_bridge_rpc}) -> - {error, no_remote_subscription_support}; -do_ensure_present(subscriptions, {Topic, QoS}, #{connect_module := ConnectModule, - connection := Conn}) -> - ConnectModule:ensure_subscribed(Conn, Topic, QoS). - -do_ensure_absent(forwards, Topic, _) -> - do_unsubscribe(Topic); -do_ensure_absent(subscriptions, _Topic, #{connection := undefined}) -> - {error, no_connection}; -do_ensure_absent(subscriptions, _Topic, #{connect_module := emqx_bridge_rpc}) -> - {error, no_remote_subscription_support}; -do_ensure_absent(subscriptions, Topic, #{connect_module := ConnectModule, - connection := Conn}) -> - ConnectModule:ensure_unsubscribed(Conn, Topic). - collect(Acc) -> receive {deliver, _, Msg} -> @@ -605,10 +562,9 @@ disconnect(#{connection := Conn, connect_module := Module } = State) when Conn =/= undefined -> Module:stop(Conn), - State0 = State#{connection => undefined}, - eval_bridge_handler(State0, disconnected); + State#{connection => undefined}; disconnect(State) -> - eval_bridge_handler(State, disconnected). + State. %% Called only when replayq needs to dump it to disk. msg_marshaller(Bin) when is_binary(Bin) -> emqx_bridge_msg:from_binary(Bin); @@ -621,9 +577,6 @@ format_mountpoint(Prefix) -> name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). -id(Pid) when is_pid(Pid) -> Pid; -id(Name) -> name(Name). - register_metrics() -> lists:foreach(fun emqx_metrics:ensure/1, ['bridge.mqtt.message_sent', @@ -639,3 +592,21 @@ bridges_metrics_inc(true, Metric, Value) -> emqx_metrics:inc(Metric, Value); bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) -> ok. + +obfuscate(Map) -> + maps:fold(fun(K, V, Acc) -> + case is_sensitive(K) of + true -> [{K, '***'} | Acc]; + false -> [{K, V} | Acc] + end + end, [], Map). + +is_sensitive(password) -> true; +is_sensitive(_) -> false. + +conn_type(rpc) -> + emqx_bridge_rpc; +conn_type(mqtt) -> + emqx_bridge_mqtt; +conn_type(Mod) when is_atom(Mod) -> + Mod. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl index 830fb1fe0..5babe0ed9 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_tests.erl @@ -44,4 +44,4 @@ send_and_ack_test() -> ok = emqx_bridge_mqtt:stop(Conn) after meck:unload(emqtt) - end. \ No newline at end of file + end. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_rpc_tests.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_rpc_tests.erl index fdcc25d5f..cbd80ba3d 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_rpc_tests.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_rpc_tests.erl @@ -30,8 +30,8 @@ send_and_ack_test() -> end), meck:new(emqx_bridge_worker, [passthrough, no_history]), try - {ok, #{client_pid := Pid, address := Node}} = emqx_bridge_rpc:start(#{address => node()}), - {ok, Ref} = emqx_bridge_rpc:send(#{address => Node}, []), + {ok, #{client_pid := Pid, remote_node := Node}} = emqx_bridge_rpc:start(#{node => node()}), + {ok, Ref} = emqx_bridge_rpc:send(#{remote_node => Node}, []), receive {batch_ack, Ref} -> ok diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl index d38663fcd..4c2fde6dd 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl @@ -16,9 +16,6 @@ -module(emqx_bridge_stub_conn). --behaviour(emqx_bridge_connect). - -%% behaviour callbacks -export([ start/1 , send/2 , stop/1 diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl index 680756742..f3f5d5ceb 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl @@ -59,13 +59,12 @@ init_per_suite(Config) -> nonode@nohost -> net_kernel:start(['emqx@127.0.0.1', longnames]); _ -> ok end, - ok = application:set_env(gen_rpc, tcp_client_num, 1), - emqx_ct_helpers:start_apps([emqx_modules, emqx_bridge_mqtt]), + emqx_ct_helpers:start_apps([emqx_bridge_mqtt]), emqx_logger:set_log_level(error), [{log_level, error} | Config]. end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_bridge_mqtt, emqx_modules]). + emqx_ct_helpers:stop_apps([emqx_bridge_mqtt]). init_per_testcase(_TestCase, Config) -> ok = snabbkaffe:start_trace(), @@ -74,260 +73,290 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> ok = snabbkaffe:stop(). -t_mngr(Config) when is_list(Config) -> - Subs = [{<<"a">>, 1}, {<<"b">>, 2}], - Cfg = #{address => node(), - forwards => [<<"mngr">>], - connect_module => emqx_bridge_rpc, - mountpoint => <<"forwarded">>, - subscriptions => Subs, - start_type => auto}, - Name = ?FUNCTION_NAME, - {ok, Pid} = emqx_bridge_worker:start_link(Name, Cfg), - try - ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)), - ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr")), - ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr2")), - ?assertEqual([<<"mngr">>, <<"mngr2">>], emqx_bridge_worker:get_forwards(Pid)), - ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr2")), - ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr3")), - ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Pid)), - ?assertEqual({error, no_remote_subscription_support}, - emqx_bridge_worker:ensure_subscription_present(Pid, <<"t">>, 0)), - ?assertEqual({error, no_remote_subscription_support}, - emqx_bridge_worker:ensure_subscription_absent(Pid, <<"t">>)), - ?assertEqual(Subs, emqx_bridge_worker:get_subscriptions(Pid)) - after - ok = emqx_bridge_worker:stop(Pid) - end. +t_rpc_mngr(_Config) -> + Name = "rpc_name", + Cfg = #{ + name => Name, + forwards => [<<"mngr">>], + forward_mountpoint => <<"forwarded">>, + start_type => auto, + config => #{ + conn_type => rpc, + node => node() + } + }, + {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg), + ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr")), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr2")), + ?assertEqual([<<"mngr2">>, <<"mngr">>], emqx_bridge_worker:get_forwards(Name)), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr2")), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr3")), + ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)), + ?assertEqual({error, no_remote_subscription_support}, + emqx_bridge_worker:ensure_subscription_present(Name, <<"t">>, 0)), + ?assertEqual({error, no_remote_subscription_support}, + emqx_bridge_worker:ensure_subscription_absent(Name, <<"t">>)), + ok = emqx_bridge_worker:stop(Pid). + +t_mqtt_mngr(_Config) -> + Name = "mqtt_name", + Cfg = #{ + name => Name, + forwards => [<<"mngr">>], + forward_mountpoint => <<"forwarded">>, + start_type => auto, + config => #{ + address => "127.0.0.1:1883", + conn_type => mqtt, + clientid => <<"client1">>, + keepalive => 300, + subscriptions => [#{topic => <<"t/#">>, qos => 1}] + } + }, + {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg), + ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr")), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_present(Name, "mngr2")), + ?assertEqual([<<"mngr2">>, <<"mngr">>], emqx_bridge_worker:get_forwards(Name)), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr2")), + ?assertEqual(ok, emqx_bridge_worker:ensure_forward_absent(Name, "mngr3")), + ?assertEqual([<<"mngr">>], emqx_bridge_worker:get_forwards(Name)), + ?assertEqual(ok, emqx_bridge_worker:ensure_subscription_present(Name, <<"t">>, 0)), + ?assertEqual(ok, emqx_bridge_worker:ensure_subscription_absent(Name, <<"t">>)), + ?assertEqual([{<<"t/#">>,1}], emqx_bridge_worker:get_subscriptions(Name)), + ok = emqx_bridge_worker:stop(Pid). %% A loopback RPC to local node -t_rpc(Config) when is_list(Config) -> - Cfg = #{address => node(), - forwards => [<<"t_rpc/#">>], - connect_module => emqx_bridge_rpc, - forward_mountpoint => <<"forwarded">>, - start_type => auto}, - {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), - ClientId = <<"ClientId">>, - try - {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), - {ok, _Props} = emqtt:connect(ConnPid), - {ok, _Props, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}), - timer:sleep(100), - {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_rpc/one">>, <<"hello">>, ?QOS_1), - timer:sleep(100), - ?assertEqual(1, length(receive_messages(1))), - emqtt:disconnect(ConnPid) - after - ok = emqx_bridge_worker:stop(Pid) - end. +t_rpc(_Config) -> + Name = "rpc", + Cfg = #{ + name => Name, + forwards => [<<"t_rpc/#">>], + forward_mountpoint => <<"forwarded">>, + start_type => auto, + config => #{ + conn_type => rpc, + node => node() + } + }, + {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg), + {ok, ConnPid} = emqtt:start_link([{clientid, <<"ClientId">>}]), + {ok, _Props} = emqtt:connect(ConnPid), + {ok, _Props, [1]} = emqtt:subscribe(ConnPid, {<<"forwarded/t_rpc/one">>, ?QOS_1}), + timer:sleep(100), + {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_rpc/one">>, <<"hello">>, ?QOS_1), + timer:sleep(100), + ?assertEqual(1, length(receive_messages(1))), + emqtt:disconnect(ConnPid), + emqx_bridge_worker:stop(Pid). %% Full data loopback flow explained: %% mqtt-client ----> local-broker ---(local-subscription)---> %% bridge(export) --- (mqtt-connection)--> local-broker ---(remote-subscription) --> %% bridge(import) --> mqtt-client -t_mqtt(Config) when is_list(Config) -> +t_mqtt(_Config) -> SendToTopic = <<"t_mqtt/one">>, SendToTopic2 = <<"t_mqtt/two">>, SendToTopic3 = <<"t_mqtt/three">>, Mountpoint = <<"forwarded/${node}/">>, - Cfg = #{address => "127.0.0.1:1883", - forwards => [SendToTopic], - connect_module => emqx_bridge_mqtt, - forward_mountpoint => Mountpoint, - username => "user", - clean_start => true, - clientid => "bridge_aws", - keepalive => 60000, - password => "passwd", - proto_ver => mqttv4, - queue => #{replayq_dir => "data/t_mqtt/", - replayq_seg_bytes => 10000, - batch_bytes_limit => 1000, - batch_count_limit => 10 - }, - reconnect_delay_ms => 1000, - ssl => false, - %% Consume back to forwarded message for verification - %% NOTE: this is a indefenite loopback without mocking emqx_bridge_worker:import_batch/1 - subscriptions => [{SendToTopic2, _QoS = 1}], - receive_mountpoint => <<"receive/aws/">>, - start_type => auto}, - {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), - ClientId = <<"client-1">>, - try - ?assertEqual([{SendToTopic2, 1}], emqx_bridge_worker:get_subscriptions(Pid)), - ok = emqx_bridge_worker:ensure_subscription_present(Pid, SendToTopic3, _QoS = 1), - ?assertEqual([{SendToTopic3, 1},{SendToTopic2, 1}], - emqx_bridge_worker:get_subscriptions(Pid)), - {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), - {ok, _Props} = emqtt:connect(ConnPid), - emqtt:subscribe(ConnPid, <<"forwarded/+/t_mqtt/one">>, 1), - %% message from a different client, to avoid getting terminated by no-local - Max = 10, - Msgs = lists:seq(1, Max), - lists:foreach(fun(I) -> - {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic, integer_to_binary(I), ?QOS_1) - end, Msgs), - ?assertEqual(10, length(receive_messages(200))), + Name = "mqtt", + Cfg = #{ + name => Name, + forwards => [SendToTopic], + forward_mountpoint => Mountpoint, + start_type => auto, + config => #{ + address => "127.0.0.1:1883", + conn_type => mqtt, + clientid => <<"client1">>, + keepalive => 300, + subscriptions => [#{topic => SendToTopic2, qos => 1}], + receive_mountpoint => <<"receive/aws/">> + }, + queue => #{ + replayq_dir => "data/t_mqtt/", + replayq_seg_bytes => 10000, + batch_bytes_limit => 1000, + batch_count_limit => 10 + } + }, + {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg), + ?assertEqual([{SendToTopic2, 1}], emqx_bridge_worker:get_subscriptions(Name)), + ok = emqx_bridge_worker:ensure_subscription_present(Name, SendToTopic3, _QoS = 1), + ?assertEqual([{SendToTopic3, 1},{SendToTopic2, 1}], + emqx_bridge_worker:get_subscriptions(Name)), + {ok, ConnPid} = emqtt:start_link([{clientid, <<"client-1">>}]), + {ok, _Props} = emqtt:connect(ConnPid), + emqtt:subscribe(ConnPid, <<"forwarded/+/t_mqtt/one">>, 1), + %% message from a different client, to avoid getting terminated by no-local + Max = 10, + Msgs = lists:seq(1, Max), + lists:foreach(fun(I) -> + {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic, integer_to_binary(I), ?QOS_1) + end, Msgs), + ?assertEqual(10, length(receive_messages(200))), - emqtt:subscribe(ConnPid, <<"receive/aws/t_mqtt/two">>, 1), - %% message from a different client, to avoid getting terminated by no-local - Max = 10, - Msgs = lists:seq(1, Max), - lists:foreach(fun(I) -> - {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic2, integer_to_binary(I), ?QOS_1) - end, Msgs), - ?assertEqual(10, length(receive_messages(200))), + emqtt:subscribe(ConnPid, <<"receive/aws/t_mqtt/two">>, 1), + %% message from a different client, to avoid getting terminated by no-local + Max = 10, + Msgs = lists:seq(1, Max), + lists:foreach(fun(I) -> + {ok, _PacketId} = emqtt:publish(ConnPid, SendToTopic2, integer_to_binary(I), ?QOS_1) + end, Msgs), + ?assertEqual(10, length(receive_messages(200))), - emqtt:disconnect(ConnPid) - after - ok = emqx_bridge_worker:stop(Pid) - end. + emqtt:disconnect(ConnPid), + ok = emqx_bridge_worker:stop(Pid). t_stub_normal(Config) when is_list(Config) -> - Cfg = #{forwards => [<<"t_stub_normal/#">>], - connect_module => emqx_bridge_stub_conn, - forward_mountpoint => <<"forwarded">>, - start_type => auto, + Name = "stub_normal", + Cfg = #{ + name => Name, + forwards => [<<"t_stub_normal/#">>], + forward_mountpoint => <<"forwarded">>, + start_type => auto, + config => #{ + conn_type => emqx_bridge_stub_conn, client_pid => self() - }, - {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), + } + }, + {ok, Pid} = emqx_bridge_mqtt_sup:create_bridge(Cfg), receive {Pid, emqx_bridge_stub_conn, ready} -> ok after 5000 -> error(timeout) end, - ClientId = <<"ClientId">>, - try - {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), - {ok, _} = emqtt:connect(ConnPid), - {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_stub_normal/one">>, <<"hello">>, ?QOS_1), - receive - {stub_message, WorkerPid, BatchRef, _Batch} -> - WorkerPid ! {batch_ack, BatchRef}, - ok - after - 5000 -> - error(timeout) - end, - ?SNK_WAIT(inflight_drained), - ?SNK_WAIT(replayq_drained), - emqtt:disconnect(ConnPid) + {ok, ConnPid} = emqtt:start_link([{clientid, <<"ClientId">>}]), + {ok, _} = emqtt:connect(ConnPid), + {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_stub_normal/one">>, <<"hello">>, ?QOS_1), + receive + {stub_message, WorkerPid, BatchRef, _Batch} -> + WorkerPid ! {batch_ack, BatchRef}, + ok after - ok = emqx_bridge_worker:stop(Pid) - end. + 5000 -> + error(timeout) + end, + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid), + ok = emqx_bridge_worker:stop(Pid). -t_stub_overflow(Config) when is_list(Config) -> +t_stub_overflow(_Config) -> Topic = <<"t_stub_overflow/one">>, MaxInflight = 20, - Cfg = #{forwards => [Topic], - connect_module => emqx_bridge_stub_conn, - forward_mountpoint => <<"forwarded">>, - start_type => auto, - client_pid => self(), - max_inflight => MaxInflight - }, - {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), - ClientId = <<"ClientId">>, - try - {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), - {ok, _} = emqtt:connect(ConnPid), - lists:foreach( - fun(I) -> - Data = integer_to_binary(I), - _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) - end, lists:seq(1, MaxInflight * 2)), - ?SNK_WAIT(inflight_full), - Acks = stub_receive(MaxInflight), - lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks), - Acks2 = stub_receive(MaxInflight), - lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks2), - ?SNK_WAIT(inflight_drained), - ?SNK_WAIT(replayq_drained), - emqtt:disconnect(ConnPid) - after - ok = emqx_bridge_worker:stop(Worker) - end. + Name = "stub_overflow", + Cfg = #{ + name => Name, + forwards => [<<"t_stub_overflow/one">>], + forward_mountpoint => <<"forwarded">>, + start_type => auto, + max_inflight => MaxInflight, + config => #{ + conn_type => emqx_bridge_stub_conn, + client_pid => self() + } + }, + {ok, Worker} = emqx_bridge_mqtt_sup:create_bridge(Cfg), + {ok, ConnPid} = emqtt:start_link([{clientid, <<"ClientId">>}]), + {ok, _} = emqtt:connect(ConnPid), + lists:foreach( + fun(I) -> + Data = integer_to_binary(I), + _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) + end, lists:seq(1, MaxInflight * 2)), + ?SNK_WAIT(inflight_full), + Acks = stub_receive(MaxInflight), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks), + Acks2 = stub_receive(MaxInflight), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks2), + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid), + ok = emqx_bridge_worker:stop(Worker). -t_stub_random_order(Config) when is_list(Config) -> +t_stub_random_order(_Config) -> Topic = <<"t_stub_random_order/a">>, MaxInflight = 10, - Cfg = #{forwards => [Topic], - connect_module => emqx_bridge_stub_conn, - forward_mountpoint => <<"forwarded">>, - start_type => auto, - client_pid => self(), - max_inflight => MaxInflight - }, - {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), + Name = "stub_random_order", + Cfg = #{ + name => Name, + forwards => [Topic], + forward_mountpoint => <<"forwarded">>, + start_type => auto, + max_inflight => MaxInflight, + config => #{ + conn_type => emqx_bridge_stub_conn, + client_pid => self() + } + }, + {ok, Worker} = emqx_bridge_mqtt_sup:create_bridge(Cfg), ClientId = <<"ClientId">>, - try - {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), - {ok, _} = emqtt:connect(ConnPid), - lists:foreach( - fun(I) -> - Data = integer_to_binary(I), - _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) - end, lists:seq(1, MaxInflight)), - Acks = stub_receive(MaxInflight), - lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, - lists:reverse(Acks)), - ?SNK_WAIT(inflight_drained), - ?SNK_WAIT(replayq_drained), - emqtt:disconnect(ConnPid) - after - ok = emqx_bridge_worker:stop(Worker) - end. + {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(ConnPid), + lists:foreach( + fun(I) -> + Data = integer_to_binary(I), + _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) + end, lists:seq(1, MaxInflight)), + Acks = stub_receive(MaxInflight), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, + lists:reverse(Acks)), + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid), + ok = emqx_bridge_worker:stop(Worker). -t_stub_retry_inflight(Config) when is_list(Config) -> +t_stub_retry_inflight(_Config) -> Topic = <<"to_stub_retry_inflight/a">>, MaxInflight = 10, - Cfg = #{forwards => [Topic], - connect_module => emqx_bridge_stub_conn, - forward_mountpoint => <<"forwarded">>, - reconnect_delay_ms => 10, - start_type => auto, - client_pid => self(), - max_inflight => MaxInflight - }, - {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), + Name = "stub_retry_inflight", + Cfg = #{ + name => Name, + forwards => [Topic], + forward_mountpoint => <<"forwarded">>, + reconnect_interval => 10, + start_type => auto, + max_inflight => MaxInflight, + config => #{ + conn_type => emqx_bridge_stub_conn, + client_pid => self() + } + }, + {ok, Worker} = emqx_bridge_mqtt_sup:create_bridge(Cfg), ClientId = <<"ClientId2">>, - try - case ?block_until(#{?snk_kind := connected, inflight := 0}, 2000, 1000) of - {ok, #{inflight := 0}} -> ok; - Other -> ct:fail("~p", [Other]) - end, - {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), - {ok, _} = emqtt:connect(ConnPid), - lists:foreach( - fun(I) -> - Data = integer_to_binary(I), - _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) - end, lists:seq(1, MaxInflight)), - %% receive acks but do not ack - Acks1 = stub_receive(MaxInflight), - ?assertEqual(MaxInflight, length(Acks1)), - %% simulate a disconnect - Worker ! {disconnected, self(), test}, - ?SNK_WAIT(disconnected), - case ?block_until(#{?snk_kind := connected, inflight := MaxInflight}, 2000, 20) of - {ok, _} -> ok; - Error -> ct:fail("~p", [Error]) - end, - %% expect worker to retry inflight, so to receive acks again - Acks2 = stub_receive(MaxInflight), - ?assertEqual(MaxInflight, length(Acks2)), - lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, - lists:reverse(Acks2)), - ?SNK_WAIT(inflight_drained), - ?SNK_WAIT(replayq_drained), - emqtt:disconnect(ConnPid) - after - ok = emqx_bridge_worker:stop(Worker) - end. + case ?block_until(#{?snk_kind := connected, inflight := 0}, 2000, 1000) of + {ok, #{inflight := 0}} -> ok; + Other -> ct:fail("~p", [Other]) + end, + {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(ConnPid), + lists:foreach( + fun(I) -> + Data = integer_to_binary(I), + _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) + end, lists:seq(1, MaxInflight)), + %% receive acks but do not ack + Acks1 = stub_receive(MaxInflight), + ?assertEqual(MaxInflight, length(Acks1)), + %% simulate a disconnect + Worker ! {disconnected, self(), test}, + ?SNK_WAIT(disconnected), + case ?block_until(#{?snk_kind := connected, inflight := MaxInflight}, 2000, 20) of + {ok, _} -> ok; + Error -> ct:fail("~p", [Error]) + end, + %% expect worker to retry inflight, so to receive acks again + Acks2 = stub_receive(MaxInflight), + ?assertEqual(MaxInflight, length(Acks2)), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, + lists:reverse(Acks2)), + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid), + ok = emqx_bridge_worker:stop(Worker). stub_receive(N) -> stub_receive(N, []). diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_tests.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_tests.erl index 69ff87356..ffa2e9ee5 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_tests.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_tests.erl @@ -15,7 +15,6 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_worker_tests). --behaviour(emqx_bridge_connect). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/emqx.hrl"). @@ -69,14 +68,14 @@ disturbance_test() -> emqx_bridge_worker:register_metrics(), Ref = make_ref(), TestPid = self(), - Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}), - {ok, Pid} = emqx_bridge_worker:start_link(?BRIDGE_NAME, Config), - ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), + Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}), + {ok, Pid} = emqx_bridge_worker:start_link(Config#{name => disturbance}), + ?assertEqual(Pid, whereis(emqx_bridge_worker_disturbance)), ?WAIT({connection_start_attempt, Ref}, 1000), Pid ! {disconnected, TestPid, test}, ?WAIT({connection_start_attempt, Ref}, 1000), emqx_metrics:stop(), - ok = emqx_bridge_worker:stop(?BRIDGE_REG_NAME). + ok = emqx_bridge_worker:stop(Pid). % % %% buffer should continue taking in messages when disconnected % buffer_when_disconnected_test_() -> @@ -113,22 +112,24 @@ manual_start_stop_test() -> emqx_bridge_worker:register_metrics(), Ref = make_ref(), TestPid = self(), + BridgeName = manual_start_stop, Config0 = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}), Config = Config0#{start_type := manual}, - {ok, Pid} = emqx_bridge_worker:start_link(?BRIDGE_NAME, Config), + {ok, Pid} = emqx_bridge_worker:start_link(Config#{name => BridgeName}), %% call ensure_started again should yeld the same result - ok = emqx_bridge_worker:ensure_started(?BRIDGE_NAME), - ?assertEqual(Pid, whereis(?BRIDGE_REG_NAME)), - emqx_bridge_worker:ensure_stopped(unknown), - emqx_bridge_worker:ensure_stopped(Pid), - emqx_bridge_worker:ensure_stopped(?BRIDGE_REG_NAME), - emqx_metrics:stop(). + ok = emqx_bridge_worker:ensure_started(BridgeName), + emqx_bridge_worker:ensure_stopped(BridgeName), + emqx_metrics:stop(), + ok = emqx_bridge_worker:stop(Pid). make_config(Ref, TestPid, Result) -> - #{test_pid => TestPid, - test_ref => Ref, - connect_module => ?MODULE, - reconnect_delay_ms => 50, - connect_result => Result, - start_type => auto - }. + #{ + start_type => auto, + reconnect_interval => 50, + config => #{ + test_pid => TestPid, + test_ref => Ref, + conn_type => ?MODULE, + connect_result => Result + } + }. diff --git a/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl index 3f685a72a..80f51d2cc 100644 --- a/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_rule_actions/src/emqx_bridge_mqtt_actions.erl @@ -526,7 +526,7 @@ connect(Options = #{disk_cache := DiskCache, ecpool_worker_id := Id, pool_name : end end, Options2 = maps:without([ecpool_worker_id, pool_name, append], Options1), - emqx_bridge_worker:start_link(name(Pool, Id), Options2). + emqx_bridge_worker:start_link(Options2#{name => name(Pool, Id)}). name(Pool, Id) -> list_to_atom(atom_to_list(Pool) ++ ":" ++ integer_to_list(Id)). pool_name(ResId) -> diff --git a/data/loaded_plugins.tmpl b/data/loaded_plugins.tmpl index d26d56abf..80a0c832c 100644 --- a/data/loaded_plugins.tmpl +++ b/data/loaded_plugins.tmpl @@ -2,4 +2,3 @@ {emqx_dashboard, true}. {emqx_modules, {{enable_plugin_emqx_modules}}}. {emqx_retainer, {{enable_plugin_emqx_retainer}}}. -{emqx_bridge_mqtt, {{enable_plugin_emqx_bridge_mqtt}}}. diff --git a/rebar.config.erl b/rebar.config.erl index ff66f746c..5d5d02d05 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -192,8 +192,7 @@ overlay_vars_rel(RelType) -> cloud -> "vm.args"; edge -> "vm.args.edge" end, - [ {enable_plugin_emqx_bridge_mqtt, RelType =:= edge} - , {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce + [ {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce , {enable_plugin_emqx_retainer, true} , {vm_args_file, VmArgs} ]. @@ -256,6 +255,7 @@ relx_apps(ReleaseType) -> , emqx_connector , emqx_data_bridge , emqx_rule_engine + , emqx_bridge_mqtt ] ++ [emqx_telemetry || not is_enterprise()] ++ [emqx_modules || not is_enterprise()] @@ -282,7 +282,6 @@ relx_plugin_apps(ReleaseType) -> [ emqx_retainer , emqx_management , emqx_dashboard - , emqx_bridge_mqtt , emqx_sn , emqx_coap , emqx_stomp @@ -375,6 +374,8 @@ emqx_etc_overlay_common() -> {"{{base_dir}}/lib/emqx_telemetry/etc/emqx_telemetry.conf", "etc/plugins/emqx_telemetry.conf"}, {"{{base_dir}}/lib/emqx_authn/etc/emqx_authn.conf", "etc/plugins/emqx_authn.conf"}, {"{{base_dir}}/lib/emqx_authz/etc/emqx_authz.conf", "etc/plugins/authz.conf"}, + {"{{base_dir}}/lib/emqx_rule_engine/etc/emqx_rule_engine.conf", "etc/plugins/emqx_rule_engine.conf"}, + {"{{base_dir}}/lib/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf", "etc/plugins/emqx_bridge_mqtt.conf"}, %% TODO: check why it has to end with .paho %% and why it is put to etc/plugins dir {"{{base_dir}}/lib/emqx/etc/acl.conf.paho", "etc/plugins/acl.conf.paho"}].