From 70f22d2c1b980e885f96354fea35dae57fbd75ee Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 30 Mar 2021 23:39:09 +0200 Subject: [PATCH] feat(quic): reuse emqx_connection module for quic. --- apps/emqx/src/emqx_connection.erl | 7 +++ apps/emqx/src/emqx_listeners.erl | 4 +- apps/emqx/src/emqx_quic_connection.erl | 26 ++++++++ apps/emqx/src/emqx_quic_stream.erl | 83 ++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 apps/emqx/src/emqx_quic_connection.erl create mode 100644 apps/emqx/src/emqx_quic_stream.erl diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index ab91c02b4..6900e4f1e 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -416,6 +416,13 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> ok = emqx_metrics:inc('bytes.received', Oct), parse_incoming(Data, State); +handle_msg({quic, Data, _Sock, _, _, _}, State) -> + ?LOG(debug, "RECV ~0p", [Data]), + Oct = iolist_size(Data), + inc_counter(incoming_bytes, Oct), + ok = emqx_metrics:inc('bytes.received', Oct), + parse_incoming(Data, State); + handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer}) -> ok = emqx_misc:cancel_timer(IdleTimer), diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index d97fe32ed..b3d6bf319 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -149,11 +149,11 @@ start_listener(quic, ListenOn, Options) -> , {alpn, ["mqtt"]} , {conn_acceptors, 32} ], - ConnectionOpts = [ {conn_callback, quicer_server_conn_callback} + ConnectionOpts = [ {conn_callback, emqx_quic_connection} , {idle_timeout_ms, 5000} , {peer_unidi_stream_count, 1} , {peer_bidi_stream_count, 10}], - StreamOpts = [{stream_callback, quicer_echo_server_stream_callback}], + StreamOpts = [], quicer:start_listener('mqtt:quic', ListenOn, {ListenOpts, ConnectionOpts, StreamOpts}). replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)]. diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl new file mode 100644 index 000000000..b83522c6e --- /dev/null +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -0,0 +1,26 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_quic_connection). + +%% Callbacks +-export([ new_conn/2 + ]). + +new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) -> + new_conn(Conn, maps:to_list(COpts)); +new_conn(Conn, COpts) -> + emqx_connection:start_link(emqx_quic_stream, Conn, COpts). diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl new file mode 100644 index 000000000..e12d95f30 --- /dev/null +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -0,0 +1,83 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% MQTT/QUIC Stream +-module(emqx_quic_stream). + +%% emqx transport Callbacks +-export([ type/1 + , wait/1 + , getstat/2 + , fast_close/1 + , ensure_ok_or_exit/2 + , async_send/3 + , setopts/2 + , getopts/2 + , peername/1 + , sockname/1 + , peercert/1 + ]). + +wait(Conn) -> + quicer:accept_stream(Conn, []). + +type(_) -> + quic. + +peername(S) -> + quicer:peername(S). + +sockname(S) -> + quicer:sockname(S). + +peercert(_S) -> + nossl. + +getstat(Socket, Stats) -> + Res = quicer:getstats(Socket, Stats), + {ok, lists:keyreplace(send_pend, 1, Res, {send_pend, 0})}. + +setopts(_Socket, _Opts) -> + ok. + +getopts(_Socket, _Opts) -> + %% todo + { ok, [{high_watermark, 0}, + {high_msgq_watermark, 0}, + {sndbuf, 0}, + {recbuf, 0}, + {buffer,80000}]}. + +fast_close(Stream) -> + quicer:close_stream(Stream). + +-spec(ensure_ok_or_exit(atom(), list(term())) -> term()). +ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) -> + case erlang:apply(?MODULE, Fun, Args) of + {error, Reason} when Reason =:= enotconn; Reason =:= closed -> + fast_close(Sock), + exit(normal); + {error, Reason} -> + fast_close(Sock), + exit({shutdown, Reason}); + Result -> Result + end. + +async_send(Stream, Data, Options) when is_list(Data) -> + async_send(Stream, iolist_to_binary(Data), Options); +async_send(Stream, Data, _Options) when is_binary(Data) -> + {ok, _Len} = quicer:send(Stream, Data), + ok.