Merge pull request #5002 from qzhuyan/dev/william/quic-support-centos7

Quic support part 1
This commit is contained in:
William Yang 2021-06-28 10:49:44 +02:00 committed by GitHub
commit 5571c54607
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 328 additions and 6 deletions

View File

@ -1846,7 +1846,7 @@ listener.ws.external.check_origins = "http://localhost:18083, http://127.0.0.1:1
##--------------------------------------------------------------------
## External WebSocket/SSL listener for MQTT Protocol
## listener.wss.$name is the IP address and port that the MQTT/WebSocket/SSL
## listener.wss.$name.endpoint is the IP address and port that the MQTT/WebSocket/SSL
## listener will bind.
##
## Value: IP:Port | Port
@ -2160,6 +2160,147 @@ listener.wss.external.allow_origin_absence = true
## Value: http://url eg. https://localhost:8084, https://127.0.0.1:8084
listener.wss.external.check_origins = "https://localhost:8084, https://127.0.0.1:8084"
##--------------------------------------------------------------------
## External QUIC listener for MQTT Protocol
## listener.quic.$name.endpoint is the IP address and port that the MQTT/QUIC
## listener will bind.
##
## Value: IP:Port | Port
##
## Examples: 14567, 127.0.0.1:14567, ::1:14567
listener.quic.external.endpoint = 14567
## The acceptor pool for external MQTT/QUIC listener.
##
## Value: Number
listener.quic.external.acceptors = 4
## Maximum number of concurrent MQTT/Webwocket/SSL connections.
##
## Value: Number
listener.quic.external.max_connections = 16
## Maximum MQTT/QUIC connections per second.
##
## See: listener.tcp.$name.max_conn_rate
##
## Value: Number
listener.quic.external.max_conn_rate = 1000
## Simulate the {active, N} option for the MQTT/QUIC connections.
## @todo
## Value: Number
## listener.quic.external.active_n = 100
## Zone of the external MQTT/QUIC listener belonged to.
##
## Value: String
listener.quic.external.zone = external
## Path to the file containing the user's private PEM-encoded key.
##
## See: listener.ssl.$name.keyfile
##
## Value: File
listener.quic.external.keyfile = "{{ platform_etc_dir }}/certs/key.pem"
## Path to a file containing the user certificate.
##
## See: listener.ssl.$name.certfile
##
## Value: File
listener.quic.external.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
## Path to the file containing PEM-encoded CA certificates.
## @todo
## See: listener.ssl.$name.cacert
##
## Value: File
## listener.quic.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## String containing the user's password. Only used if the private keyfile
## is password-protected.
## @todo
## See: listener.ssl.$name.key_password
##
## Value: String
## listener.quic.external.key_password = yourpass
## See: listener.ssl.$name.verify
## @todo
## Value: verify_peer | verify_none
## listener.quic.external.verify = verify_peer
## See: listener.ssl.$name.fail_if_no_peer_cert
## @todo
## Value: false | true
## listener.quic.external.fail_if_no_peer_cert = true
## See: listener.ssl.$name.ciphers
## @todo
## Value: Ciphers
listener.quic.external.ciphers = "TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256"
## Ciphers for TLS PSK.
## @todo
## Note that 'listener.quic.external.ciphers' and 'listener.quic.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
## listener.quic.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## See: listener.ssl.$name.honor_cipher_order
## @todo
## Value: on | off
## listener.quic.external.honor_cipher_order = on
## The send timeout for the QUIC stream.
## @todo
##
## Value: Duration
# listener.quic.external.send_timeout = 15s
## Close the QUIC connection if send timeout.
## @todo
## See: listener.tcp.$name.send_timeout_close
##
## Value: on | off
## listener.quic.external.send_timeout_close = on
## The receive buffer for the QUIC connections.
## @todo
## See: listener.tcp.$name.recbuf
##
## Value: Bytes
## listener.quic.external.recbuf = 4KB
## The TCP send buffer(os kernel) for the QUIC connections.
## @todo
## See: listener.tcp.$name.sndbuf
##
## Value: Bytes
## listener.quic.external.sndbuf = 4KB
## The size of the user-level software buffer used by the driver.
## @todo
## See: listener.tcp.$name.buffer
##
## Value: Bytes
## listener.quic.external.buffer = 4KB
## The idle timeout for external QUIC connections.
## @todo
## See: listener.quic.$name.idle_timeout
##
## Value: Duration
## listener.quic.external.idle_timeout = 60s
## The max frame size for external QUIC connections.
## @todo
## Value: Number
## listener.quic.external.max_frame_size = 0
## CONFIG_SECTION_END=listeners ================================================
## CONFIG_SECTION_BGN=modules ==================================================

