feat(quic): reuse emqx_connection module for quic.

This commit is contained in:
William Yang 2021-03-30 23:39:09 +02:00
parent 899ba579fc
commit 70f22d2c1b
4 changed files with 118 additions and 2 deletions

View File

@ -416,6 +416,13 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
ok = emqx_metrics:inc('bytes.received', Oct),
parse_incoming(Data, State);
handle_msg({quic, Data, _Sock, _, _, _}, State) ->
?LOG(debug, "RECV ~0p", [Data]),
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct),
parse_incoming(Data, State);
handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
State = #state{idle_timer = IdleTimer}) ->
ok = emqx_misc:cancel_timer(IdleTimer),

View File

@ -149,11 +149,11 @@ start_listener(quic, ListenOn, Options) ->
, {alpn, ["mqtt"]}
, {conn_acceptors, 32}
],
ConnectionOpts = [ {conn_callback, quicer_server_conn_callback}
ConnectionOpts = [ {conn_callback, emqx_quic_connection}
, {idle_timeout_ms, 5000}
, {peer_unidi_stream_count, 1}
, {peer_bidi_stream_count, 10}],
StreamOpts = [{stream_callback, quicer_echo_server_stream_callback}],
StreamOpts = [],
quicer:start_listener('mqtt:quic', ListenOn, {ListenOpts, ConnectionOpts, StreamOpts}).
replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)].

View File

@ -0,0 +1,26 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_quic_connection).
%% Callbacks
-export([ new_conn/2
]).
new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) ->
new_conn(Conn, maps:to_list(COpts));
new_conn(Conn, COpts) ->
emqx_connection:start_link(emqx_quic_stream, Conn, COpts).

View File

@ -0,0 +1,83 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% MQTT/QUIC Stream
-module(emqx_quic_stream).
%% emqx transport Callbacks
-export([ type/1
, wait/1
, getstat/2
, fast_close/1
, ensure_ok_or_exit/2
, async_send/3
, setopts/2
, getopts/2
, peername/1
, sockname/1
, peercert/1
]).
wait(Conn) ->
quicer:accept_stream(Conn, []).
type(_) ->
quic.
peername(S) ->
quicer:peername(S).
sockname(S) ->
quicer:sockname(S).
peercert(_S) ->
nossl.
getstat(Socket, Stats) ->
Res = quicer:getstats(Socket, Stats),
{ok, lists:keyreplace(send_pend, 1, Res, {send_pend, 0})}.
setopts(_Socket, _Opts) ->
ok.
getopts(_Socket, _Opts) ->
%% todo
{ ok, [{high_watermark, 0},
{high_msgq_watermark, 0},
{sndbuf, 0},
{recbuf, 0},
{buffer,80000}]}.
fast_close(Stream) ->
quicer:close_stream(Stream).
-spec(ensure_ok_or_exit(atom(), list(term())) -> term()).
ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) ->
case erlang:apply(?MODULE, Fun, Args) of
{error, Reason} when Reason =:= enotconn; Reason =:= closed ->
fast_close(Sock),
exit(normal);
{error, Reason} ->
fast_close(Sock),
exit({shutdown, Reason});
Result -> Result
end.
async_send(Stream, Data, Options) when is_list(Data) ->
async_send(Stream, iolist_to_binary(Data), Options);
async_send(Stream, Data, _Options) when is_binary(Data) ->
{ok, _Len} = quicer:send(Stream, Data),
ok.