From cf04e5cce364756ae226bbb7f7f6428061f5d0cb Mon Sep 17 00:00:00 2001 From: William Yang Date: Sat, 7 Aug 2021 11:46:10 +0200 Subject: [PATCH 1/7] feat(quic): adapt to new quicer --- apps/emqx/src/emqx_listeners.erl | 7 ++--- apps/emqx/src/emqx_quic_connection.erl | 41 ++++++++++++++++++++++++-- apps/emqx/src/emqx_quic_stream.erl | 7 +++-- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 521366877..411caeea9 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -178,9 +178,6 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> case [ A || {quicer, _, _} = A<-application:which_applications() ] of [_] -> - %% @fixme unsure why we need reopen lib and reopen config. - quicer_nif:open_lib(), - quicer_nif:reg_open(), DefAcceptors = erlang:system_info(schedulers_online) * 8, ListenOpts = [ {cert, maps:get(certfile, Opts)} , {key, maps:get(keyfile, Opts)} @@ -189,13 +186,13 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> , {idle_timeout_ms, emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout])} ], - ConnectionOpts = #{conn_callback => emqx_quic_connection + ConnectionOpts = #{ conn_callback => emqx_quic_connection , peer_unidi_stream_count => 1 , peer_bidi_stream_count => 10 , zone => zone(Opts) , listener => {quic, ListenerName} }, - StreamOpts = [], + StreamOpts = [{stream_callback, emqx_quic_stream}], quicer:start_listener(listener_id(quic, ListenerName), port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts}); [] -> diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index cd41e74a7..7ac130278 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -17,8 +17,43 @@ -module(emqx_quic_connection). %% Callbacks --export([ new_conn/2 +-export([ init/1 + , new_conn/2 + , connected/2 + , shutdown/2 ]). -new_conn(Conn, {_L, COpts, _S}) when is_map(COpts) -> - emqx_connection:start_link(emqx_quic_stream, Conn, COpts). +-type cb_state() :: map() | proplists:proplist(). + + +-spec init(cb_state()) -> cb_state(). +init(ConnOpts) when is_list(ConnOpts) -> + init(maps:from_list(ConnOpts)); +init(ConnOpts) when is_map(ConnOpts) -> + ConnOpts. + +-spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +new_conn(Conn, S) -> + case emqx_connection:start_link(emqx_quic_stream, Conn, S) of + {ok, _Pid} -> + ok = quicer:async_handshake(Conn), + {ok, S}; + Other -> + {error, Other} + end. + +-spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +connected(Conn, #{slow_start := false} = S) -> + case emqx_connection:start_link(emqx_quic_stream, Conn, S) of + {ok, _Pid} -> + {ok, S}; + Other -> + {error, Other} + end; +connected(_Conn, S) -> + {ok, S}. + +-spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. +shutdown(Conn, S) -> + quicer:async_close_connection(Conn), + {ok, S}. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 236c11ad3..76a365340 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -88,5 +88,8 @@ ensure_ok_or_exit(Fun, Args = [Sock|_]) when is_atom(Fun), is_list(Args) -> async_send(Stream, Data, Options) when is_list(Data) -> async_send(Stream, iolist_to_binary(Data), Options); async_send(Stream, Data, _Options) when is_binary(Data) -> - {ok, _Len} = quicer:send(Stream, Data), - ok. + case quicer:send(Stream, Data) of + {ok, _Len} -> ok; + Other -> + Other + end. From abd58bb23553585a139dccac6275e0efd4385a29 Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 25 Aug 2021 09:22:18 +0200 Subject: [PATCH 2/7] feat(quic): idle_timeout transport idle timeout should be at least 3x mqtt idle_timeout --- apps/emqx/src/emqx_listeners.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 411caeea9..3c132f335 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -183,8 +183,10 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> , {key, maps:get(keyfile, Opts)} , {alpn, ["mqtt"]} , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)} - , {idle_timeout_ms, emqx_config:get_zone_conf(zone(Opts), - [mqtt, idle_timeout])} + , {idle_timeout_ms, lists:max([ + emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3 + , timer:seconds(maps:get(idle_timeout, Opts))] + )} ], ConnectionOpts = #{ conn_callback => emqx_quic_connection , peer_unidi_stream_count => 1 From 2ef2acc8506f517ca7d1062f432078601fd80c13 Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 25 Aug 2021 09:27:19 +0200 Subject: [PATCH 3/7] feat(quic): adapt to quicer 0.0.8 --- apps/emqx/src/emqx_quic_connection.erl | 14 ++++++++++---- apps/emqx/src/emqx_quic_stream.erl | 13 +++++++++++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 7ac130278..2fe911001 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -34,10 +34,16 @@ init(ConnOpts) when is_map(ConnOpts) -> -spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. new_conn(Conn, S) -> - case emqx_connection:start_link(emqx_quic_stream, Conn, S) of - {ok, _Pid} -> - ok = quicer:async_handshake(Conn), - {ok, S}; + process_flag(trap_exit, true), + case emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S) of + {ok, Pid} -> + receive + {Pid, stream_acceptor_ready} -> + ok = quicer:async_handshake(Conn), + {ok, S}; + {'EXIT', Pid, _Reason} -> + {error, stream_accept_error} + end; Other -> {error, Other} end. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 76a365340..bba1876c4 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -31,8 +31,16 @@ , peercert/1 ]). -wait(Conn) -> - quicer:accept_stream(Conn, []). +wait({ConnOwner, Conn}) -> + {ok, Conn} = quicer:async_accept_stream(Conn, []), + ConnOwner ! {self(), stream_acceptor_ready}, + receive + %% from msquic + {quic, new_stream, Stream} -> + {ok, Stream}; + {'EXIT', ConnOwner, _Reason} -> + {error, enotconn} + end. type(_) -> quic. @@ -44,6 +52,7 @@ sockname(S) -> quicer:sockname(S). peercert(_S) -> + %% @todo but unsupported by msquic nossl. getstat(Socket, Stats) -> From eb7625a5956d4310c12f07e01bf585f69869e540 Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 25 Aug 2021 17:04:04 +0200 Subject: [PATCH 4/7] fix: quic listener spec --- apps/emqx/src/emqx_connection.erl | 4 +++- apps/emqx/src/emqx_quic_connection.erl | 8 ++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 7d7f215c2..26eb346a4 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -135,7 +135,9 @@ , system_code_change/4 ]}). --spec(start_link(esockd:transport(), esockd:socket(), emqx_channel:opts()) +-spec(start_link(esockd:transport(), + esockd:socket() | {pid(), quicer:connection_handler()}, + emqx_channel:opts()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> Args = [self(), Transport, Socket, Options], diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 2fe911001..3de607f75 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -50,12 +50,8 @@ new_conn(Conn, S) -> -spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. connected(Conn, #{slow_start := false} = S) -> - case emqx_connection:start_link(emqx_quic_stream, Conn, S) of - {ok, _Pid} -> - {ok, S}; - Other -> - {error, Other} - end; + {ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S), + {ok, S}; connected(_Conn, S) -> {ok, S}. From 1b5acdb9c59fecd345c7124b78d62f5c7791b38e Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 25 Aug 2021 17:04:22 +0200 Subject: [PATCH 5/7] feat(quic): at least 8 x number of cores acceptors --- apps/emqx/src/emqx_listeners.erl | 2 +- apps/emqx/src/emqx_quic_connection.erl | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 3c132f335..2d3357f37 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -182,7 +182,7 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> ListenOpts = [ {cert, maps:get(certfile, Opts)} , {key, maps:get(keyfile, Opts)} , {alpn, ["mqtt"]} - , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)} + , {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])} , {idle_timeout_ms, lists:max([ emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3 , timer:seconds(maps:get(idle_timeout, Opts))] diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 3de607f75..c23aec17b 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -35,17 +35,13 @@ init(ConnOpts) when is_map(ConnOpts) -> -spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. new_conn(Conn, S) -> process_flag(trap_exit, true), - case emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S) of - {ok, Pid} -> - receive - {Pid, stream_acceptor_ready} -> - ok = quicer:async_handshake(Conn), - {ok, S}; - {'EXIT', Pid, _Reason} -> - {error, stream_accept_error} - end; - Other -> - {error, Other} + {ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S), + receive + {Pid, stream_acceptor_ready} -> + ok = quicer:async_handshake(Conn), + {ok, S}; + {'EXIT', Pid, _Reason} -> + {error, stream_accept_error} end. -spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. From 6186fa29a093fa19a7d683438cb54aa916d69be5 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 26 Aug 2021 15:15:00 +0200 Subject: [PATCH 6/7] feat(quic): bump to emqtt 1.4.3 --- apps/emqx/rebar.config | 2 +- rebar.config | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index c2229ce0f..b5aef703a 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ [ meck , {bbmustache,"1.10.0"} , {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers.git", {branch,"hocon"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.2"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}} ]}, {extra_src_dirs, [{"test",[recursive]}]} ]} diff --git a/rebar.config b/rebar.config index 75d78b64e..dc883f1a0 100644 --- a/rebar.config +++ b/rebar.config @@ -55,7 +55,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {replayq, "0.3.3"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.2"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.2"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {getopt, "1.0.2"} From dbc971f26463dd8455919297480f9fd21a62c60c Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 26 Aug 2021 16:45:03 +0200 Subject: [PATCH 7/7] feat(quic): bump quicer to 0.0.8 --- apps/emqx/rebar.config.script | 2 +- rebar.config.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index 860be0a10..a56403dec 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -18,7 +18,7 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.7"}}}, +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {branch, "0.0.8"}}}, ExtraDeps = fun(C) -> {deps, Deps0} = lists:keyfind(deps, 1, C), diff --git a/rebar.config.erl b/rebar.config.erl index 3a189dba0..be08dc68c 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -16,7 +16,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {branch, "0.6.0"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.7"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.8"}}}. deps(Config) -> {deps, OldDeps} = lists:keyfind(deps, 1, Config),