From 9f696928b6174bb17f4505ee8a1f4e72df028dcb Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 25 Nov 2022 15:15:52 +0100 Subject: [PATCH] feat(quic): multi streams --- apps/emqx/src/emqx_channel.erl | 1 + apps/emqx/src/emqx_connection.erl | 37 +- apps/emqx/src/emqx_quic_connection.erl | 139 +++++- apps/emqx/src/emqx_quic_data_stream.erl | 466 ++++++++++++++++++ apps/emqx/src/emqx_quic_stream.erl | 43 +- .../emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 22 +- .../test/emqx_quic_multistreams_SUITE.erl | 190 +++++++ 7 files changed, 848 insertions(+), 50 deletions(-) create mode 100644 apps/emqx/src/emqx_quic_data_stream.erl create mode 100644 apps/emqx/test/emqx_quic_multistreams_SUITE.erl diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index e82adc786..a12df9c64 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1136,6 +1136,7 @@ do_deliver(Publishes, Channel) when is_list(Publishes) -> {Packets, NChannel} = lists:foldl( fun(Publish, {Acc, Chann}) -> + %% @FIXME perf: list append with copy left list {Packets, NChann} = do_deliver(Publish, Chann), {Packets ++ Acc, NChann} end, diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 1c8b85808..980c41010 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -14,7 +14,12 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT/TCP|TLS Connection|QUIC Stream +%% This module interacts with the transport layer of MQTT +%% Transport: +%% - TCP connection +%% - TCP/TLS connection +%% - WebSocket +%% - QUIC Stream -module(emqx_connection). -include("emqx.hrl"). @@ -111,7 +116,13 @@ limiter_buffer :: queue:queue(pending_req()), %% limiter timers - limiter_timer :: undefined | reference() + limiter_timer :: undefined | reference(), + + %% QUIC conn pid if is a pid + quic_conn_pid :: maybe(pid()), + + %% QUIC control stream callback state + quic_ctrl_state :: map() }). -record(retry, { @@ -194,7 +205,7 @@ {ok, pid()}; ( emqx_quic_stream, - {ConnOwner :: pid(), quicer:connection_handler(), quicer:new_conn_props()}, + {ConnOwner :: pid(), quicer:connection_handle(), quicer:new_conn_props()}, emqx_quic_connection:cb_state() ) -> {ok, pid()}. @@ -334,6 +345,7 @@ init_state( }, ParseState = emqx_frame:initial_parse_state(FrameOpts), Serialize = emqx_frame:serialize_opts(), + %% Init Channel Channel = emqx_channel:init(ConnInfo, Opts), GcState = case emqx_config:get_zone_conf(Zone, [force_gc]) of @@ -364,7 +376,10 @@ init_state( zone = Zone, listener = Listener, limiter_buffer = queue:new(), - limiter_timer = undefined + limiter_timer = undefined, + %% for quic streams to inherit + quic_conn_pid = maps:get(conn_pid, Opts, undefined), + quic_ctrl_state = #{} }. run_loop( @@ -600,9 +615,20 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_msg({connack, ConnAck}, State) -> handle_outgoing(ConnAck, State); handle_msg({close, Reason}, State) -> + %% @FIXME here it could be close due to appl error. ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); -handle_msg({event, connected}, State = #state{channel = Channel}) -> +handle_msg( + {event, connected}, + State = #state{ + channel = Channel, + serialize = Serialize, + parse_state = PS, + quic_conn_pid = QuicConnPid + } +) -> + QuicConnPid =/= undefined andalso + emqx_quic_connection:activate_data_streams(QuicConnPid, {PS, Serialize, Channel}), ClientId = emqx_channel:info(clientid, Channel), emqx_cm:insert_channel_info(ClientId, info(State), stats(State)); handle_msg({event, disconnected}, State = #state{channel = Channel}) -> @@ -876,6 +902,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ok; Error = {error, _Reason} -> %% Send an inet_reply to postpone handling the error + %% @FIXME: why not just return error? self() ! {inet_reply, Socket, Error}, ok end. diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index a5af3d4b3..de7776429 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -14,6 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- +%% @doc impl. the quic connection owner process. -module(emqx_quic_connection). -include("logger.hrl"). @@ -41,15 +42,46 @@ new_stream/3 ]). +-export([activate_data_streams/2]). + +-export([ + handle_call/3, + handle_info/2 +]). + -type cb_state() :: #{ + %% connecion owner pid + conn_pid := pid(), + %% Pid of ctrl stream ctrl_pid := undefined | pid(), + %% quic connecion handle conn := undefined | quicer:conneciton_hanlder(), + %% streams that handoff from this process, excluding control stream + %% these streams could die/closed without effecting the connecion/session. + + %@TODO type? + streams := [{pid(), quicer:stream_handle()}], + %% New stream opts stream_opts := map(), + %% If conneciton is resumed from session ticket is_resumed => boolean(), + %% mqtt message serializer config + serialize => undefined, _ => _ }. -type cb_ret() :: quicer_lib:cb_ret(). +%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked +%% for the activation from control stream after it is accepted as a legit conneciton. +%% For security, the initial number of allowed data streams from client should be limited by +%% 'peer_bidi_stream_count` & 'peer_unidi_stream_count` +-spec activate_data_streams(pid(), { + emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel() +}) -> ok. +activate_data_streams(ConnOwner, {PS, Serialize, Channel}) -> + gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity). + +%% @doc conneciton owner init callback -spec init(map() | list()) -> {ok, cb_state()}. init(ConnOpts) when is_list(ConnOpts) -> init(maps:from_list(ConnOpts)); @@ -64,6 +96,7 @@ closed(_Conn, #{is_peer_acked := _} = Prop, S) -> ?SLOG(debug, Prop), {stop, normal, S}. +%% @doc handle the new incoming connecion as the connecion acceptor. -spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) -> {ok, cb_state()} | {error, any()}. new_conn( @@ -75,15 +108,17 @@ new_conn( ?SLOG(debug, ConnInfo), case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of false -> - {ok, Pid} = emqx_connection:start_link( + %% Start control stream process + StartOption = S, + {ok, CtrlPid} = emqx_connection:start_link( emqx_quic_stream, {self(), Conn, maps:without([crypto_buffer], ConnInfo)}, - S + StartOption ), receive - {Pid, stream_acceptor_ready} -> + {CtrlPid, stream_acceptor_ready} -> ok = quicer:async_handshake(Conn), - {ok, S#{conn := Conn, ctrl_pid := Pid}}; + {ok, S#{conn := Conn, ctrl_pid := CtrlPid}}; {'EXIT', _Pid, _Reason} -> {error, stream_accept_error} end; @@ -92,6 +127,7 @@ new_conn( {error, overloaded} end. +%% @doc callback when connection is connected. -spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) -> {ok, cb_state()} | {error, any()}. connected(Conn, Props, #{slow_start := false} = S) -> @@ -102,6 +138,7 @@ connected(_Conn, Props, S) -> ?SLOG(debug, Props), {ok, S}. +%% @doc callback when connection is resumed from 0-RTT -spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret(). resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when is_function(ResumeFun) @@ -110,51 +147,77 @@ resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when resumed(_Conn, _Data, S) -> {ok, S#{is_resumed := true}}. +%% @doc callback for receiving nst, should never happen on server. -spec nst_received(quicer:connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret(). nst_received(_Conn, _Data, S) -> %% As server we should not recv NST! {stop, no_nst_for_server, S}. +%% @doc callback for handling orphan data streams +%% depends on the connecion state and control stream state. -spec new_stream(quicer:stream_handle(), quicer:new_stream_props(), cb_state()) -> cb_ret(). new_stream( Stream, - #{is_orphan := true} = Props, + #{is_orphan := true, flags := _Flags} = Props, #{ conn := Conn, streams := Streams, - stream_opts := SOpts - } = CBState + stream_opts := SOpts, + zone := Zone, + limiter := Limiter, + parse_state := PS, + channel := Channel, + serialize := Serialize + } = S ) -> - %% Spawn new stream - case quicer_stream:start_link(emqx_quic_stream, Stream, Conn, SOpts, Props) of - {ok, StreamOwner} -> - quicer_connection:handoff_stream(Stream, StreamOwner), - {ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}}; - Other -> - Other - end. + %% Cherry pick options for data streams + SOpts1 = SOpts#{ + is_local => false, + zone => Zone, + % unused + limiter => Limiter, + parse_state => PS, + channel => Channel, + serialize => Serialize + }, + {ok, NewStreamOwner} = quicer_stream:start_link( + emqx_quic_data_stream, + Stream, + Conn, + SOpts1, + Props + ), + quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}), + %% @TODO keep them in ``inactive_streams' + {ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}. +%% @doc callback for handling for remote connecion shutdown. -spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret(). shutdown(Conn, _ErrorCode, S) -> - %% @TODO check spec what to do with the ErrorCode? + %% @TODO check spec what to set for the ErrorCode? quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), {ok, S}. +%% @doc callback for handling for transport error, such as idle timeout -spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) -> cb_ret(). transport_shutdown(_C, _DownInfo, S) -> %% @TODO some counter {ok, S}. +%% @doc callback for handling for peer addr changed. -spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret(). peer_address_changed(_C, _NewAddr, S) -> + %% @TODO update session info? {ok, S}. +%% @doc callback for handling local addr change, currently unused -spec local_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state()) -> cb_ret(). local_address_changed(_C, _NewAddr, S) -> {ok, S}. +%% @doc callback for handling remote stream limit updates -spec streams_available( quicer:connection_handle(), {BidirStreams :: non_neg_integer(), UnidirStreams :: non_neg_integer()}, @@ -166,12 +229,43 @@ streams_available(_C, {BidirCnt, UnidirCnt}, S) -> peer_unidi_stream_count => UnidirCnt }}. --spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret(). -%% @TODO this is not going to get triggered. +%% @doc callback for handling request when remote wants for more streams +%% should cope with rate limiting +%% @TODO this is not going to get triggered in current version %% for https://github.com/microsoft/msquic/issues/3120 +-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret(). peer_needs_streams(_C, undefined, S) -> {ok, S}. +%% @doc handle API calls +handle_call( + {activate_data_streams, {PS, Serialize, Channel} = ActivateData}, + _From, + #{streams := Streams} = S +) -> + [emqx_quic_data_stream:activate_data(OwnerPid, ActivateData) || {OwnerPid, _Stream} <- Streams], + {reply, ok, S#{ + %streams := [], %% @FIXME what ?????? + channel := Channel, + serialize := Serialize, + parse_state := PS + }}; +handle_call(_Req, _From, S) -> + {reply, {error, unimpl}, S}. + +%% @doc handle DOWN messages from streams. +%% @TODO handle DOWN from supervisor? +handle_info({'DOWN', _Ref, process, Pid, Reason}, #{streams := Streams} = S) when + Reason =:= normal orelse + Reason =:= {shutdown, protocol_error} +-> + case proplists:is_defined(Pid, Streams) of + true -> + {ok, S}; + false -> + {stop, unknown_pid_down, S} + end. + %%% %%% Internals %%% @@ -185,8 +279,13 @@ is_zone_olp_enabled(Zone) -> end. -spec init_cb_state(map()) -> cb_state(). -init_cb_state(Map) -> +init_cb_state(#{zone := _Zone} = Map) -> Map#{ + conn_pid => self(), ctrl_pid => undefined, - conn => undefined + conn => undefined, + streams => [], + parse_state => undefined, + channel => undefined, + serialize => undefined }. diff --git a/apps/emqx/src/emqx_quic_data_stream.erl b/apps/emqx/src/emqx_quic_data_stream.erl new file mode 100644 index 000000000..72f0e913f --- /dev/null +++ b/apps/emqx/src/emqx_quic_data_stream.erl @@ -0,0 +1,466 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% +%% @doc QUIC data stream +%% Following the behaviour of emqx_connection: +%% The MQTT packets and their side effects are handled *atomically*. +%% + +-module(emqx_quic_data_stream). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("quicer/include/quicer.hrl"). +-include("emqx_mqtt.hrl"). +-include("logger.hrl"). +-behaviour(quicer_stream). + +%% Connection Callbacks +-export([ + init_handoff/4, + post_handoff/3, + new_stream/3, + start_completed/3, + send_complete/3, + peer_send_shutdown/3, + peer_send_aborted/3, + peer_receive_aborted/3, + send_shutdown_complete/3, + stream_closed/3, + peer_accepted/3, + passive/3 +]). + +-export([handle_stream_data/4]). + +-export([activate_data/2]). + +-export([ + handle_call/3, + handle_info/2, + handle_continue/2 +]). + +%% +%% @doc Activate the data handling. +%% Data handling is disabled before control stream allows the data processing. +-spec activate_data(pid(), { + emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel() +}) -> ok. +activate_data(StreamPid, {PS, Serialize, Channel}) -> + gen_server:call(StreamPid, {activate, {PS, Serialize, Channel}}, infinity). + +%% +%% @doc Handoff from previous owner, mostly from the connection owner. +%% @TODO parse_state doesn't look necessary since we have it in post_handoff +%% @TODO -spec +init_handoff( + Stream, + #{parse_state := PS} = _StreamOpts, + Connection, + #{is_orphan := true, flags := Flags} +) -> + {ok, init_state(Stream, Connection, Flags, PS)}. + +%% +%% @doc Post handoff data stream +%% +%% @TODO -spec +%% +post_handoff(Stream, {PS, Serialize, Channel}, S) -> + ?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}), + quicer:setopt(Stream, active, true), + {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}. + +%% +%% @doc when this proc is assigned to the owner of new stream +%% +new_stream(Stream, #{flags := Flags}, Connection) -> + {ok, init_state(Stream, Connection, Flags)}. + +%% +%% @doc for local initiated stream +%% +peer_accepted(_Stream, _Flags, S) -> + %% we just ignore it + {ok, S}. + +peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> + %% we abort send with same reason + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), + {ok, S}; +peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) -> + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), + {ok, S}. + +peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> + %% we abort receive with same reason + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), + {ok, S}; +peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) -> + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), + {ok, S}. + +peer_send_shutdown(Stream, _Flags, S) -> + ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0), + {ok, S}. + +send_complete(_Stream, false, S) -> + {ok, S}; +send_complete(_Stream, true = _IsCanceled, S) -> + {ok, S}. + +send_shutdown_complete(_Stream, _Flags, S) -> + {ok, S}. + +start_completed(_Stream, #{status := success, stream_id := StreamId}, S) -> + {ok, S#{stream_id => StreamId}}; +start_completed(_Stream, #{status := Other}, S) -> + %% or we could retry + {stop, {start_fail, Other}, S}. + +handle_stream_data( + Stream, + Bin, + _Flags, + #{ + is_unidir := false, + channel := undefined, + data_queue := Queue, + stream := Stream + } = State +) when is_binary(Bin) -> + {ok, State#{data_queue := [Bin | Queue]}}; +handle_stream_data( + _Stream, + Bin, + _Flags, + #{ + is_unidir := false, + channel := Channel, + parse_state := PS, + data_queue := QueuedData, + task_queue := TQ + } = State +) when + Channel =/= undefined +-> + {MQTTPackets, NewPS} = parse_incoming(list_to_binary(lists:reverse([Bin | QueuedData])), PS), + NewTQ = lists:foldl( + fun(Item, Acc) -> + queue:in(Item, Acc) + end, + TQ, + [{incoming, P} || P <- lists:reverse(MQTTPackets)] + ), + {{continue, handle_appl_msg}, State#{parse_state := NewPS, task_queue := NewTQ}}. + +%% Reserved for unidi streams +%% handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := PeerStream, conn := Conn} = State) -> +%% case PeerStream of +%% undefined -> +%% {ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn, +%% [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} +%% , {is_local, true} +%% ]), +%% {ok, _} = quicer_stream:send(StreamProc, Bin), +%% {ok, State#{peer_stream := StreamProc}}; +%% StreamProc when is_pid(StreamProc) -> +%% {ok, _} = quicer_stream:send(StreamProc, Bin), +%% {ok, State} +%% end. + +passive(_Stream, undefined, S) -> + {ok, S}. + +stream_closed( + _Stream, + #{ + is_conn_shutdown := IsConnShutdown, + is_app_closing := IsAppClosing, + is_shutdown_by_app := IsAppShutdown, + is_closed_remotely := IsRemote, + status := Status, + error := Code + }, + S +) when + is_boolean(IsConnShutdown) andalso + is_boolean(IsAppClosing) andalso + is_boolean(IsAppShutdown) andalso + is_boolean(IsRemote) andalso + is_atom(Status) andalso + is_integer(Code) +-> + {stop, normal, S}. + +handle_call(Call, _From, S) -> + do_handle_call(Call, S). + +handle_continue(handle_appl_msg, #{task_queue := Q} = S) -> + case queue:out(Q) of + {{value, Item}, Q2} -> + do_handle_appl_msg(Item, S#{task_queue := Q2}); + {empty, Q} -> + {ok, S} + end. + +do_handle_appl_msg( + {outgoing, Packets}, + #{ + channel := Channel, + stream := _Stream, + serialize := _Serialize + } = S +) when + Channel =/= undefined +-> + case handle_outgoing(Packets, S) of + {ok, Size} -> + ok = emqx_metrics:inc('bytes.sent', Size), + {{continue, handle_appl_msg}, S}; + {error, E1, E2} -> + {stop, {E1, E2}, S}; + {error, E} -> + {stop, E, S} + end; +do_handle_appl_msg({incoming, #mqtt_packet{} = Packet}, #{channel := Channel} = S) when + Channel =/= undefined +-> + with_channel(handle_in, [Packet], S); +do_handle_appl_msg({close, Reason}, S) -> + %% @TODO shall we abort shutdown or graceful shutdown? + with_channel(handle_info, [{sock_closed, Reason}], S); +do_handle_appl_msg({event, updated}, S) -> + %% Data stream don't care about connection state changes. + {{continue, handle_appl_msg}, S}. + +handle_info(Deliver = {deliver, _, _}, S) -> + Delivers = [Deliver], + with_channel(handle_deliver, [Delivers], S). + +with_channel(Fun, Args, #{channel := Channel, task_queue := Q} = S) when + Channel =/= undefined +-> + case apply(emqx_channel, Fun, Args ++ [Channel]) of + ok -> + {{continue, handle_appl_msg}, S}; + {ok, Msgs, NewChannel} when is_list(Msgs) -> + {{continue, handle_appl_msg}, S#{ + task_queue := queue:join(Q, queue:from_list(Msgs)), + channel := NewChannel + }}; + {ok, Msg, NewChannel} when is_record(Msg, mqtt_packet) -> + {{continue, handle_appl_msg}, S#{ + task_queue := queue:in({outgoing, Msg}, Q), channel := NewChannel + }}; + %% @FIXME WTH? + {ok, {outgoing, _} = Msg, NewChannel} -> + {{continue, handle_appl_msg}, S#{task_queue := queue:in(Msg, Q), channel := NewChannel}}; + {ok, NewChannel} -> + {{continue, handle_appl_msg}, S#{channel := NewChannel}}; + %% @TODO optimisation for shutdown wrap + {shutdown, Reason, NewChannel} -> + {stop, {shutdown, Reason}, S#{channel := NewChannel}}; + {shutdown, Reason, Msgs, NewChannel} when is_list(Msgs) -> + %% @TODO handle outgoing? + {stop, {shutdown, Reason}, S#{ + channel := NewChannel, + task_queue := queue:join(Q, queue:from_list(Msgs)) + }}; + {shutdown, Reason, Msg, NewChannel} -> + {stop, {shutdown, Reason}, S#{ + channel := NewChannel, + task_queue := queue:in(Msg, Q) + }} + end. + +%%% Internals +handle_outgoing(#mqtt_packet{} = P, S) -> + handle_outgoing([P], S); +handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir := false}) when + is_list(Packets) +-> + OutBin = [serialize_packet(P, Serialize) || P <- filter_disallowed_out(Packets)], + %% @TODO in which case shall we use sync send? + Res = quicer:async_send(Stream, OutBin), + ?TRACE("MQTT", "mqtt_packet_sent", #{packets => Packets}), + [ok = inc_outgoing_stats(P) || P <- Packets], + Res. + +serialize_packet(Packet, Serialize) -> + try emqx_frame:serialize_pkt(Packet, Serialize) of + <<>> -> + ?SLOG(warning, #{ + msg => "packet_is_discarded", + reason => "frame_is_too_large", + packet => emqx_packet:format(Packet, hidden) + }), + ok = emqx_metrics:inc('delivery.dropped.too_large'), + ok = emqx_metrics:inc('delivery.dropped'), + ok = inc_outgoing_stats({error, message_too_large}), + <<>>; + Data -> + Data + catch + %% Maybe Never happen. + throw:{?FRAME_SERIALIZE_ERROR, Reason} -> + ?SLOG(info, #{ + reason => Reason, + input_packet => Packet + }), + erlang:error({?FRAME_SERIALIZE_ERROR, Reason}); + error:Reason:Stacktrace -> + ?SLOG(error, #{ + input_packet => Packet, + exception => Reason, + stacktrace => Stacktrace + }), + erlang:error(?FRAME_SERIALIZE_ERROR) + end. + +-spec init_state( + quicer:stream_handle(), + quicer:connection_handle(), + quicer:new_stream_props() +) -> + % @TODO + map(). +init_state(Stream, Connection, OpenFlags) -> + init_state(Stream, Connection, OpenFlags, undefined). + +init_state(Stream, Connection, OpenFlags, PS) -> + %% quic stream handle + #{ + stream => Stream, + %% quic connection handle + conn => Connection, + %% if it is QUIC unidi stream + is_unidir => quicer:is_unidirectional(OpenFlags), + %% Frame Parse State + parse_state => PS, + %% Peer Stream handle in a pair for type unidir only + peer_stream => undefined, + %% if the stream is locally initiated. + is_local => false, + %% queue binary data when is NOT connected, in reversed order. + data_queue => [], + %% Channel from connection + %% `undefined' means the connection is not connected. + channel => undefined, + %% serialize opts for connection + serialize => undefined, + %% Current working queue + task_queue => queue:new() + }. + +-spec do_handle_call(term(), quicer_stream:cb_state()) -> quicer_stream:cb_ret(). +do_handle_call( + {activate, {PS, Serialize, Channel}}, + #{ + channel := undefined, + stream := Stream, + serialize := undefined + } = S +) -> + NewS = S#{channel := Channel, serialize := Serialize, parse_state := PS}, + %% We use quic protocol for flow control, and we don't check return val + case quicer:setopt(Stream, active, true) of + ok -> + {ok, NewS}; + {error, E} -> + ?SLOG(error, #{msg => "set stream active failed", error => E}), + {stop, E, NewS} + end; +do_handle_call(_Call, S) -> + {reply, {error, unimpl}, S}. + +%% @doc return reserved order of Packets +parse_incoming(Data, PS) -> + try + do_parse_incoming(Data, [], PS) + catch + throw:{?FRAME_PARSE_ERROR, Reason} -> + ?SLOG(info, #{ + reason => Reason, + input_bytes => Data + }), + {[{frame_error, Reason}], PS}; + error:Reason:Stacktrace -> + ?SLOG(error, #{ + input_bytes => Data, + reason => Reason, + stacktrace => Stacktrace + }), + {[{frame_error, Reason}], PS} + end. + +do_parse_incoming(<<>>, Packets, ParseState) -> + {Packets, ParseState}; +do_parse_incoming(Data, Packets, ParseState) -> + case emqx_frame:parse(Data, ParseState) of + {more, NParseState} -> + {Packets, NParseState}; + {ok, Packet, Rest, NParseState} -> + do_parse_incoming(Rest, [Packet | Packets], NParseState) + end. + +%% followings are copied from emqx_connection +-compile({inline, [inc_outgoing_stats/1]}). +inc_outgoing_stats({error, message_too_large}) -> + inc_counter('send_msg.dropped', 1), + inc_counter('send_msg.dropped.too_large', 1); +inc_outgoing_stats(Packet = ?PACKET(Type)) -> + inc_counter(send_pkt, 1), + case Type of + ?PUBLISH -> + inc_counter(send_msg, 1), + inc_counter(outgoing_pubs, 1), + inc_qos_stats(send_msg, Packet); + _ -> + ok + end, + emqx_metrics:inc_sent(Packet). + +inc_counter(Key, Inc) -> + _ = emqx_pd:inc_counter(Key, Inc), + ok. + +inc_qos_stats(Type, Packet) -> + case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of + undefined -> + ignore; + Key -> + inc_counter(Key, 1) + end. + +inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0'; +inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1'; +inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2'; +inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0'; +inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1'; +inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2'; +%% for bad qos +inc_qos_stats_key(_, _) -> undefined. + +filter_disallowed_out(Packets) -> + lists:filter(fun is_datastream_out_pkt/1, Packets). + +is_datastream_out_pkt(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) when + Type > 2 andalso Type < 12 +-> + true; +is_datastream_out_pkt(_) -> + false. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index d9c080c0d..70b01e643 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -78,17 +78,24 @@ -type socket_info() :: #{ is_orphan => boolean(), ctrl_stream_start_flags => quicer:stream_open_flags(), - %% quicer:new_conn_props + %% and quicer:new_conn_props() _ => _ }. --spec wait({pid(), quicer:connection_handle(), socket_info()}) -> - {ok, socket()} | {error, enotconn}. +%% for accepting +-spec wait + ({pid(), connection_handle(), socket_info()}) -> + {ok, socket()} | {error, enotconn}; + %% For handover + ({pid(), connection_handle(), stream_handle(), socket_info()}) -> + {ok, socket()} | {error, any()}. + +%%% For Accepting New Remote Stream wait({ConnOwner, Conn, ConnInfo}) -> {ok, Conn} = quicer:async_accept_stream(Conn, []), ConnOwner ! {self(), stream_acceptor_ready}, receive - %% New incoming stream, this is a *ctrl* stream + %% New incoming stream, this is a *control* stream {quic, new_stream, Stream, #{is_orphan := IsOrphan, flags := StartFlags}} -> SocketInfo = ConnInfo#{ is_orphan => IsOrphan, @@ -101,6 +108,14 @@ wait({ConnOwner, Conn, ConnInfo}) -> %% Connection owner process down {'EXIT', ConnOwner, _Reason} -> {error, enotconn} + end; +%% For ownership handover +wait({PrevOwner, Conn, Stream, SocketInfo}) -> + case quicer:wait_for_handoff(PrevOwner, Stream) of + ok -> + {ok, socket(Conn, Stream, SocketInfo)}; + owner_down -> + {error, owner_down} end. type(_) -> @@ -144,9 +159,10 @@ getopts(_Socket, _Opts) -> {buffer, 80000} ]}. -fast_close({quic, _Conn, Stream, _Info}) -> - %% Flush send buffer, gracefully shutdown - quicer:async_shutdown_stream(Stream), +fast_close({quic, Conn, _Stream, _Info}) -> + %% Since we shutdown the control stream, we shutdown the connection as well + %% @TODO supply some App Error Code + quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), ok. -spec ensure_ok_or_exit(atom(), list(term())) -> term(). @@ -187,21 +203,14 @@ peer_accepted(_Stream, undefined, S) -> {ok, S}. -spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). -peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> - %% we abort send with same reason - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), - {ok, S}; -peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) -> +peer_receive_aborted(Stream, ErrorCode, S) -> quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), {ok, S}. -spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). -peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> +peer_send_aborted(Stream, ErrorCode, S) -> %% we abort receive with same reason - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), - {ok, S}; -peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) -> - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), + quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), {ok, S}. -spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret(). diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 07299bd42..0b493dff6 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -65,6 +65,7 @@ init_per_group(quic, Config) -> UdpPort = 1884, emqx_common_test_helpers:start_apps([]), emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort), + emqx_logger:set_log_level(debug), [{port, UdpPort}, {conn_fun, quic_connect} | Config]; init_per_group(_, Config) -> emqx_common_test_helpers:stop_apps([]), @@ -78,14 +79,19 @@ end_per_group(_Group, _Config) -> init_per_suite(Config) -> %% Start Apps - %% dbg:tracer(process, {fun dbg:dhandler/2,group_leader()}), - %% dbg:p(all,c), - %% dbg:tp(emqx_quic_connection,cx), - %% dbg:tp(emqx_quic_stream,cx), - %% dbg:tp(emqtt_quic,cx), - %% dbg:tp(emqtt,cx), - %% dbg:tp(emqtt_quic_stream,cx), - %% dbg:tp(emqtt_quic_connection,cx), + dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}), + dbg:p(all, c), + dbg:tp(emqx_quic_connection, cx), + dbg:tp(quicer_connection, cx), + %% dbg:tp(emqx_quic_stream, cx), + %% dbg:tp(emqtt_quic, cx), + %% dbg:tp(emqtt, cx), + %% dbg:tp(emqtt_quic_stream, cx), + %% dbg:tp(emqtt_quic_connection, cx), + %% dbg:tp(emqx_cm, open_session, cx), + %% dbg:tpl(emqx_cm, lookup_channels, cx), + %% dbg:tpl(emqx_cm, register_channel, cx), + %% dbg:tpl(emqx_cm, unregister_channel, cx), emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), Config. diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl new file mode 100644 index 000000000..bb19092f7 --- /dev/null +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -0,0 +1,190 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_quic_multistreams_SUITE). + +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(TOPICS, [ + <<"TopicA">>, + <<"TopicA/B">>, + <<"Topic/C">>, + <<"TopicA/C">>, + <<"/TopicA">> +]). + +%%-------------------------------------------------------------------- +%% @spec suite() -> Info +%% Info = [tuple()] +%% @end +%%-------------------------------------------------------------------- +suite() -> + [{timetrap, {seconds, 30}}]. + +%%-------------------------------------------------------------------- +%% @spec init_per_suite(Config0) -> +%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%% @end +%%-------------------------------------------------------------------- +init_per_suite(Config) -> + UdpPort = 1884, + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort), + %% @TODO remove + emqx_logger:set_log_level(debug), + + dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}), + dbg:p(all, c), + + %dbg:tp(emqx_quic_stream, cx), + %% dbg:tp(quicer_stream, cx), + %% dbg:tp(emqx_quic_data_stream, cx), + %% dbg:tp(emqx_channel, cx), + %% dbg:tp(emqx_packet,check,cx), + %% dbg:tp(emqx_frame,parse,cx), + %dbg:tp(emqx_quic_connection, cx), + [{port, UdpPort}, {conn_fun, quic_connect} | Config]. + +%%-------------------------------------------------------------------- +%% @spec end_per_suite(Config0) -> term() | {save_config,Config1} +%% Config0 = Config1 = [tuple()] +%% @end +%%-------------------------------------------------------------------- +end_per_suite(_Config) -> + ok. + +%%-------------------------------------------------------------------- +%% @spec init_per_group(GroupName, Config0) -> +%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} +%% GroupName = atom() +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%% @end +%%-------------------------------------------------------------------- +init_per_group(_GroupName, Config) -> + Config. + +%%-------------------------------------------------------------------- +%% @spec end_per_group(GroupName, Config0) -> +%% term() | {save_config,Config1} +%% GroupName = atom() +%% Config0 = Config1 = [tuple()] +%% @end +%%-------------------------------------------------------------------- +end_per_group(_GroupName, _Config) -> + ok. + +%%-------------------------------------------------------------------- +%% @spec init_per_testcase(TestCase, Config0) -> +%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} +%% TestCase = atom() +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%% @end +%%-------------------------------------------------------------------- +init_per_testcase(_TestCase, Config) -> + Config. + +%%-------------------------------------------------------------------- +%% @spec end_per_testcase(TestCase, Config0) -> +%% term() | {save_config,Config1} | {fail,Reason} +%% TestCase = atom() +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%% @end +%%-------------------------------------------------------------------- +end_per_testcase(_TestCase, _Config) -> + ok. + +%%-------------------------------------------------------------------- +%% @spec groups() -> [Group] +%% Group = {GroupName,Properties,GroupsAndTestCases} +%% GroupName = atom() +%% Properties = [parallel | sequence | Shuffle | {RepeatType,N}] +%% GroupsAndTestCases = [Group | {group,GroupName} | TestCase] +%% TestCase = atom() +%% Shuffle = shuffle | {shuffle,{integer(),integer(),integer()}} +%% RepeatType = repeat | repeat_until_all_ok | repeat_until_all_fail | +%% repeat_until_any_ok | repeat_until_any_fail +%% N = integer() | forever +%% @end +%%-------------------------------------------------------------------- +groups() -> + []. + +%%-------------------------------------------------------------------- +%% @spec all() -> GroupsAndTestCases | {skip,Reason} +%% GroupsAndTestCases = [{group,GroupName} | TestCase] +%% GroupName = atom() +%% TestCase = atom() +%% Reason = term() +%% @end +%%-------------------------------------------------------------------- +all() -> + [ + tc_data_stream_sub + ]. + +%%-------------------------------------------------------------------- +%% @spec TestCase(Config0) -> +%% ok | exit() | {skip,Reason} | {comment,Comment} | +%% {save_config,Config1} | {skip_and_save,Reason,Config1} +%% Config0 = Config1 = [tuple()] +%% Reason = term() +%% Comment = term() +%% @end +%%-------------------------------------------------------------------- + +%% @doc Test MQTT Subscribe via data_stream +tc_data_stream_sub(Config) -> + Topic = lists:nth(1, ?TOPICS), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [1]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [{Topic, [{qos, qos1}]}]), + {ok, _, [2]} = emqtt:subscribe_via( + C, + {new_data_stream, []}, + #{}, + [{lists:nth(2, ?TOPICS), [{qos, qos2}]}] + ), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2 1">>, 2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2 2">>, 2), + {ok, _} = emqtt:publish(C, Topic, <<"qos 2 3">>, 2), + Msgs = receive_messages(3), + ct:pal("recv msg: ~p", [Msgs]), + ?assertEqual(3, length(Msgs)), + ok = emqtt:disconnect(C). + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]); + _Other -> + receive_messages(Count, Msgs) + after 1000 -> + Msgs + end.