View File

@ -20,6 +20,7 @@
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
, {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.3"}}}
]}.
{plugins, [rebar3_proper]}.

View File

@ -4,7 +4,7 @@
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,quicer]},
{mod, {emqx_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -237,10 +237,10 @@ reboot() ->
-ifdef(EMQX_ENTERPRISE).
default_started_applications() ->
[gproc, esockd, ranch, cowboy, ekka, emqx].
[gproc, esockd, ranch, cowboy, ekka, quicer, emqx].
-else.
default_started_applications() ->
[gproc, esockd, ranch, cowboy, ekka, emqx, emqx_modules].
[gproc, esockd, ranch, cowboy, ekka, quicer, emqx, emqx_modules].
-endif.
%%--------------------------------------------------------------------

View File

@ -416,6 +416,13 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
ok = emqx_metrics:inc('bytes.received', Oct),
parse_incoming(Data, State);
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
parse_incoming(Data, State);
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
State = #state{idle_timer = IdleTimer}) ->
ok = emqx_misc:cancel_timer(IdleTimer),
@ -731,6 +738,9 @@ handle_info({sock_error, Reason}, State) ->
end,
handle_info({sock_closed, Reason}, close_socket(State));
handle_info({quic, closed, _Channel, ReasonFlag}, State) ->
handle_info({sock_closed, ReasonFlag}, State);
handle_info(Info, State) ->
with_channel(handle_info, [Info], State).

View File

@ -139,7 +139,27 @@ start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
%% Start MQTT/WSS listener
start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn,
ranch_opts(Options), ws_opts(Options)).
ranch_opts(Options), ws_opts(Options));
%% Start MQTT/QUIC listener
start_listener(quic, ListenOn, Options) ->
%% @fixme unsure why we need reopen lib and reopen config.
quicer_nif:open_lib(),
quicer_nif:reg_open(),
SSLOpts = proplists:get_value(ssl_options, Options),
DefAcceptors = erlang:system_info(schedulers_online) * 8,
ListenOpts = [ {cert, proplists:get_value(certfile, SSLOpts)}
, {key, proplists:get_value(keyfile, SSLOpts)}
, {alpn, ["mqtt"]}
, {conn_acceptors, proplists:get_value(acceptors, Options, DefAcceptors)}
, {idle_timeout_ms, proplists:get_value(idle_timeout, Options, 60000)}
],
ConnectionOpts = [ {conn_callback, emqx_quic_connection}
, {peer_unidi_stream_count, 1}
, {peer_bidi_stream_count, 10}
],
StreamOpts = [],
quicer:start_listener('mqtt:quic', ListenOn, {ListenOpts, ConnectionOpts, StreamOpts}).
replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)].
@ -238,6 +258,8 @@ stop_listener(Proto, ListenOn, _Opts) when Proto == http; Proto == ws ->
cowboy:stop_listener(ws_name('mqtt:ws', ListenOn));
stop_listener(Proto, ListenOn, _Opts) when Proto == https; Proto == wss ->
cowboy:stop_listener(ws_name('mqtt:wss', ListenOn));
stop_listener(quic, _ListenOn, _Opts) ->
quicer:stop_listener('mqtt:quic');
stop_listener(Proto, ListenOn, _Opts) ->
esockd:close(Proto, ListenOn).

View File

@ -0,0 +1,26 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_quic_connection).
%% Callbacks
-export([ new_conn/2
]).
new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) ->
new_conn(Conn, maps:to_list(COpts));
new_conn(Conn, COpts) ->
emqx_connection:start_link(emqx_quic_stream, Conn, COpts).

View File

