diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 3387a6439..c3368a0c7 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1846,7 +1846,7 @@ listener.ws.external.check_origins = "http://localhost:18083, http://127.0.0.1:1 ##-------------------------------------------------------------------- ## External WebSocket/SSL listener for MQTT Protocol -## listener.wss.$name is the IP address and port that the MQTT/WebSocket/SSL +## listener.wss.$name.endpoint is the IP address and port that the MQTT/WebSocket/SSL ## listener will bind. ## ## Value: IP:Port | Port @@ -2160,6 +2160,147 @@ listener.wss.external.allow_origin_absence = true ## Value: http://url eg. https://localhost:8084, https://127.0.0.1:8084 listener.wss.external.check_origins = "https://localhost:8084, https://127.0.0.1:8084" + +##-------------------------------------------------------------------- +## External QUIC listener for MQTT Protocol + +## listener.quic.$name.endpoint is the IP address and port that the MQTT/QUIC +## listener will bind. +## +## Value: IP:Port | Port +## +## Examples: 14567, 127.0.0.1:14567, ::1:14567 +listener.quic.external.endpoint = 14567 + +## The acceptor pool for external MQTT/QUIC listener. +## +## Value: Number +listener.quic.external.acceptors = 4 + +## Maximum number of concurrent MQTT/Webwocket/SSL connections. +## +## Value: Number +listener.quic.external.max_connections = 16 + +## Maximum MQTT/QUIC connections per second. +## +## See: listener.tcp.$name.max_conn_rate +## +## Value: Number +listener.quic.external.max_conn_rate = 1000 + +## Simulate the {active, N} option for the MQTT/QUIC connections. +## @todo +## Value: Number +## listener.quic.external.active_n = 100 + +## Zone of the external MQTT/QUIC listener belonged to. +## +## Value: String +listener.quic.external.zone = external + +## Path to the file containing the user's private PEM-encoded key. +## +## See: listener.ssl.$name.keyfile +## +## Value: File +listener.quic.external.keyfile = "{{ platform_etc_dir }}/certs/key.pem" + +## Path to a file containing the user certificate. +## +## See: listener.ssl.$name.certfile +## +## Value: File +listener.quic.external.certfile = "{{ platform_etc_dir }}/certs/cert.pem" + +## Path to the file containing PEM-encoded CA certificates. +## @todo +## See: listener.ssl.$name.cacert +## +## Value: File +## listener.quic.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem + +## String containing the user's password. Only used if the private keyfile +## is password-protected. +## @todo +## See: listener.ssl.$name.key_password +## +## Value: String +## listener.quic.external.key_password = yourpass + +## See: listener.ssl.$name.verify +## @todo +## Value: verify_peer | verify_none +## listener.quic.external.verify = verify_peer + +## See: listener.ssl.$name.fail_if_no_peer_cert +## @todo +## Value: false | true +## listener.quic.external.fail_if_no_peer_cert = true + +## See: listener.ssl.$name.ciphers +## @todo +## Value: Ciphers +listener.quic.external.ciphers = "TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256" + +## Ciphers for TLS PSK. +## @todo +## Note that 'listener.quic.external.ciphers' and 'listener.quic.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +## listener.quic.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA + +## See: listener.ssl.$name.honor_cipher_order +## @todo +## Value: on | off +## listener.quic.external.honor_cipher_order = on + +## The send timeout for the QUIC stream. +## @todo +## +## Value: Duration +# listener.quic.external.send_timeout = 15s + +## Close the QUIC connection if send timeout. +## @todo +## See: listener.tcp.$name.send_timeout_close +## +## Value: on | off +## listener.quic.external.send_timeout_close = on + +## The receive buffer for the QUIC connections. +## @todo +## See: listener.tcp.$name.recbuf +## +## Value: Bytes +## listener.quic.external.recbuf = 4KB + +## The TCP send buffer(os kernel) for the QUIC connections. +## @todo +## See: listener.tcp.$name.sndbuf +## +## Value: Bytes +## listener.quic.external.sndbuf = 4KB + +## The size of the user-level software buffer used by the driver. +## @todo +## See: listener.tcp.$name.buffer +## +## Value: Bytes +## listener.quic.external.buffer = 4KB + +## The idle timeout for external QUIC connections. +## @todo +## See: listener.quic.$name.idle_timeout +## +## Value: Duration +## listener.quic.external.idle_timeout = 60s + +## The max frame size for external QUIC connections. +## @todo +## Value: Number +## listener.quic.external.max_frame_size = 0 + ## CONFIG_SECTION_END=listeners ================================================ ## CONFIG_SECTION_BGN=modules ================================================== diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 22f31a345..83129bc1a 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -20,6 +20,7 @@ , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} + , {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.3"}}} ]}. {plugins, [rebar3_proper]}. diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index e909702ae..a6984370e 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -4,7 +4,7 @@ {vsn, "5.0.0"}, % strict semver, bump manually! {modules, []}, {registered, []}, - {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, + {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,quicer]}, {mod, {emqx_app,[]}}, {env, []}, {licenses, ["Apache-2.0"]}, diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 9c42e96a3..c700239ca 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -237,10 +237,10 @@ reboot() -> -ifdef(EMQX_ENTERPRISE). default_started_applications() -> - [gproc, esockd, ranch, cowboy, ekka, emqx]. + [gproc, esockd, ranch, cowboy, ekka, quicer, emqx]. -else. default_started_applications() -> - [gproc, esockd, ranch, cowboy, ekka, emqx, emqx_modules]. + [gproc, esockd, ranch, cowboy, ekka, quicer, emqx, emqx_modules]. -endif. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index ab91c02b4..8e3ee400b 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -416,6 +416,13 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> ok = emqx_metrics:inc('bytes.received', Oct), parse_incoming(Data, State); +handle_msg({quic, Data, _Sock, _, _, _}, State) -> + ?LOG(debug, "RECV ~0p", [Data]), + Oct = iolist_size(Data), + inc_counter(incoming_bytes, Oct), + ok = emqx_metrics:inc('bytes.received', Oct), + parse_incoming(Data, State); + handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer}) -> ok = emqx_misc:cancel_timer(IdleTimer), @@ -731,6 +738,9 @@ handle_info({sock_error, Reason}, State) -> end, handle_info({sock_closed, Reason}, close_socket(State)); +handle_info({quic, closed, _Channel, ReasonFlag}, State) -> + handle_info({sock_closed, ReasonFlag}, State); + handle_info(Info, State) -> with_channel(handle_info, [Info], State). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 1f3d1776b..c7d42e2e4 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -139,7 +139,27 @@ start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, - ranch_opts(Options), ws_opts(Options)). + ranch_opts(Options), ws_opts(Options)); + +%% Start MQTT/QUIC listener +start_listener(quic, ListenOn, Options) -> + %% @fixme unsure why we need reopen lib and reopen config. + quicer_nif:open_lib(), + quicer_nif:reg_open(), + SSLOpts = proplists:get_value(ssl_options, Options), + DefAcceptors = erlang:system_info(schedulers_online) * 8, + ListenOpts = [ {cert, proplists:get_value(certfile, SSLOpts)} + , {key, proplists:get_value(keyfile, SSLOpts)} + , {alpn, ["mqtt"]} + , {conn_acceptors, proplists:get_value(acceptors, Options, DefAcceptors)} + , {idle_timeout_ms, proplists:get_value(idle_timeout, Options, 60000)} + ], + ConnectionOpts = [ {conn_callback, emqx_quic_connection} + , {peer_unidi_stream_count, 1} + , {peer_bidi_stream_count, 10} + ], + StreamOpts = [], + quicer:start_listener('mqtt:quic', ListenOn, {ListenOpts, ConnectionOpts, StreamOpts}). replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)]. @@ -238,6 +258,8 @@ stop_listener(Proto, ListenOn, _Opts) when Proto == http; Proto == ws -> cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)); stop_listener(Proto, ListenOn, _Opts) when Proto == https; Proto == wss -> cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)); +stop_listener(quic, _ListenOn, _Opts) -> + quicer:stop_listener('mqtt:quic'); stop_listener(Proto, ListenOn, _Opts) -> esockd:close(Proto, ListenOn). diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl new file mode 100644 index 000000000..b83522c6e --- /dev/null +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_quic_connection). + +%% Callbacks +-export([ new_conn/2 + ]). + +new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) -> + new_conn(Conn, maps:to_list(COpts)); +new_conn(Conn, COpts) -> + emqx_connection:start_link(emqx_quic_stream, Conn, COpts). diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl new file mode 100644 index 000000000..e5cd4c3fc --- /dev/null +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -0,0 +1,87 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% MQTT/QUIC Stream +-module(emqx_quic_stream). + +%% emqx transport Callbacks +-export([ type/1 + , wait/1 + , getstat/2 + , fast_close/1 + , ensure_ok_or_exit/2 + , async_send/3 + , setopts/2 + , getopts/2 + , peername/1 + , sockname/1 + , peercert/1 + ]). + +wait(Conn) -> + quicer:accept_stream(Conn, []). + +type(_) -> + quic. + +peername(S) -> + quicer:peername(S). + +sockname(S) -> + quicer:sockname(S). + +peercert(_S) -> + nossl. + +getstat(Socket, Stats) -> + case quicer:getstat(Socket, Stats) of + {error, _} -> {error, closed}; + Res -> Res + end. + +setopts(_Socket, _Opts) -> + ok. + +getopts(_Socket, _Opts) -> + %% @todo + {ok, [{high_watermark, 0}, + {high_msgq_watermark, 0}, + {sndbuf, 0}, + {recbuf, 0}, + {buffer,80000}]}. + +fast_close(Stream) -> + quicer:close_stream(Stream), + %% Stream might be closed already. + ok. + +-spec(ensure_ok_or_exit(atom(), list(term())) -> term()). +ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) -> + case erlang:apply(?MODULE, Fun, Args) of + {error, Reason} when Reason =:= enotconn; Reason =:= closed -> + fast_close(Sock), + exit(normal); + {error, Reason} -> + fast_close(Sock), + exit({shutdown, Reason}); + Result -> Result + end. + +async_send(Stream, Data, Options) when is_list(Data) -> + async_send(Stream, iolist_to_binary(Data), Options); +async_send(Stream, Data, _Options) when is_binary(Data) -> + {ok, _Len} = quicer:send(Stream, Data), + ok. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 83e611ee0..11ffe664a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -306,6 +306,7 @@ fields("listener") -> , {"ssl", ref("ssl_listener")} , {"ws", ref("ws_listener")} , {"wss", ref("wss_listener")} + , {"quic", ref("quic_listener")} ]; fields("tcp_listener") -> @@ -320,6 +321,9 @@ fields("ws_listener") -> fields("wss_listener") -> [ {"$name", ref("wss_listener_settings")}]; +fields("quic_listener") -> + [ {"$name", ref("quic_listener_settings")}]; + fields("listener_settings") -> [ {"endpoint", t(union(ip_port(), integer()))} , {"acceptors", t(integer(), undefined, 8)} @@ -380,6 +384,32 @@ fields("wss_listener_settings") -> Settings = lists:ukeymerge(1, Ssl, fields("ws_listener_settings")), lists:keydelete("high_watermark", 1, Settings); +fields("quic_listener_settings") -> + Unsupported = [ "active_n" + , "access" + , "proxy_protocol" + , "proxy_protocol_timeout" + , "backlog" + , "send_timeout" + , "send_timeout_close" + , "recvbuf" + , "sndbuf" + , "buffer" + , "high_watermark" + , "tune_buffer" + , "nodelay" + , "reuseaddr" + ], + lists:foldl(fun(K, Acc) -> + lists:keydelete(K, 1, Acc) + end, + [ {"certfile", t(string(), undefined, undefined)} + , {"keyfile", t(string(), undefined, undefined)} + , {"ciphers", t(comma_separated_list(), undefined, "TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256")} + , {"idle_timeout", t(duration(), undefined, 60000)} + | fields("listener_settings")], + Unsupported); + fields("access") -> [ {"$id", t(string(), undefined, undefined)}]; @@ -792,7 +822,9 @@ tr_listeners(Conf) -> lists:flatten([TcpListeners("tcp", Name) || Name <- keys("listener.tcp", Conf)] ++ [TcpListeners("ws", Name) || Name <- keys("listener.ws", Conf)] ++ [SslListeners("ssl", Name) || Name <- keys("listener.ssl", Conf)] - ++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)]). + ++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)] + ++ [SslListeners("quic", Name) || Name <- keys("listener.quic", Conf)] + ). tr_modules(Conf) -> Subscriptions = fun() -> diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index 53f388dfa..41b9126b0 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -28,6 +28,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> NewConfig = generate_config(), application:ensure_all_started(esockd), + application:ensure_all_started(quicer), application:ensure_all_started(cowboy), lists:foreach(fun set_app_env/1, NewConfig), Config. diff --git a/rebar.config b/rebar.config index 4c086c312..205d892aa 100644 --- a/rebar.config +++ b/rebar.config @@ -54,6 +54,7 @@ , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}} + , {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.3"}}} ]}. {xref_ignores, diff --git a/rebar.config.erl b/rebar.config.erl index 14a8f93bc..f28b43856 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -243,6 +243,7 @@ relx_apps(ReleaseType) -> , compiler , runtime_tools , cuttlefish + , quicer , emqx , {mnesia, load} , {ekka, load}