From a51c8869086358e9e4387727bdc3b8f593448f46 Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 2 Nov 2022 14:20:17 +0100 Subject: [PATCH] fix: prepare for multi stream --- apps/emqx/src/emqx_connection.erl | 27 ++-- apps/emqx/src/emqx_quic_connection.erl | 72 ++++++--- apps/emqx/src/emqx_quic_stream.erl | 201 +++++++++++++++++++++++-- 3 files changed, 263 insertions(+), 37 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 6c88b87cf..1c8b85808 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT/TCP|TLS Connection +%% MQTT/TCP|TLS Connection|QUIC Stream -module(emqx_connection). -include("emqx.hrl"). @@ -189,12 +189,16 @@ ]} ). --spec start_link( - esockd:transport(), - esockd:socket() | {pid(), quicer:connection_handler()}, - emqx_channel:opts() -) -> - {ok, pid()}. +-spec start_link + (esockd:transport(), esockd:socket(), emqx_channel:opts()) -> + {ok, pid()}; + ( + emqx_quic_stream, + {ConnOwner :: pid(), quicer:connection_handler(), quicer:new_conn_props()}, + emqx_quic_connection:cb_state() + ) -> + {ok, pid()}. + start_link(Transport, Socket, Options) -> Args = [self(), Transport, Socket, Options], CPid = proc_lib:spawn_link(?MODULE, init, Args), @@ -324,6 +328,7 @@ init_state( Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg), FrameOpts = #{ + %% @TODO:q what is strict_mode? strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]), max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]) }, @@ -476,7 +481,9 @@ process_msg([Msg | More], State) -> {ok, Msgs, NState} -> process_msg(append_msg(More, Msgs), NState); {stop, Reason, NState} -> - {stop, Reason, NState} + {stop, Reason, NState}; + {stop, Reason} -> + {stop, Reason, State} end catch exit:normal -> @@ -507,7 +514,6 @@ append_msg(Q, Msg) -> %%-------------------------------------------------------------------- %% Handle a Msg - handle_msg({'$gen_call', From, Req}, State) -> case handle_call(From, Req, State) of {reply, Reply, NState} -> @@ -747,6 +753,7 @@ when_bytes_in(Oct, Data, State) -> NState ). +%% @doc: return a reversed Msg list -compile({inline, [next_incoming_msgs/3]}). next_incoming_msgs([Packet], Msgs, State) -> {ok, [{incoming, Packet} | Msgs], State}; @@ -892,6 +899,8 @@ handle_info({sock_error, Reason}, State) -> false -> ok end, handle_info({sock_closed, Reason}, close_socket(State)); +handle_info({quic, Event, Handle, Prop}, State) -> + emqx_quic_stream:Event(Handle, Prop, State); %% handle_info({quic, peer_send_shutdown, _Stream}, State) -> %% handle_info({sock_closed, force}, close_socket(State)); %% handle_info({quic, closed, _Channel, ReasonFlag}, State) -> diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 22d068237..a5af3d4b3 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -16,6 +16,7 @@ -module(emqx_quic_connection). +-include("logger.hrl"). -ifndef(BUILD_WITHOUT_QUIC). -include_lib("quicer/include/quicer.hrl"). -else. @@ -40,37 +41,50 @@ new_stream/3 ]). --type cb_state() :: map(). --type cb_ret() :: ok. - --spec init(map() | list()) -> cb_state(). +-type cb_state() :: #{ + ctrl_pid := undefined | pid(), + conn := undefined | quicer:conneciton_hanlder(), + stream_opts := map(), + is_resumed => boolean(), + _ => _ +}. +-type cb_ret() :: quicer_lib:cb_ret(). +-spec init(map() | list()) -> {ok, cb_state()}. init(ConnOpts) when is_list(ConnOpts) -> init(maps:from_list(ConnOpts)); init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); init(ConnOpts) when is_map(ConnOpts) -> - {ok, ConnOpts}. + {ok, init_cb_state(ConnOpts)}. -spec closed(quicer:conneciton_hanlder(), quicer:conn_closed_props(), cb_state()) -> - {ok, cb_state()} | {error, any()}. -closed(_Conn, #{is_peer_acked := true}, S) -> - {stop, normal, S}; -closed(_Conn, #{is_peer_acked := false}, S) -> - {stop, abnorml, S}. + {stop, normal, cb_state()}. +closed(_Conn, #{is_peer_acked := _} = Prop, S) -> + ?SLOG(debug, Prop), + {stop, normal, S}. -spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) -> {ok, cb_state()} | {error, any()}. -new_conn(Conn, #{version := _Vsn}, #{zone := Zone} = S) -> +new_conn( + Conn, + #{version := _Vsn} = ConnInfo, + #{zone := Zone, conn := undefined, ctrl_pid := undefined} = S +) -> process_flag(trap_exit, true), + ?SLOG(debug, ConnInfo), case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of false -> - {ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S), + {ok, Pid} = emqx_connection:start_link( + emqx_quic_stream, + {self(), Conn, maps:without([crypto_buffer], ConnInfo)}, + S + ), receive {Pid, stream_acceptor_ready} -> ok = quicer:async_handshake(Conn), - {ok, S#{conn => Conn}}; - {'EXIT', Pid, _Reason} -> + {ok, S#{conn := Conn, ctrl_pid := Pid}}; + {'EXIT', _Pid, _Reason} -> {error, stream_accept_error} end; true -> @@ -80,10 +94,12 @@ new_conn(Conn, #{version := _Vsn}, #{zone := Zone} = S) -> -spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) -> {ok, cb_state()} | {error, any()}. -connected(Conn, _Props, #{slow_start := false} = S) -> +connected(Conn, Props, #{slow_start := false} = S) -> + ?SLOG(debug, Props), {ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S), {ok, S}; -connected(_Conn, _Props, S) -> +connected(_Conn, Props, S) -> + ?SLOG(debug, Props), {ok, S}. -spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret(). @@ -92,10 +108,11 @@ resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when -> ResumeFun(Conn, Data, S); resumed(_Conn, _Data, S) -> - {ok, S}. + {ok, S#{is_resumed := true}}. -spec nst_received(quicer:connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret(). nst_received(_Conn, _Data, S) -> + %% As server we should not recv NST! {stop, no_nst_for_server, S}. -spec new_stream(quicer:stream_handle(), quicer:new_stream_props(), cb_state()) -> cb_ret(). @@ -116,14 +133,17 @@ new_stream( Other -> Other end. + -spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret(). shutdown(Conn, _ErrorCode, S) -> + %% @TODO check spec what to do with the ErrorCode? quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), {ok, S}. -spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) -> cb_ret(). transport_shutdown(_C, _DownInfo, S) -> + %% @TODO some counter {ok, S}. -spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret(). @@ -140,14 +160,21 @@ local_address_changed(_C, _NewAddr, S) -> {BidirStreams :: non_neg_integer(), UnidirStreams :: non_neg_integer()}, cb_state() ) -> cb_ret(). -streams_available(_C, {_BidirCnt, _UnidirCnt}, S) -> - {ok, S}. +streams_available(_C, {BidirCnt, UnidirCnt}, S) -> + {ok, S#{ + peer_bidi_stream_count => BidirCnt, + peer_unidi_stream_count => UnidirCnt + }}. -spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret(). +%% @TODO this is not going to get triggered. %% for https://github.com/microsoft/msquic/issues/3120 peer_needs_streams(_C, undefined, S) -> {ok, S}. +%%% +%%% Internals +%%% -spec is_zone_olp_enabled(emqx_types:zone()) -> boolean(). is_zone_olp_enabled(Zone) -> case emqx_config:get_zone_conf(Zone, [overload_protection]) of @@ -156,3 +183,10 @@ is_zone_olp_enabled(Zone) -> _ -> false end. + +-spec init_cb_state(map()) -> cb_state(). +init_cb_state(Map) -> + Map#{ + ctrl_pid => undefined, + conn => undefined + }. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index fe6ff692c..d9c080c0d 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -17,6 +17,8 @@ %% MQTT/QUIC Stream -module(emqx_quic_stream). +-behaviour(quicer_stream). + %% emqx transport Callbacks -export([ type/1, @@ -32,13 +34,71 @@ peercert/1 ]). -wait({ConnOwner, Conn}) -> +-include("logger.hrl"). +-ifndef(BUILD_WITHOUT_QUIC). +-include_lib("quicer/include/quicer.hrl"). +-else. +%% STREAM SHUTDOWN FLAGS +-define(QUIC_STREAM_SHUTDOWN_FLAG_NONE, 0). +% Cleanly closes the send path. +-define(QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1). +% Abruptly closes the send path. +-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND, 2). +% Abruptly closes the receive path. +-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, 4). +% Abruptly closes both send and receive paths. +-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 6). +-define(QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, 8). +-endif. + +-type cb_ret() :: gen_statem:event_handler_result(). +-type cb_data() :: emqtt_quic:cb_data(). +-type connection_handle() :: quicer:connection_handle(). +-type stream_handle() :: quicer:stream_handle(). + +-export([ + init_handoff/4, + new_stream/3, + start_completed/3, + send_complete/3, + peer_send_shutdown/3, + peer_send_aborted/3, + peer_receive_aborted/3, + send_shutdown_complete/3, + stream_closed/3, + peer_accepted/3, + passive/3, + handle_call/4 +]). + +-export_type([socket/0]). + +-opaque socket() :: {quic, connection_handle(), stream_handle(), socket_info()}. + +-type socket_info() :: #{ + is_orphan => boolean(), + ctrl_stream_start_flags => quicer:stream_open_flags(), + %% quicer:new_conn_props + _ => _ +}. + +-spec wait({pid(), quicer:connection_handle(), socket_info()}) -> + {ok, socket()} | {error, enotconn}. +wait({ConnOwner, Conn, ConnInfo}) -> {ok, Conn} = quicer:async_accept_stream(Conn, []), ConnOwner ! {self(), stream_acceptor_ready}, receive - %% from msquic - {quic, new_stream, Stream, _Props} -> - {ok, {quic, Conn, Stream}}; + %% New incoming stream, this is a *ctrl* stream + {quic, new_stream, Stream, #{is_orphan := IsOrphan, flags := StartFlags}} -> + SocketInfo = ConnInfo#{ + is_orphan => IsOrphan, + ctrl_stream_start_flags => StartFlags + }, + {ok, socket(Conn, Stream, SocketInfo)}; + %% connection closed event for stream acceptor + {quic, closed, undefined, undefined} -> + {error, enotconn}; + %% Connection owner process down {'EXIT', ConnOwner, _Reason} -> {error, enotconn} end. @@ -46,17 +106,17 @@ wait({ConnOwner, Conn}) -> type(_) -> quic. -peername({quic, Conn, _Stream}) -> +peername({quic, Conn, _Stream, _Info}) -> quicer:peername(Conn). -sockname({quic, Conn, _Stream}) -> +sockname({quic, Conn, _Stream, _Info}) -> quicer:sockname(Conn). peercert(_S) -> %% @todo but unsupported by msquic nossl. -getstat({quic, Conn, _Stream}, Stats) -> +getstat({quic, Conn, _Stream, _Info}, Stats) -> case quicer:getstat(Conn, Stats) of {error, _} -> {error, closed}; Res -> Res @@ -84,7 +144,7 @@ getopts(_Socket, _Opts) -> {buffer, 80000} ]}. -fast_close({quic, _Conn, Stream}) -> +fast_close({quic, _Conn, Stream, _Info}) -> %% Flush send buffer, gracefully shutdown quicer:async_shutdown_stream(Stream), ok. @@ -102,8 +162,131 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> Result end. -async_send({quic, _Conn, Stream}, Data, _Options) -> +async_send({quic, _Conn, Stream, _Info}, Data, _Options) -> case quicer:send(Stream, Data) of {ok, _Len} -> ok; Other -> Other end. + +%%% +%%% quicer stream callbacks +%%% + +-spec init_handoff(stream_handle(), #{}, quicer:connection_handle(), #{}) -> cb_ret(). +init_handoff(_Stream, _StreamOpts, _Conn, _Flags) -> + %% stream owner already set while starts. + {stop, unimpl}. + +-spec new_stream(stream_handle(), quicer:new_stream_props(), cb_data()) -> cb_ret(). +new_stream(_Stream, #{flags := _Flags, is_orphan := _IsOrphan}, _Conn) -> + {stop, unimpl}. + +-spec peer_accepted(stream_handle(), undefined, cb_data()) -> cb_ret(). +peer_accepted(_Stream, undefined, S) -> + %% We just ignore it + {ok, S}. + +-spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). +peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> + %% we abort send with same reason + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), + {ok, S}; +peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) -> + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), + {ok, S}. + +-spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). +peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> + %% we abort receive with same reason + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), + {ok, S}; +peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) -> + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), + {ok, S}. + +-spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret(). +peer_send_shutdown(Stream, undefined, S) -> + ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0), + {ok, S}. + +-spec send_complete(stream_handle(), boolean(), cb_data()) -> cb_ret(). +send_complete(_Stream, false, S) -> + {ok, S}; +send_complete(_Stream, true = _IsCancelled, S) -> + ?SLOG(error, #{message => "send cancelled"}), + {ok, S}. + +-spec send_shutdown_complete(stream_handle(), boolean(), cb_data()) -> cb_ret(). +send_shutdown_complete(_Stream, _IsGraceful, S) -> + {ok, S}. + +-spec start_completed(stream_handle(), quicer:stream_start_completed_props(), cb_data()) -> + cb_ret(). +start_completed(_Stream, #{status := success, stream_id := StreamId} = Prop, S) -> + ?SLOG(debug, Prop), + {ok, S#{stream_id => StreamId}}; +start_completed(_Stream, #{status := stream_limit_reached, stream_id := _StreamId} = Prop, _S) -> + ?SLOG(error, #{message => start_completed}, Prop), + {stop, stream_limit_reached}; +start_completed(_Stream, #{status := Other} = Prop, S) -> + ?SLOG(error, Prop), + %% or we could retry? + {stop, {start_fail, Other}, S}. + +%% Local stream, Unidir +%% -spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_data()) +%% -> cb_ret(). +%% handle_stream_data(Stream, Bin, Flags, #{ is_local := true +%% , parse_state := PS} = S) -> +%% ?SLOG(debug, #{data => Bin}, Flags), +%% case parse(Bin, PS, []) of +%% {keep_state, NewPS, Packets} -> +%% quicer:setopt(Stream, active, once), +%% {keep_state, S#{parse_state := NewPS}, +%% [{next_event, cast, P } || P <- lists:reverse(Packets)]}; +%% {stop, _} = Stop -> +%% Stop +%% end; +%% %% Remote stream +%% handle_stream_data(_Stream, _Bin, _Flags, +%% #{is_local := false, is_unidir := true, conn := _Conn} = _S) -> +%% {stop, unimpl}. + +-spec passive(stream_handle(), undefined, cb_data()) -> cb_ret(). +passive(_Stream, undefined, _S) -> + {stop, unimpl}. + +-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) -> cb_ret(). +stream_closed( + _Stream, + #{ + is_conn_shutdown := IsConnShutdown, + is_app_closing := IsAppClosing, + is_shutdown_by_app := IsAppShutdown, + is_closed_remotely := IsRemote, + status := Status, + error := Code + }, + S +) when + is_boolean(IsConnShutdown) andalso + is_boolean(IsAppClosing) andalso + is_boolean(IsAppShutdown) andalso + is_boolean(IsRemote) andalso + is_atom(Status) andalso + is_integer(Code) +-> + %% @TODO for now we fake a sock_closed for + %% emqx_connection:process_msg to append + %% a msg to be processed + {ok, {sock_closed, Status}, S}. + +handle_call(_Stream, _Request, _Opts, S) -> + {error, unimpl, S}. + +%%% +%%% Internals +%%% +-spec socket(connection_handle(), stream_handle(), socket_info()) -> socket(). +socket(Conn, CtrlStream, Info) when is_map(Info) -> + {quic, Conn, CtrlStream, Info}.