@ -0,0 +1,87 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% MQTT/QUIC Stream
-module(emqx_quic_stream).
%% emqx transport Callbacks
-export([ type/1
, wait/1
, getstat/2
, fast_close/1
, ensure_ok_or_exit/2
, async_send/3
, setopts/2
, getopts/2
, peername/1
, sockname/1
, peercert/1
]).
wait(Conn) ->
quicer:accept_stream(Conn, []).
type(_) ->
quic.
peername(S) ->
quicer:peername(S).
sockname(S) ->
quicer:sockname(S).
peercert(_S) ->
nossl.
getstat(Socket, Stats) ->
case quicer:getstat(Socket, Stats) of
{error, _} -> {error, closed};
Res -> Res
end.
setopts(_Socket, _Opts) ->
ok.
getopts(_Socket, _Opts) ->
%% @todo
{ok, [{high_watermark, 0},
{high_msgq_watermark, 0},
{sndbuf, 0},
{recbuf, 0},
{buffer,80000}]}.
fast_close(Stream) ->
quicer:close_stream(Stream),
%% Stream might be closed already.
ok.
-spec(ensure_ok_or_exit(atom(), list(term())) -> term()).
ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) ->
case erlang:apply(?MODULE, Fun, Args) of
{error, Reason} when Reason =:= enotconn; Reason =:= closed ->
fast_close(Sock),
exit(normal);
{error, Reason} ->
fast_close(Sock),
exit({shutdown, Reason});
Result -> Result
end.
async_send(Stream, Data, Options) when is_list(Data) ->
async_send(Stream, iolist_to_binary(Data), Options);
async_send(Stream, Data, _Options) when is_binary(Data) ->
{ok, _Len} = quicer:send(Stream, Data),
ok.

View File

@ -306,6 +306,7 @@ fields("listener") ->
, {"ssl", ref("ssl_listener")}
, {"ws", ref("ws_listener")}
, {"wss", ref("wss_listener")}
, {"quic", ref("quic_listener")}
];
fields("tcp_listener") ->
@ -320,6 +321,9 @@ fields("ws_listener") ->
fields("wss_listener") ->
[ {"$name", ref("wss_listener_settings")}];
fields("quic_listener") ->
[ {"$name", ref("quic_listener_settings")}];
fields("listener_settings") ->
[ {"endpoint", t(union(ip_port(), integer()))}
, {"acceptors", t(integer(), undefined, 8)}
@ -380,6 +384,32 @@ fields("wss_listener_settings") ->
Settings = lists:ukeymerge(1, Ssl, fields("ws_listener_settings")),
lists:keydelete("high_watermark", 1, Settings);
fields("quic_listener_settings") ->
Unsupported = [ "active_n"
, "access"
, "proxy_protocol"
, "proxy_protocol_timeout"
, "backlog"
, "send_timeout"
, "send_timeout_close"
, "recvbuf"
, "sndbuf"
, "buffer"
, "high_watermark"
, "tune_buffer"
, "nodelay"
, "reuseaddr"
],
lists:foldl(fun(K, Acc) ->
lists:keydelete(K, 1, Acc)
end,
[ {"certfile", t(string(), undefined, undefined)}
, {"keyfile", t(string(), undefined, undefined)}
, {"ciphers", t(comma_separated_list(), undefined, "TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256")}
, {"idle_timeout", t(duration(), undefined, 60000)}
| fields("listener_settings")],
Unsupported);
fields("access") ->
[ {"$id", t(string(), undefined, undefined)}];
@ -792,7 +822,9 @@ tr_listeners(Conf) ->
lists:flatten([TcpListeners("tcp", Name) || Name <- keys("listener.tcp", Conf)]
++ [TcpListeners("ws", Name) || Name <- keys("listener.ws", Conf)]
++ [SslListeners("ssl", Name) || Name <- keys("listener.ssl", Conf)]
++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)]).
++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)]
++ [SslListeners("quic", Name) || Name <- keys("listener.quic", Conf)]
).
tr_modules(Conf) ->
Subscriptions = fun() ->

View File

@ -28,6 +28,7 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
NewConfig = generate_config(),
application:ensure_all_started(esockd),
application:ensure_all_started(quicer),
application:ensure_all_started(cowboy),
lists:foreach(fun set_app_env/1, NewConfig),
Config.

View File

@ -54,6 +54,7 @@
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}}
, {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.3"}}}
]}.
{xref_ignores,

View File

@ -243,6 +243,7 @@ relx_apps(ReleaseType) ->
, compiler
, runtime_tools
, cuttlefish
, quicer
, emqx
, {mnesia, load}
, {ekka, load}