diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 5b783f2fe..6c88b87cf 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -525,11 +525,10 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), when_bytes_in(Oct, Data, State); -handle_msg({quic, Data, _Sock, _, _, _}, State) -> - Oct = iolist_size(Data), - inc_counter(incoming_bytes, Oct), - ok = emqx_metrics:inc('bytes.received', Oct), - when_bytes_in(Oct, Data, State); +handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) -> + inc_counter(incoming_bytes, Len), + ok = emqx_metrics:inc('bytes.received', Len), + when_bytes_in(Len, Data, State); handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> case queue:peek(Cache) of empty -> @@ -893,12 +892,12 @@ handle_info({sock_error, Reason}, State) -> false -> ok end, handle_info({sock_closed, Reason}, close_socket(State)); -handle_info({quic, peer_send_shutdown, _Stream}, State) -> - handle_info({sock_closed, force}, close_socket(State)); -handle_info({quic, closed, _Channel, ReasonFlag}, State) -> - handle_info({sock_closed, ReasonFlag}, State); -handle_info({quic, closed, _Stream}, State) -> - handle_info({sock_closed, force}, State); +%% handle_info({quic, peer_send_shutdown, _Stream}, State) -> +%% handle_info({sock_closed, force}, close_socket(State)); +%% handle_info({quic, closed, _Channel, ReasonFlag}, State) -> +%% handle_info({sock_closed, ReasonFlag}, State); +%% handle_info({quic, closed, _Stream}, State) -> +%% handle_info({sock_closed, force}, State); handle_info(Info, State) -> with_channel(handle_info, [Info], State). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 003c8785e..45f3b2cfd 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -375,7 +375,8 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)}, {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)}, {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)}, - {server_resumption_level, 2} + {server_resumption_level, 2}, + {verify, none} ], ConnectionOpts = #{ conn_callback => emqx_quic_connection, diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 9a2589a3a..6da9ec9a8 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -22,24 +22,42 @@ -define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0). -endif. -%% Callbacks +-behavior(quicer_connection). + -export([ init/1, - new_conn/2, - connected/2, - shutdown/2 + new_conn/3, + connected/3, + transport_shutdown/3, + shutdown/3, + closed/3, + local_address_changed/3, + peer_address_changed/3, + streams_available/3, + peer_needs_streams/3, + resumed/3, + nst_received/3, + new_stream/3 ]). -type cb_state() :: map() | proplists:proplist(). +-type cb_ret() :: ok. --spec init(cb_state()) -> cb_state(). init(ConnOpts) when is_list(ConnOpts) -> init(maps:from_list(ConnOpts)); +init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> + init(S#{stream_opts := maps:from_list(SOpts)}); init(ConnOpts) when is_map(ConnOpts) -> - ConnOpts. + {ok, ConnOpts}. --spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. -new_conn(Conn, #{zone := Zone} = S) -> +closed(_Conn, #{is_peer_acked := true}, S) -> + {stop, normal, S}; +closed(_Conn, #{is_peer_acked := false}, S) -> + {stop, abnorml, S}. + +-spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) -> + {ok, cb_state()} | {error, any()}. +new_conn(Conn, #{version := _Vsn}, #{zone := Zone} = S) -> process_flag(trap_exit, true), case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of false -> @@ -47,7 +65,7 @@ new_conn(Conn, #{zone := Zone} = S) -> receive {Pid, stream_acceptor_ready} -> ok = quicer:async_handshake(Conn), - {ok, S}; + {ok, S#{conn => Conn}}; {'EXIT', Pid, _Reason} -> {error, stream_accept_error} end; @@ -56,18 +74,76 @@ new_conn(Conn, #{zone := Zone} = S) -> {error, overloaded} end. --spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. -connected(Conn, #{slow_start := false} = S) -> +-spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) -> + {ok, cb_state()} | {error, any()}. +connected(Conn, _Props, #{slow_start := false} = S) -> {ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S), {ok, S}; -connected(_Conn, S) -> +connected(_Conn, _Props, S) -> {ok, S}. --spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. -shutdown(Conn, S) -> +-spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret(). +resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when + is_function(ResumeFun) +-> + ResumeFun(Conn, Data, S); +resumed(_Conn, _Data, S) -> + {ok, S}. + +-spec nst_received(quicer:connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret(). +nst_received(_Conn, _Data, S) -> + {stop, no_nst_for_server, S}. + +-spec new_stream(quicer:stream_handle(), quicer:new_stream_props(), cb_state()) -> cb_ret(). +new_stream( + Stream, + #{is_orphan := true} = Props, + #{ + conn := Conn, + streams := Streams, + stream_opts := SOpts + } = CBState +) -> + %% Spawn new stream + case quicer_stream:start_link(emqx_quic_stream, Stream, Conn, SOpts, Props) of + {ok, StreamOwner} -> + quicer_connection:handoff_stream(Stream, StreamOwner), + {ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}}; + Other -> + Other + end. +-spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret(). +shutdown(Conn, _ErrorCode, S) -> quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), {ok, S}. +-spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) -> + cb_ret(). +transport_shutdown(_C, _DownInfo, S) -> + {ok, S}. + +-spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret(). +peer_address_changed(_C, _NewAddr, S) -> + {ok, S}. + +-spec local_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state()) -> + cb_ret(). +local_address_changed(_C, _NewAddr, S) -> + {ok, S}. + +-spec streams_available( + quicer:connection_handle(), + {BidirStreams :: non_neg_integer(), UnidirStreams :: non_neg_integer()}, + cb_state() +) -> cb_ret(). +streams_available(_C, {_BidirCnt, _UnidirCnt}, S) -> + {ok, S}. + +-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret(). +%% for https://github.com/microsoft/msquic/issues/3120 +peer_needs_streams(_C, undefined, S) -> + {ok, S}. + -spec is_zone_olp_enabled(emqx_types:zone()) -> boolean(). is_zone_olp_enabled(Zone) -> case emqx_config:get_zone_conf(Zone, [overload_protection]) of diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 567488862..fe6ff692c 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -37,7 +37,7 @@ wait({ConnOwner, Conn}) -> ConnOwner ! {self(), stream_acceptor_ready}, receive %% from msquic - {quic, new_stream, Stream} -> + {quic, new_stream, Stream, _Props} -> {ok, {quic, Conn, Stream}}; {'EXIT', ConnOwner, _Reason} -> {error, enotconn} diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 7e97c5bf4..07299bd42 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -78,6 +78,14 @@ end_per_group(_Group, _Config) -> init_per_suite(Config) -> %% Start Apps + %% dbg:tracer(process, {fun dbg:dhandler/2,group_leader()}), + %% dbg:p(all,c), + %% dbg:tp(emqx_quic_connection,cx), + %% dbg:tp(emqx_quic_stream,cx), + %% dbg:tp(emqtt_quic,cx), + %% dbg:tp(emqtt,cx), + %% dbg:tp(emqtt_quic_stream,cx), + %% dbg:tp(emqtt_quic_connection,cx), emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), Config.