diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 5e545ab2c..b3fba9146 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ [ meck , {bbmustache,"1.10.0"} , {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers.git", {branch,"hocon"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.2"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}} ]}, {extra_src_dirs, [{"test",[recursive]}]} ]} diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index 860be0a10..a56403dec 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -18,7 +18,7 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.7"}}}, +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.8"}}}, ExtraDeps = fun(C) -> {deps, Deps0} = lists:keyfind(deps, 1, C), diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 7d7f215c2..26eb346a4 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -135,7 +135,9 @@ , system_code_change/4 ]}). --spec(start_link(esockd:transport(), esockd:socket(), emqx_channel:opts()) +-spec(start_link(esockd:transport(), + esockd:socket() | {pid(), quicer:connection_handler()}, + emqx_channel:opts()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> Args = [self(), Transport, Socket, Options], diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 521366877..2d3357f37 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -178,24 +178,23 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> case [ A || {quicer, _, _} = A<-application:which_applications() ] of [_] -> - %% @fixme unsure why we need reopen lib and reopen config. - quicer_nif:open_lib(), - quicer_nif:reg_open(), DefAcceptors = erlang:system_info(schedulers_online) * 8, ListenOpts = [ {cert, maps:get(certfile, Opts)} , {key, maps:get(keyfile, Opts)} , {alpn, ["mqtt"]} - , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)} - , {idle_timeout_ms, emqx_config:get_zone_conf(zone(Opts), - [mqtt, idle_timeout])} + , {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])} + , {idle_timeout_ms, lists:max([ + emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3 + , timer:seconds(maps:get(idle_timeout, Opts))] + )} ], - ConnectionOpts = #{conn_callback => emqx_quic_connection + ConnectionOpts = #{ conn_callback => emqx_quic_connection , peer_unidi_stream_count => 1 , peer_bidi_stream_count => 10 , zone => zone(Opts) , listener => {quic, ListenerName} }, - StreamOpts = [], + StreamOpts = [{stream_callback, emqx_quic_stream}], quicer:start_listener(listener_id(quic, ListenerName), port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts}); [] -> diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index cd41e74a7..c23aec17b 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -17,8 +17,41 @@ -module(emqx_quic_connection). %% Callbacks --export([ new_conn/2 +-export([ init/1 + , new_conn/2 + , connected/2 + , shutdown/2 ]). -new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) -> - emqx_connection:start_link(emqx_quic_stream, Conn, COpts). +-type cb_state() :: map() | proplists:proplist(). + + +-spec init(cb_state()) -> cb_state(). +init(ConnOpts) when is_list(ConnOpts) -> + init(maps:from_list(ConnOpts)); +init(ConnOpts) when is_map(ConnOpts) -> + ConnOpts. + +-spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +new_conn(Conn, S) -> + process_flag(trap_exit, true), + {ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S), + receive + {Pid, stream_acceptor_ready} -> + ok = quicer:async_handshake(Conn), + {ok, S}; + {'EXIT', Pid, _Reason} -> + {error, stream_accept_error} + end. + +-spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +connected(Conn, #{slow_start := false} = S) -> + {ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S), + {ok, S}; +connected(_Conn, S) -> + {ok, S}. + +-spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +shutdown(Conn, S) -> + quicer:async_close_connection(Conn), + {ok, S}. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 236c11ad3..bba1876c4 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -31,8 +31,16 @@ , peercert/1 ]). -wait(Conn) -> - quicer:accept_stream(Conn, []). +wait({ConnOwner, Conn}) -> + {ok, Conn} = quicer:async_accept_stream(Conn, []), + ConnOwner ! {self(), stream_acceptor_ready}, + receive + %% from msquic + {quic, new_stream, Stream} -> + {ok, Stream}; + {'EXIT', ConnOwner, _Reason} -> + {error, enotconn} + end. type(_) -> quic. @@ -44,6 +52,7 @@ sockname(S) -> quicer:sockname(S). peercert(_S) -> + %% @todo but unsupported by msquic nossl. getstat(Socket, Stats) -> @@ -88,5 +97,8 @@ ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) -> async_send(Stream, Data, Options) when is_list(Data) -> async_send(Stream, iolist_to_binary(Data), Options); async_send(Stream, Data, _Options) when is_binary(Data) -> - {ok, _Len} = quicer:send(Stream, Data), - ok. + case quicer:send(Stream, Data) of + {ok, _Len} -> ok; + Other -> + Other + end. diff --git a/rebar.config b/rebar.config index 4a6b51cfa..65ecd8bf9 100644 --- a/rebar.config +++ b/rebar.config @@ -55,7 +55,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {replayq, "0.3.3"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.2"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {getopt, "1.0.2"} diff --git a/rebar.config.erl b/rebar.config.erl index 3a189dba0..be08dc68c 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -16,7 +16,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.7"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.8"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config),