From cf04e5cce364756ae226bbb7f7f6428061f5d0cb Mon Sep 17 00:00:00 2001 From: William Yang Date: Sat, 7 Aug 2021 11:46:10 +0200 Subject: [PATCH] feat(quic): adapt to new quicer --- apps/emqx/src/emqx_listeners.erl | 7 ++--- apps/emqx/src/emqx_quic_connection.erl | 41 ++++++++++++++++++++++++-- apps/emqx/src/emqx_quic_stream.erl | 7 +++-- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 521366877..411caeea9 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -178,9 +178,6 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> case [ A || {quicer, _, _} = A<-application:which_applications() ] of [_] -> - %% @fixme unsure why we need reopen lib and reopen config. - quicer_nif:open_lib(), - quicer_nif:reg_open(), DefAcceptors = erlang:system_info(schedulers_online) * 8, ListenOpts = [ {cert, maps:get(certfile, Opts)} , {key, maps:get(keyfile, Opts)} @@ -189,13 +186,13 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> , {idle_timeout_ms, emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout])} ], - ConnectionOpts = #{conn_callback => emqx_quic_connection + ConnectionOpts = #{ conn_callback => emqx_quic_connection , peer_unidi_stream_count => 1 , peer_bidi_stream_count => 10 , zone => zone(Opts) , listener => {quic, ListenerName} }, - StreamOpts = [], + StreamOpts = [{stream_callback, emqx_quic_stream}], quicer:start_listener(listener_id(quic, ListenerName), port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts}); [] -> diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index cd41e74a7..7ac130278 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -17,8 +17,43 @@ -module(emqx_quic_connection). %% Callbacks --export([ new_conn/2 +-export([ init/1 + , new_conn/2 + , connected/2 + , shutdown/2 ]). -new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) -> - emqx_connection:start_link(emqx_quic_stream, Conn, COpts). +-type cb_state() :: map() | proplists:proplist(). + + +-spec init(cb_state()) -> cb_state(). +init(ConnOpts) when is_list(ConnOpts) -> + init(maps:from_list(ConnOpts)); +init(ConnOpts) when is_map(ConnOpts) -> + ConnOpts. + +-spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +new_conn(Conn, S) -> + case emqx_connection:start_link(emqx_quic_stream, Conn, S) of + {ok, _Pid} -> + ok = quicer:async_handshake(Conn), + {ok, S}; + Other -> + {error, Other} + end. + +-spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +connected(Conn, #{slow_start := false} = S) -> + case emqx_connection:start_link(emqx_quic_stream, Conn, S) of + {ok, _Pid} -> + {ok, S}; + Other -> + {error, Other} + end; +connected(_Conn, S) -> + {ok, S}. + +-spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +shutdown(Conn, S) -> + quicer:async_close_connection(Conn), + {ok, S}. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 236c11ad3..76a365340 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -88,5 +88,8 @@ ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) -> async_send(Stream, Data, Options) when is_list(Data) -> async_send(Stream, iolist_to_binary(Data), Options); async_send(Stream, Data, _Options) when is_binary(Data) -> - {ok, _Len} = quicer:send(Stream, Data), - ok. + case quicer:send(Stream, Data) of + {ok, _Len} -> ok; + Other -> + Other + end.