From 0fbf813ebf7dc5337e3b0d5d690f3f6a906b0977 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Tue, 28 Aug 2018 19:55:23 +0800 Subject: [PATCH 1/5] Add mqtt connect tests cases --- etc/emqx.conf | 2 +- test/emqx_client_SUITE.erl | 7 +++++-- test/emqx_ct_broker_helpers.erl | 35 +++++++++++++++++++++++++++++---- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 4703f5083..31c5a11ed 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1066,7 +1066,7 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## Most of it was copied from Mozilla’s Server Side TLS article ## ## Value: Ciphers -## listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA +listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA ## SSL parameter renegotiation is a feature that allows a client and a server ## to renegotiate the parameters of the SSL connection on the fly. diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 7b2d5aaae..458c21d68 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -23,9 +23,9 @@ -include_lib("eunit/include/eunit.hrl"). -all() -> []. +all() -> [{group, connect}]. -groups() -> []. +groups() -> [{connect, [start]}]. init_per_suite(Config) -> Config. @@ -39,3 +39,6 @@ init_per_group(_Group, Config) -> end_per_group(_Group, _Config) -> ok. +start(_Config) -> + {ok, ClientPid, _} = emqx_client:start_link(). + diff --git a/test/emqx_ct_broker_helpers.erl b/test/emqx_ct_broker_helpers.erl index a62297a49..038ac0dc6 100644 --- a/test/emqx_ct_broker_helpers.erl +++ b/test/emqx_ct_broker_helpers.erl @@ -29,6 +29,31 @@ {cacertfile, "certs/cacert.pem"}, {certfile, "certs/client-cert.pem"}]). +-define(CIPHERS, [{ciphers, + ["ECDHE-ECDSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-AES256-SHA384", + "ECDHE-RSA-AES256-SHA384","ECDHE-ECDSA-DES-CBC3-SHA", + "ECDH-ECDSA-AES256-GCM-SHA384", + "ECDH-RSA-AES256-GCM-SHA384", + "ECDH-ECDSA-AES256-SHA384","ECDH-RSA-AES256-SHA384", + "DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256", + "AES256-GCM-SHA384","AES256-SHA256", + "ECDHE-ECDSA-AES128-GCM-SHA256", + "ECDHE-RSA-AES128-GCM-SHA256", + "ECDHE-ECDSA-AES128-SHA256", + "ECDHE-RSA-AES128-SHA256", + "ECDH-ECDSA-AES128-GCM-SHA256", + "ECDH-RSA-AES128-GCM-SHA256", + "ECDH-ECDSA-AES128-SHA256","ECDH-RSA-AES128-SHA256", + "DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256", + "AES128-GCM-SHA256","AES128-SHA256", + "ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA", + "DHE-DSS-AES256-SHA","ECDH-ECDSA-AES256-SHA", + "ECDH-RSA-AES256-SHA","AES256-SHA", + "ECDHE-ECDSA-AES128-SHA","ECDHE-RSA-AES128-SHA", + "DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA", + "ECDH-RSA-AES128-SHA","AES128-SHA"]}]). run_setup_steps() -> NewConfig = generate_config(), @@ -71,7 +96,7 @@ change_opts(SslType) -> lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) -> case Protocol of ssl -> - SslOpts = proplists:get_value(sslopts, Opts), + SslOpts = proplists:get_value(ssl_options, Opts), Keyfile = local_path(["etc/certs", "key.pem"]), Certfile = local_path(["etc/certs", "cert.pem"]), TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}), @@ -89,13 +114,15 @@ change_opts(SslType) -> (_) -> true end, TupleList2) end, - [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc]; + [{Protocol, Port, lists:keyreplace(ssl_options, 1, Opts, {ssl_options, TupleList3})} | Acc]; _ -> [Listener | Acc] end end, [], Listeners), application:set_env(?APP, listeners, NewListeners). -client_ssl() -> - [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT]. +client_ssl_twoway() -> + [{Key, local_path(["etc", File])} || {Key, File} <- ?MQTT_SSL_CLIENT] ++ ?CIPHERS. +client_ssl() -> + ?CIPHERS ++ [{reuse_sessions, true}]. From 37c1570e94f34c96f319746ea6127135274b02bc Mon Sep 17 00:00:00 2001 From: HuangDan Date: Tue, 28 Aug 2018 20:05:56 +0800 Subject: [PATCH 2/5] Add mqtt connect tests cases --- test/emqx_SUITE.erl | 136 ++++++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 67 deletions(-) diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 752c40f5c..ed5c04ad8 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -25,21 +25,34 @@ -include_lib("common_test/include/ct.hrl"). +-include("emqx_mqtt.hrl"). + +-record(ssl_socket, {tcp, ssl}). + +-type(socket() :: inet:socket() | #ssl_socket{}). + -define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{ client_id = <<"mqtt_client">>, username = <<"admin">>, password = <<"public">>})). + +-define(CLIENT2, ?CONNECT_PACKET(#mqtt_packet_connect{ + username = <<"admin">>, + clean_start = false, + password = <<"public">>})). all() -> - [{group, connect}, - {group, cleanSession}]. + [{group, connect}%, + % {group, cleanSession} + ]. groups() -> [{connect, [non_parallel_tests], - [mqtt_connect, -% mqtt_connect_with_tcp, - mqtt_connect_with_ssl_oneway, - mqtt_connect_with_ssl_twoway%, - % mqtt_connect_with_ws + [ + % mqtt_connect, + % mqtt_connect_with_tcp, + % mqtt_connect_with_ssl_oneway, + % mqtt_connect_with_ssl_twoway%, + mqtt_connect_with_ws%, ]}, {cleanSession, [sequence], [cleanSession_validate] @@ -48,7 +61,6 @@ groups() -> init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), - % ct:log("Apps:~p", [Apps]), Config. end_per_suite(_Config) -> @@ -65,76 +77,65 @@ mqtt_connect(_) -> ?assertEqual(<<32,2,0,0>>, connect_broker_(<<16,12,0,4,77,81,84,84,4,2,0,90,0,0>>, 4)). connect_broker_(Packet, RecvSize) -> - {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]), - gen_tcp:send(Sock, Packet), + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + emqx_client_sock:send(Sock, Packet), {ok, Data} = gen_tcp:recv(Sock, RecvSize, 3000), - gen_tcp:close(Sock), + emqx_client_sock:close(Sock), Data. -%% mqtt_connect_with_tcp(_) -> -%% %% Issue #599 -%% %% Empty clientId and clean_session = false -%% {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]), -%% Packet = raw_send_serialise(?CLIENT), -%% gen_tcp:send(Sock, Packet), -%% {ok, Data} = gen_tcp:recv(Sock, 0), -%% % {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data), -%% gen_tcp:close(Sock). +mqtt_connect_with_tcp(_) -> + %% Issue #599 + %% Empty clientId and clean_session = false + {ok, Sock} = emqx_client_sock:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}], 3000), + Packet = raw_send_serialise(?CLIENT2), + emqx_client_sock:send(Sock, Packet), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ?CONNACK_PACKET(?CONNACK_INVALID_ID), _} = raw_recv_pase(Data), + emqx_client_sock:close(Sock). mqtt_connect_with_ssl_oneway(_) -> - emqx:stop(), + emqx:shutdown(), emqx_ct_broker_helpers:change_opts(ssl_oneway), emqx:start(), - timer:sleep(5000), - {ok, SslOneWay} = emqttc:start_link([{host, "localhost"}, - {port, 8883}, - {logger, debug}, - {client_id, <<"ssloneway">>}, ssl]), - timer:sleep(100), - emqttc:subscribe(SslOneWay, <<"topic">>, qos1), - {ok, Pub} = emqttc:start_link([{host, "localhost"}, - {client_id, <<"pub">>}]), - emqttc:publish(Pub, <<"topic">>, <<"SSL oneWay test">>, [{qos, 1}]), - timer:sleep(100), - receive {publish, _Topic, RM} -> - ?assertEqual(<<"SSL oneWay test">>, RM) - after 1000 -> false - end, - timer:sleep(100), - emqttc:disconnect(SslOneWay), - emqttc:disconnect(Pub). + ClientSsl = emqx_ct_broker_helpers:client_ssl(), + {ok, #ssl_socket{tcp = Sock, ssl = SslSock}} + = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), +%% Packet = raw_send_serialise(?CLIENT), +%% ssl:send(SslSock, Packet), +%% receive Data -> +%% ct:log("Data:~p~n", [Data]) +%% after 30000 -> +%% ok +%% end, + ssl:close(SslSock). mqtt_connect_with_ssl_twoway(_Config) -> - emqx:stop(), + emqx:shutdown(), emqx_ct_broker_helpers:change_opts(ssl_twoway), emqx:start(), - timer:sleep(3000), - ClientSSl = emqx_ct_broker_helpers:client_ssl(), - {ok, SslTwoWay} = emqttc:start_link([{host, "localhost"}, - {port, 8883}, - {client_id, <<"ssltwoway">>}, - {ssl, ClientSSl}]), - {ok, Sub} = emqttc:start_link([{host, "localhost"}, - {client_id, <<"sub">>}]), - emqttc:subscribe(Sub, <<"topic">>, qos1), - emqttc:publish(SslTwoWay, <<"topic">>, <<"ssl client pub message">>, [{qos, 1}]), - timer:sleep(10), - receive {publish, _Topic, RM} -> - ?assertEqual(<<"ssl client pub message">>, RM) - after 1000 -> false + ClientSsl = emqx_ct_broker_helpers:client_ssl_twoway(), + {ok, #ssl_socket{tcp = _Sock1, ssl = SslSock} = Sock} + = emqx_client_sock:connect("127.0.0.1", 8883, [{ssl_opts, ClientSsl}], 3000), + Packet = raw_send_serialise(?CLIENT), + emqx_client_sock:setopts(Sock, [{active, once}]), + emqx_client_sock:send(Sock, Packet), + timer:sleep(500), + receive {ssl, _, Data}-> + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data) + after 1000 -> + ok end, - emqttc:disconnect(SslTwoWay), - emqttc:disconnect(Sub). + emqx_client_sock:close(Sock). -%% mqtt_connect_with_ws(_Config) -> -%% WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), -%% {ok, _} = rfc6455_client:open(WS), -%% Packet = raw_send_serialise(?CLIENT), -%% ok = rfc6455_client:send_binary(WS, Packet), -%% {binary, P} = rfc6455_client:recv(WS), -%% % {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P), -%% {close, _} = rfc6455_client:close(WS), -%% ok. +mqtt_connect_with_ws(_Config) -> + WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), + {ok, _} = rfc6455_client:open(WS), + Packet = raw_send_serialise(?CLIENT), + ok = rfc6455_client:send_binary(WS, Packet), + {binary, P} = rfc6455_client:recv(WS), + ct:log(":~p", [P]), + {close, _} = rfc6455_client:close(WS), + ok. cleanSession_validate(_) -> {ok, C1} = emqttc:start_link([{host, "localhost"}, @@ -163,8 +164,9 @@ cleanSession_validate(_) -> emqttc:disconnect(C11). raw_send_serialise(Packet) -> - emqttc_serialiser:serialise(Packet). + emqx_frame:serialize(Packet). raw_recv_pase(P) -> - emqttc_parser:parse(P, emqttc_parser:new()). + emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4} }). From 9b06589eab6c386b50392686eb9fe149c51a2e2f Mon Sep 17 00:00:00 2001 From: HuangDan Date: Wed, 29 Aug 2018 13:49:54 +0800 Subject: [PATCH 3/5] Add SUBSCRIBER/PUBLISH Packet test cases --- test/emqx_SUITE.erl | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index ed5c04ad8..952fc6cb9 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -40,6 +40,17 @@ username = <<"admin">>, clean_start = false, password = <<"public">>})). + +-define(SUBCODE, [0]). + +-define(PACKETID, 1). + +-define(PUBQOS, 1). + +-define(SUBPACKET, ?SUBSCRIBE_PACKET(?PACKETID, [{<<"sub/topic">>, ?DEFAULT_SUBOPTS}])). + +-define(PUBPACKET, ?PUBLISH_PACKET(?PUBQOS, <<"sub/topic">>, ?PACKETID, <<"publish">>)). + all() -> [{group, connect}%, % {group, cleanSession} @@ -48,11 +59,11 @@ all() -> groups() -> [{connect, [non_parallel_tests], [ - % mqtt_connect, - % mqtt_connect_with_tcp, - % mqtt_connect_with_ssl_oneway, - % mqtt_connect_with_ssl_twoway%, - mqtt_connect_with_ws%, + mqtt_connect, + mqtt_connect_with_tcp, + mqtt_connect_with_ssl_oneway, + mqtt_connect_with_ssl_twoway, + mqtt_connect_with_ws ]}, {cleanSession, [sequence], [cleanSession_validate] @@ -130,10 +141,24 @@ mqtt_connect_with_ssl_twoway(_Config) -> mqtt_connect_with_ws(_Config) -> WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), {ok, _} = rfc6455_client:open(WS), + + %% Connect Packet Packet = raw_send_serialise(?CLIENT), ok = rfc6455_client:send_binary(WS, Packet), - {binary, P} = rfc6455_client:recv(WS), - ct:log(":~p", [P]), + {binary, CONACK} = rfc6455_client:recv(WS), + {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK), + + %% Sub Packet + SubPacket = raw_send_serialise(?SUBPACKET), + rfc6455_client:send_binary(WS, SubPacket), + {binary, SubAck} = rfc6455_client:recv(WS), + {ok, ?SUBACK_PACKET(?PACKETID, ?SUBCODE), _} = raw_recv_pase(SubAck), + + %% Pub Packet QoS 1 + PubPacket = raw_send_serialise(?PUBPACKET), + rfc6455_client:send_binary(WS, PubPacket), + {binary, PubAck} = rfc6455_client:recv(WS), + {ok, ?PUBACK_PACKET(?PACKETID), _} = raw_recv_pase(PubAck), {close, _} = rfc6455_client:close(WS), ok. From 53d7d0a9d462b90fa6fdcf9426de59a95adb2860 Mon Sep 17 00:00:00 2001 From: HuangDan Date: Wed, 29 Aug 2018 16:24:01 +0800 Subject: [PATCH 4/5] Update the peer_cert_as_username conf desc --- etc/emqx.conf | 6 +++--- priv/emqx.schema | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 31c5a11ed..588725446 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -752,8 +752,8 @@ listener.tcp.external.access.1 = allow all ## Enable the option for X.509 certificate based authentication. ## EMQX will use the common name of certificate as MQTT username. ## -## Value: boolean -## listener.tcp.external.peer_cert_as_username = true +## Value: cn | dn +## listener.tcp.external.peer_cert_as_username = cn ## The TCP backlog defines the maximum length that the queue of pending ## connections can grow to. @@ -1096,7 +1096,7 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Use the CN field from the client certificate as a username. ## Notice that 'verify' should be set as 'verify_peer'. ## -## Value: boolean +## Value: cn | en ## listener.ssl.external.peer_cert_as_username = cn ## TCP backlog for the SSL connection. diff --git a/priv/emqx.schema b/priv/emqx.schema index a0d2bc0e2..1a2209dd8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -857,8 +857,7 @@ end}. ]}. {mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [ - {default, false}, - {datatype, {enum, [true, false]}} + {datatype, {enum, [cn, dn]}} ]}. {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [ From d3ed0853ef4cecf8eeabee3f8f87aac0bc05d696 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 30 Aug 2018 10:41:04 +0800 Subject: [PATCH 5/5] Rename bridge module --- src/emqx_bridge.erl | 288 ++++++++++++------ src/emqx_bridge1.erl | 254 --------------- src/emqx_bridge1_sup.erl | 45 --- src/emqx_bridge_sup.erl | 31 +- src/emqx_local_bridge.erl | 162 ++++++++++ src/emqx_local_bridge_sup.erl | 26 ++ ..._sup.erl => emqx_local_bridge_sup_sup.erl} | 6 +- src/emqx_sup.erl | 7 +- 8 files changed, 410 insertions(+), 409 deletions(-) delete mode 100644 src/emqx_bridge1.erl delete mode 100644 src/emqx_bridge1_sup.erl create mode 100644 src/emqx_local_bridge.erl create mode 100644 src/emqx_local_bridge_sup.erl rename src/{emqx_bridge_sup_sup.erl => emqx_local_bridge_sup_sup.erl} (94%) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index d4ebab041..d1763d31c 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -19,76 +19,77 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([start_link/5]). +-import(proplists, [get_value/2, get_value/3]). + +-export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(PING_DOWN_INTERVAL, 1000). +-record(state, {client_pid, options, reconnect_time, reconnect_count, + def_reconnect_count, type, mountpoint, queue, store_type, + max_pending_messages}). --record(state, {pool, id, - node, subtopic, - qos = ?QOS_0, - topic_suffix = <<>>, - topic_prefix = <<>>, - mqueue :: emqx_mqueue:mqueue(), - max_queue_len = 10000, - ping_down_interval = ?PING_DOWN_INTERVAL, - status = up}). +-record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, + packet_id, topic, props, payload}). --type(option() :: {qos, emqx_mqtt_types:qos()} | - {topic_suffix, binary()} | - {topic_prefix, binary()} | - {max_queue_len, pos_integer()} | - {ping_down_interval, pos_integer()}). +start_link(Name, Options) -> + gen_server:start_link({local, name(Name)}, ?MODULE, [Options], []). --export_type([option/0]). +start_bridge(Name) -> + gen_server:call(name(Name), start_bridge). -%% @doc Start a bridge --spec(start_link(term(), pos_integer(), atom(), binary(), [option()]) - -> {ok, pid()} | ignore | {error, term()}). -start_link(Pool, Id, Node, Topic, Options) -> - gen_server:start_link(?MODULE, [Pool, Id, Node, Topic, Options], [{hibernate_after, 5000}]). +stop_bridge(Name) -> + gen_server:call(name(Name), stop_bridge). + +status(Pid) -> + gen_server:call(Pid, status). %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ -init([Pool, Id, Node, Topic, Options]) -> +init([Options]) -> process_flag(trap_exit, true), - true = gproc_pool:connect_worker(Pool, {Pool, Id}), - case net_kernel:connect_node(Node) of - true -> - true = erlang:monitor_node(Node, true), - Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]), - State = parse_opts(Options, #state{node = Node, subtopic = Topic}), - %%TODO: queue.... - MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}]), - {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; - false -> - {stop, {cannot_connect_node, Node}} - end. + case get_value(start_type, Options, manual) of + manual -> ok; + auto -> erlang:send_after(1000, self(), start) + end, + ReconnectCount = get_value(reconnect_count, Options, 10), + ReconnectTime = get_value(reconnect_time, Options, 30000), + MaxPendingMsg = get_value(max_pending_messages, Options, 10000), + Mountpoint = format_mountpoint(get_value(mountpoint, Options)), + StoreType = get_value(store_type, Options, memory), + Type = get_value(type, Options, in), + Queue = [], + {ok, #state{type = Type, + mountpoint = Mountpoint, + queue = Queue, + store_type = StoreType, + options = Options, + reconnect_count = ReconnectCount, + reconnect_time = ReconnectTime, + def_reconnect_count = ReconnectCount, + max_pending_messages = MaxPendingMsg}}. -parse_opts([], State) -> - State; -parse_opts([{qos, QoS} | Opts], State) -> - parse_opts(Opts, State#state{qos = QoS}); -parse_opts([{topic_suffix, Suffix} | Opts], State) -> - parse_opts(Opts, State#state{topic_suffix= Suffix}); -parse_opts([{topic_prefix, Prefix} | Opts], State) -> - parse_opts(Opts, State#state{topic_prefix = Prefix}); -parse_opts([{max_queue_len, Len} | Opts], State) -> - parse_opts(Opts, State#state{max_queue_len = Len}); -parse_opts([{ping_down_interval, Interval} | Opts], State) -> - parse_opts(Opts, State#state{ping_down_interval = Interval}); -parse_opts([_Opt | Opts], State) -> - parse_opts(Opts, State). +handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> + {noreply, NewState} = handle_info(start, State), + {reply, <<"start bridge successfully">>, NewState}; -qname(Node, Topic) when is_atom(Node) -> - qname(atom_to_list(Node), Topic); -qname(Node, Topic) -> - iolist_to_binary(["Bridge:", Node, ":", Topic]). +handle_call(start_bridge, _From, State) -> + {reply, <<"bridge already started">>, State}; + +handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) -> + {reply, <<"bridge not started">>, State}; + +handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) -> + emqx_client:disconnect(Pid), + {reply, <<"stop bridge successfully">>, State}; + +handle_call(status, _From, State = #state{client_pid = undefined}) -> + {reply, <<"Stopped">>, State}; +handle_call(status, _From, State = #state{client_pid = _Pid})-> + {reply, <<"Running">>, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), @@ -98,65 +99,156 @@ handle_cast(Msg, State) -> emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) -> - %% TODO: how to drop??? - {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}}; - -handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> - ok = emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), +handle_info(start, State = #state{reconnect_count = 0}) -> {noreply, State}; -handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> - emqx_logger:warning("[Bridge] node down: ~s", [Node]), - erlang:send_after(Interval, self(), ping_down_node), - {noreply, State#state{status = down}, hibernate}; - -handle_info({nodeup, Node}, State = #state{node = Node}) -> - %% TODO: Really fast?? - case emqx:is_running(Node) of - true -> emqx_logger:warning("[Bridge] Node up: ~s", [Node]), - {noreply, dequeue(State#state{status = up})}; - false -> self() ! {nodedown, Node}, - {noreply, State#state{status = down}} +%%---------------------------------------------------------------- +%% start in message bridge +%%---------------------------------------------------------------- +handle_info(start, State = #state{options = Options, + client_pid = undefined, + reconnect_time = ReconnectTime, + reconnect_count = ReconnectCount, + type = in}) -> + case emqx_client:start_link([{owner, self()}|options(Options)]) of + {ok, ClientPid, _} -> + Subs = get_value(subscriptions, Options, []), + [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], + {noreply, State#state{client_pid = ClientPid}}; + {error,_} -> + erlang:send_after(ReconnectTime, self(), start), + {noreply, State = #state{reconnect_count = ReconnectCount-1}} end; -handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> - Self = self(), - spawn_link(fun() -> - case net_kernel:connect_node(Node) of - true -> Self ! {nodeup, Node}; - false -> erlang:send_after(Interval, Self, ping_down_node) - end - end), +%%---------------------------------------------------------------- +%% start out message bridge +%%---------------------------------------------------------------- +handle_info(start, State = #state{options = Options, + client_pid = undefined, + reconnect_time = ReconnectTime, + reconnect_count = ReconnectCount, + type = out}) -> + case emqx_client:start_link([{owner, self()}|options(Options)]) of + {ok, ClientPid, _} -> + Subs = get_value(subscriptions, Options, []), + [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], + ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), + [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})], + {noreply, State#state{client_pid = ClientPid}}; + {error,_} -> + erlang:send_after(ReconnectTime, self(), start), + {noreply, State = #state{reconnect_count = ReconnectCount-1}} + end; + +%%---------------------------------------------------------------- +%% received local node message +%%---------------------------------------------------------------- +handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, + State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue, + store_type = StoreType, max_pending_messages = MaxPendingMsg}) -> + Msg = #mqtt_msg{qos = 1, + retain = Retain, + topic = mountpoint(Mountpoint, Topic), + payload = Payload}, + case emqx_client:publish(Pid, Msg) of + {ok, PkgId} -> + {noreply, State#state{queue = store(StoreType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; + {error, Reason} -> + emqx_logger:error("Publish fail:~p", [Reason]), + {noreply, State} + end; + +%%---------------------------------------------------------------- +%% received remote node message +%%---------------------------------------------------------------- +handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic, + properties := Props, payload := Payload}}, State) -> + NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload), + NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain=> Retain}, NewMsg0)), + emqx_broker:publish(NewMsg1), {noreply, State}; -handle_info({'EXIT', _Pid, normal}, State) -> - {noreply, State}; +%%---------------------------------------------------------------- +%% received remote puback message +%%---------------------------------------------------------------- +handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, store_type = StoreType}) -> + % lists:keydelete(PkgId, 1, Queue) + {noreply, State#state{queue = delete(StoreType, PkgId, Queue)}}; + +handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> + {noreply, State#state{client_pid = undefined}}; + +handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, + reconnect_time = ReconnectTime, + def_reconnect_count = DefReconnectCount}) -> + lager:warning("emqx bridge stop reason:~p", [Reason]), + erlang:send_after(ReconnectTime, self(), start), + {noreply, State#state{client_pid = undefined, reconnect_count = DefReconnectCount}}; handle_info(Info, State) -> emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> - gproc_pool:disconnect_worker(Pool, {Pool, Id}). +terminate(_Reason, #state{}) -> + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -dequeue(State = #state{mqueue = MQ}) -> - case emqx_mqueue:out(MQ) of - {empty, MQ1} -> - State#state{mqueue = MQ1}; - {{value, Msg}, MQ1} -> - handle_info({dispatch, Msg#message.topic, Msg}, State), - dequeue(State#state{mqueue = MQ1}) +proto_ver(mqtt3) -> v3; +proto_ver(mqtt4) -> v4; +proto_ver(mqtt5) -> v5. +address(Address) -> + case string:tokens(Address, ":") of + [Host] -> {Host, 1883}; + [Host, Port] -> {Host, list_to_integer(Port)} end. +options(Options) -> + options(Options, []). +options([], Acc) -> + Acc; +options([{username, Username}| Options], Acc) -> + options(Options, [{username, Username}|Acc]); +options([{proto_ver, ProtoVer}| Options], Acc) -> + options(Options, [{proto_ver, proto_ver(ProtoVer)}|Acc]); +options([{password, Password}| Options], Acc) -> + options(Options, [{password, Password}|Acc]); +options([{keepalive, Keepalive}| Options], Acc) -> + options(Options, [{keepalive, Keepalive}|Acc]); +options([{client_id, ClientId}| Options], Acc) -> + options(Options, [{client_id, ClientId}|Acc]); +options([{clean_start, CleanStart}| Options], Acc) -> + options(Options, [{clean_start, CleanStart}|Acc]); +options([{address, Address}| Options], Acc) -> + {Host, Port} = address(Address), + options(Options, [{host, Host}, {port, Port}|Acc]); +options([_Option | Options], Acc) -> + options(Options, Acc). -transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, - topic_suffix = Suffix}) -> - Msg#message{topic = <>}. +name(Id) -> + list_to_atom(lists:concat([?MODULE, "_", Id])). +i2b(L) -> iolist_to_binary(L). + +mountpoint(undefined, Topic) -> + Topic; +mountpoint(Prefix, Topic) -> + <>. + +format_mountpoint(undefined) -> + undefined; +format_mountpoint(Prefix) -> + binary:replace(i2b(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). + +store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg -> + [Data | Queue]; +store(memory, _Data, Queue, _MaxPendingMsg) -> + lager:error("Beyond max pending messages"), + Queue; +store(disk, Data, Queue, _MaxPendingMsg)-> + [Data | Queue]. + +delete(memory, PkgId, Queue) -> + lists:keydelete(PkgId, 1, Queue); +delete(disk, PkgId, Queue) -> + lists:keydelete(PkgId, 1, Queue). \ No newline at end of file diff --git a/src/emqx_bridge1.erl b/src/emqx_bridge1.erl deleted file mode 100644 index cfad74803..000000000 --- a/src/emqx_bridge1.erl +++ /dev/null @@ -1,254 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge1). - --behaviour(gen_server). - --include("emqx.hrl"). --include("emqx_mqtt.hrl"). - --import(proplists, [get_value/2, get_value/3]). - --export([start_link/2, start_bridge/1, stop_bridge/1, status/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --record(state, {client_pid, options, reconnect_time, reconnect_count, - def_reconnect_count, type, mountpoint, queue, store_type, - max_pending_messages}). - --record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, - packet_id, topic, props, payload}). - -start_link(Name, Options) -> - gen_server:start_link({local, name(Name)}, ?MODULE, [Options], []). - -start_bridge(Name) -> - gen_server:call(name(Name), start_bridge). - -stop_bridge(Name) -> - gen_server:call(name(Name), stop_bridge). - -status(Pid) -> - gen_server:call(Pid, status). - -%%------------------------------------------------------------------------------ -%% gen_server callbacks -%%------------------------------------------------------------------------------ - -init([Options]) -> - process_flag(trap_exit, true), - case get_value(start_type, Options, manual) of - manual -> ok; - auto -> erlang:send_after(1000, self(), start) - end, - ReconnectCount = get_value(reconnect_count, Options, 10), - ReconnectTime = get_value(reconnect_time, Options, 30000), - MaxPendingMsg = get_value(max_pending_messages, Options, 10000), - Mountpoint = format_mountpoint(get_value(mountpoint, Options)), - StoreType = get_value(store_type, Options, memory), - Type = get_value(type, Options, in), - Queue = [], - {ok, #state{type = Type, - mountpoint = Mountpoint, - queue = Queue, - store_type = StoreType, - options = Options, - reconnect_count = ReconnectCount, - reconnect_time = ReconnectTime, - def_reconnect_count = ReconnectCount, - max_pending_messages = MaxPendingMsg}}. - -handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> - {noreply, NewState} = handle_info(start, State), - {reply, <<"start bridge successfully">>, NewState}; - -handle_call(start_bridge, _From, State) -> - {reply, <<"bridge already started">>, State}; - -handle_call(stop_bridge, _From, State = #state{client_pid = undefined}) -> - {reply, <<"bridge not started">>, State}; - -handle_call(stop_bridge, _From, State = #state{client_pid = Pid}) -> - emqx_client:disconnect(Pid), - {reply, <<"stop bridge successfully">>, State}; - -handle_call(status, _From, State = #state{client_pid = undefined}) -> - {reply, <<"Stopped">>, State}; -handle_call(status, _From, State = #state{client_pid = _Pid})-> - {reply, <<"Running">>, State}; - -handle_call(Req, _From, State) -> - emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info(start, State = #state{reconnect_count = 0}) -> - {noreply, State}; - -%%---------------------------------------------------------------- -%% start in message bridge -%%---------------------------------------------------------------- -handle_info(start, State = #state{options = Options, - client_pid = undefined, - reconnect_time = ReconnectTime, - reconnect_count = ReconnectCount, - type = in}) -> - case emqx_client:start_link([{owner, self()}|options(Options)]) of - {ok, ClientPid, _} -> - Subs = get_value(subscriptions, Options, []), - [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], - {noreply, State#state{client_pid = ClientPid}}; - {error,_} -> - erlang:send_after(ReconnectTime, self(), start), - {noreply, State = #state{reconnect_count = ReconnectCount-1}} - end; - -%%---------------------------------------------------------------- -%% start out message bridge -%%---------------------------------------------------------------- -handle_info(start, State = #state{options = Options, - client_pid = undefined, - reconnect_time = ReconnectTime, - reconnect_count = ReconnectCount, - type = out}) -> - case emqx_client:start_link([{owner, self()}|options(Options)]) of - {ok, ClientPid, _} -> - Subs = get_value(subscriptions, Options, []), - [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], - ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), - [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})], - {noreply, State#state{client_pid = ClientPid}}; - {error,_} -> - erlang:send_after(ReconnectTime, self(), start), - {noreply, State = #state{reconnect_count = ReconnectCount-1}} - end; - -%%---------------------------------------------------------------- -%% received local node message -%%---------------------------------------------------------------- -handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, - State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue, - store_type = StoreType, max_pending_messages = MaxPendingMsg}) -> - Msg = #mqtt_msg{qos = 1, - retain = Retain, - topic = mountpoint(Mountpoint, Topic), - payload = Payload}, - case emqx_client:publish(Pid, Msg) of - {ok, PkgId} -> - {noreply, State#state{queue = store(StoreType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; - {error, Reason} -> - emqx_logger:error("Publish fail:~p", [Reason]), - {noreply, State} - end; - -%%---------------------------------------------------------------- -%% received remote node message -%%---------------------------------------------------------------- -handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic, - properties := Props, payload := Payload}}, State) -> - NewMsg0 = emqx_message:make(bridge, QoS, Topic, Payload), - NewMsg1 = emqx_message:set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain=> Retain}, NewMsg0)), - emqx_broker:publish(NewMsg1), - {noreply, State}; - -%%---------------------------------------------------------------- -%% received remote puback message -%%---------------------------------------------------------------- -handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, store_type = StoreType}) -> - % lists:keydelete(PkgId, 1, Queue) - {noreply, State#state{queue = delete(StoreType, PkgId, Queue)}}; - -handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> - {noreply, State#state{client_pid = undefined}}; - -handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, - reconnect_time = ReconnectTime, - def_reconnect_count = DefReconnectCount}) -> - lager:warning("emqx bridge stop reason:~p", [Reason]), - erlang:send_after(ReconnectTime, self(), start), - {noreply, State#state{client_pid = undefined, reconnect_count = DefReconnectCount}}; - -handle_info(Info, State) -> - emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, #state{}) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -proto_ver(mqtt3) -> v3; -proto_ver(mqtt4) -> v4; -proto_ver(mqtt5) -> v5. -address(Address) -> - case string:tokens(Address, ":") of - [Host] -> {Host, 1883}; - [Host, Port] -> {Host, list_to_integer(Port)} - end. -options(Options) -> - options(Options, []). -options([], Acc) -> - Acc; -options([{username, Username}| Options], Acc) -> - options(Options, [{username, Username}|Acc]); -options([{proto_ver, ProtoVer}| Options], Acc) -> - options(Options, [{proto_ver, proto_ver(ProtoVer)}|Acc]); -options([{password, Password}| Options], Acc) -> - options(Options, [{password, Password}|Acc]); -options([{keepalive, Keepalive}| Options], Acc) -> - options(Options, [{keepalive, Keepalive}|Acc]); -options([{client_id, ClientId}| Options], Acc) -> - options(Options, [{client_id, ClientId}|Acc]); -options([{clean_start, CleanStart}| Options], Acc) -> - options(Options, [{clean_start, CleanStart}|Acc]); -options([{address, Address}| Options], Acc) -> - {Host, Port} = address(Address), - options(Options, [{host, Host}, {port, Port}|Acc]); -options([_Option | Options], Acc) -> - options(Options, Acc). - -name(Id) -> - list_to_atom(lists:concat([?MODULE, "_", Id])). - -i2b(L) -> iolist_to_binary(L). - -mountpoint(undefined, Topic) -> - Topic; -mountpoint(Prefix, Topic) -> - <>. - -format_mountpoint(undefined) -> - undefined; -format_mountpoint(Prefix) -> - binary:replace(i2b(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - -store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg -> - [Data | Queue]; -store(memory, _Data, Queue, _MaxPendingMsg) -> - lager:error("Beyond max pending messages"), - Queue; -store(disk, Data, Queue, _MaxPendingMsg)-> - [Data | Queue]. - -delete(memory, PkgId, Queue) -> - lists:keydelete(PkgId, 1, Queue); -delete(disk, PkgId, Queue) -> - lists:keydelete(PkgId, 1, Queue). \ No newline at end of file diff --git a/src/emqx_bridge1_sup.erl b/src/emqx_bridge1_sup.erl deleted file mode 100644 index f4e8c8f01..000000000 --- a/src/emqx_bridge1_sup.erl +++ /dev/null @@ -1,45 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_bridge1_sup). - --behavior(supervisor). - --include("emqx.hrl"). - --export([start_link/0, bridges/0]). - -%% Supervisor callbacks --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% @doc List all bridges --spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). -bridges() -> - [{Name, emqx_bridge1:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. - -init([]) -> - BridgesOpts = emqx_config:get_env(bridges, []), - Bridges = [spec(Opts)|| Opts <- BridgesOpts], - {ok, {{one_for_one, 10, 100}, Bridges}}. - -spec({Id, Options})-> - #{id => Id, - start => {emqx_bridge1, start_link, [Id, Options]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_bridge1]}. diff --git a/src/emqx_bridge_sup.erl b/src/emqx_bridge_sup.erl index 1735e3b99..0d8a0d887 100644 --- a/src/emqx_bridge_sup.erl +++ b/src/emqx_bridge_sup.erl @@ -14,13 +14,32 @@ -module(emqx_bridge_sup). +-behavior(supervisor). + -include("emqx.hrl"). --export([start_link/3]). +-export([start_link/0, bridges/0]). --spec(start_link(node(), emqx_topic:topic(), [emqx_bridge:option()]) - -> {ok, pid()} | {error, term()}). -start_link(Node, Topic, Options) -> - MFA = {emqx_bridge, start_link, [Node, Topic, Options]}, - emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA). +%% Supervisor callbacks +-export([init/1]). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% @doc List all bridges +-spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). +bridges() -> + [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. + +init([]) -> + BridgesOpts = emqx_config:get_env(bridges, []), + Bridges = [spec(Opts)|| Opts <- BridgesOpts], + {ok, {{one_for_one, 10, 100}, Bridges}}. + +spec({Id, Options})-> + #{id => Id, + start => {emqx_bridge, start_link, [Id, Options]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [emqx_bridge]}. diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl new file mode 100644 index 000000000..9ed8fdbac --- /dev/null +++ b/src/emqx_local_bridge.erl @@ -0,0 +1,162 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_local_bridge). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). + +-export([start_link/5]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(PING_DOWN_INTERVAL, 1000). + +-record(state, {pool, id, + node, subtopic, + qos = ?QOS_0, + topic_suffix = <<>>, + topic_prefix = <<>>, + mqueue :: emqx_mqueue:mqueue(), + max_queue_len = 10000, + ping_down_interval = ?PING_DOWN_INTERVAL, + status = up}). + +-type(option() :: {qos, emqx_mqtt_types:qos()} | + {topic_suffix, binary()} | + {topic_prefix, binary()} | + {max_queue_len, pos_integer()} | + {ping_down_interval, pos_integer()}). + +-export_type([option/0]). + +%% @doc Start a bridge +-spec(start_link(term(), pos_integer(), atom(), binary(), [option()]) + -> {ok, pid()} | ignore | {error, term()}). +start_link(Pool, Id, Node, Topic, Options) -> + gen_server:start_link(?MODULE, [Pool, Id, Node, Topic, Options], [{hibernate_after, 5000}]). + +%%------------------------------------------------------------------------------ +%% gen_server callbacks +%%------------------------------------------------------------------------------ + +init([Pool, Id, Node, Topic, Options]) -> + process_flag(trap_exit, true), + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + case net_kernel:connect_node(Node) of + true -> + true = erlang:monitor_node(Node, true), + Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), + emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]), + State = parse_opts(Options, #state{node = Node, subtopic = Topic}), + %%TODO: queue.... + MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}]), + {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; + false -> + {stop, {cannot_connect_node, Node}} + end. + +parse_opts([], State) -> + State; +parse_opts([{qos, QoS} | Opts], State) -> + parse_opts(Opts, State#state{qos = QoS}); +parse_opts([{topic_suffix, Suffix} | Opts], State) -> + parse_opts(Opts, State#state{topic_suffix= Suffix}); +parse_opts([{topic_prefix, Prefix} | Opts], State) -> + parse_opts(Opts, State#state{topic_prefix = Prefix}); +parse_opts([{max_queue_len, Len} | Opts], State) -> + parse_opts(Opts, State#state{max_queue_len = Len}); +parse_opts([{ping_down_interval, Interval} | Opts], State) -> + parse_opts(Opts, State#state{ping_down_interval = Interval}); +parse_opts([_Opt | Opts], State) -> + parse_opts(Opts, State). + +qname(Node, Topic) when is_atom(Node) -> + qname(atom_to_list(Node), Topic); +qname(Node, Topic) -> + iolist_to_binary(["Bridge:", Node, ":", Topic]). + +handle_call(Req, _From, State) -> + emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Msg, State) -> + emqx_logger:error("[Bridge] unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) -> + %% TODO: how to drop??? + {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}}; + +handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> + ok = emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), + {noreply, State}; + +handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> + emqx_logger:warning("[Bridge] node down: ~s", [Node]), + erlang:send_after(Interval, self(), ping_down_node), + {noreply, State#state{status = down}, hibernate}; + +handle_info({nodeup, Node}, State = #state{node = Node}) -> + %% TODO: Really fast?? + case emqx:is_running(Node) of + true -> emqx_logger:warning("[Bridge] Node up: ~s", [Node]), + {noreply, dequeue(State#state{status = up})}; + false -> self() ! {nodedown, Node}, + {noreply, State#state{status = down}} + end; + +handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) -> + Self = self(), + spawn_link(fun() -> + case net_kernel:connect_node(Node) of + true -> Self ! {nodeup, Node}; + false -> erlang:send_after(Interval, Self, ping_down_node) + end + end), + {noreply, State}; + +handle_info({'EXIT', _Pid, normal}, State) -> + {noreply, State}; + +handle_info(Info, State) -> + emqx_logger:error("[Bridge] unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #state{pool = Pool, id = Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +dequeue(State = #state{mqueue = MQ}) -> + case emqx_mqueue:out(MQ) of + {empty, MQ1} -> + State#state{mqueue = MQ1}; + {{value, Msg}, MQ1} -> + handle_info({dispatch, Msg#message.topic, Msg}, State), + dequeue(State#state{mqueue = MQ1}) + end. + +transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, + topic_suffix = Suffix}) -> + Msg#message{topic = <>}. + diff --git a/src/emqx_local_bridge_sup.erl b/src/emqx_local_bridge_sup.erl new file mode 100644 index 000000000..fed9f28a7 --- /dev/null +++ b/src/emqx_local_bridge_sup.erl @@ -0,0 +1,26 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_local_bridge_sup). + +-include("emqx.hrl"). + +-export([start_link/3]). + +-spec(start_link(node(), emqx_topic:topic(), [emqx_local_bridge:option()]) + -> {ok, pid()} | {error, term()}). +start_link(Node, Topic, Options) -> + MFA = {emqx_local_bridge, start_link, [Node, Topic, Options]}, + emqx_pool_sup:start_link({bridge, Node, Topic}, random, MFA). + diff --git a/src/emqx_bridge_sup_sup.erl b/src/emqx_local_bridge_sup_sup.erl similarity index 94% rename from src/emqx_bridge_sup_sup.erl rename to src/emqx_local_bridge_sup_sup.erl index 2ef05df8c..0483552b2 100644 --- a/src/emqx_bridge_sup_sup.erl +++ b/src/emqx_local_bridge_sup_sup.erl @@ -12,7 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. --module(emqx_bridge_sup_sup). +-module(emqx_local_bridge_sup_sup). -behavior(supervisor). @@ -66,9 +66,9 @@ init([]) -> bridge_spec(Node, Topic, Options) -> #{id => ?CHILD_ID(Node, Topic), - start => {emqx_bridge_sup, start_link, [Node, Topic, Options]}, + start => {emqx_local_bridge_sup, start_link, [Node, Topic, Options]}, restart => permanent, shutdown => infinity, type => supervisor, - modules => [emqx_bridge_sup]}. + modules => [emqx_local_bridge_sup]}. diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index cddfea8b5..2f29dbfee 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -62,8 +62,9 @@ init([]) -> %% Broker Sup BrokerSup = supervisor_spec(emqx_broker_sup), %% BridgeSup - BridgeSup = supervisor_spec(emqx_bridge_sup_sup), - BridgeSup1 = supervisor_spec(emqx_bridge1_sup), + LocalBridgeSup = supervisor_spec(emqx_local_bridge_sup_sup), + + BridgeSup = supervisor_spec(emqx_bridge_sup), %% AccessControl AccessControl = worker_spec(emqx_access_control), %% Session Manager @@ -78,8 +79,8 @@ init([]) -> [KernelSup, RouterSup, BrokerSup, + LocalBridgeSup, BridgeSup, - BridgeSup1, AccessControl, SMSup, SessionSup,