From 00f615a1e33289d870d7ffada7d17952b05bcf95 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 9 Jan 2023 09:17:03 +0100 Subject: [PATCH] chore(quic): clean code --- apps/emqx/include/emqx_quic.hrl | 2 +- apps/emqx/src/emqx_connection.erl | 14 +- apps/emqx/src/emqx_quic_connection.erl | 43 +- apps/emqx/src/emqx_quic_data_stream.erl | 119 +- apps/emqx/src/emqx_quic_stream.erl | 77 +- apps/emqx/test/emqtt_quic_SUITE.erl | 1706 --------------- .../emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 13 - .../test/emqx_quic_multistreams_SUITE.erl | 1880 +++++++++++++++-- 8 files changed, 1817 insertions(+), 2037 deletions(-) delete mode 100644 apps/emqx/test/emqtt_quic_SUITE.erl diff --git a/apps/emqx/include/emqx_quic.hrl b/apps/emqx/include/emqx_quic.hrl index 302f2704d..3366b8938 100644 --- a/apps/emqx/include/emqx_quic.hrl +++ b/apps/emqx/include/emqx_quic.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 980c41010..be420d65e 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -119,10 +119,7 @@ limiter_timer :: undefined | reference(), %% QUIC conn pid if is a pid - quic_conn_pid :: maybe(pid()), - - %% QUIC control stream callback state - quic_ctrl_state :: map() + quic_conn_pid :: maybe(pid()) }). -record(retry, { @@ -378,8 +375,7 @@ init_state( limiter_buffer = queue:new(), limiter_timer = undefined, %% for quic streams to inherit - quic_conn_pid = maps:get(conn_pid, Opts, undefined), - quic_ctrl_state = #{} + quic_conn_pid = maps:get(conn_pid, Opts, undefined) }. run_loop( @@ -928,12 +924,6 @@ handle_info({sock_error, Reason}, State) -> handle_info({sock_closed, Reason}, close_socket(State)); handle_info({quic, Event, Handle, Prop}, State) -> emqx_quic_stream:Event(Handle, Prop, State); -%% handle_info({quic, peer_send_shutdown, _Stream}, State) -> -%% handle_info({sock_closed, force}, close_socket(State)); -%% handle_info({quic, closed, _Channel, ReasonFlag}, State) -> -%% handle_info({sock_closed, ReasonFlag}, State); -%% handle_info({quic, closed, _Stream}, State) -> -%% handle_info({sock_closed, force}, State); handle_info(Info, State) -> with_channel(handle_info, [Info], State). diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index ef0d9b2e3..69d16cbc3 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -17,13 +17,11 @@ %% @doc impl. the quic connection owner process. -module(emqx_quic_connection). --include("logger.hrl"). -ifndef(BUILD_WITHOUT_QUIC). + +-include("logger.hrl"). -include_lib("quicer/include/quicer.hrl"). -include_lib("emqx/include/emqx_quic.hrl"). --else. --define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0). --endif. -behavior(quicer_connection). @@ -55,10 +53,9 @@ %% Pid of ctrl stream ctrl_pid := undefined | pid(), %% quic connecion handle - conn := undefined | quicer:conneciton_hanlder(), - %% streams that handoff from this process, excluding control stream - %% these streams could die/closed without effecting the connecion/session. - + conn := undefined | quicer:conneciton_handle(), + %% Data streams that handoff from this process + %% these streams could die/close without effecting the connecion/session. %@TODO type? streams := [{pid(), quicer:stream_handle()}], %% New stream opts @@ -82,22 +79,20 @@ activate_data_streams(ConnOwner, {PS, Serialize, Channel}) -> gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity). %% @doc conneciton owner init callback --spec init(map() | list()) -> {ok, cb_state()}. -init(ConnOpts) when is_list(ConnOpts) -> - init(maps:from_list(ConnOpts)); +-spec init(map()) -> {ok, cb_state()}. init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)}); init(ConnOpts) when is_map(ConnOpts) -> {ok, init_cb_state(ConnOpts)}. --spec closed(quicer:conneciton_hanlder(), quicer:conn_closed_props(), cb_state()) -> +-spec closed(quicer:conneciton_handle(), quicer:conn_closed_props(), cb_state()) -> {stop, normal, cb_state()}. closed(_Conn, #{is_peer_acked := _} = Prop, S) -> ?SLOG(debug, Prop), {stop, normal, S}. %% @doc handle the new incoming connecion as the connecion acceptor. --spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) -> +-spec new_conn(quicer:connection_handle(), quicer:new_conn_props(), cb_state()) -> {ok, cb_state()} | {error, any()}. new_conn( Conn, @@ -133,7 +128,7 @@ new_conn( end. %% @doc callback when connection is connected. --spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) -> +-spec connected(quicer:connection_handle(), quicer:connected_props(), cb_state()) -> {ok, cb_state()} | {error, any()}. connected(_Conn, Props, S) -> ?SLOG(debug, Props), @@ -185,21 +180,21 @@ new_stream( Props ), quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}), - %% @TODO keep them in ``inactive_streams' + %% @TODO maybe keep them in `inactive_streams' {ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}. -%% @doc callback for handling for remote connecion shutdown. +%% @doc callback for handling remote connecion shutdown. -spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret(). -shutdown(Conn, _ErrorCode, S) -> - %% @TODO check spec what to set for the ErrorCode? +shutdown(Conn, ErrorCode, S) -> + ErrorCode =/= 0 andalso ?SLOG(debug, #{error_code => ErrorCode, state => S}), quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), {ok, S}. -%% @doc callback for handling for transport error, such as idle timeout +%% @doc callback for handling transport error, such as idle timeout -spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) -> cb_ret(). -transport_shutdown(_C, _DownInfo, S) -> - %% @TODO some counter +transport_shutdown(_C, DownInfo, S) when is_map(DownInfo) -> + ?SLOG(debug, DownInfo), {ok, S}. %% @doc callback for handling for peer addr changed. @@ -238,6 +233,7 @@ peer_needs_streams(_C, undefined, S) -> {ok, S}. %% @doc handle API calls +-spec handle_call(Req :: term(), gen_server:from(), cb_state()) -> cb_ret(). handle_call( {activate_data_streams, {PS, Serialize, Channel} = ActivateData}, _From, @@ -256,7 +252,6 @@ handle_call(_Req, _From, S) -> {reply, {error, unimpl}, S}. %% @doc handle DOWN messages from streams. -%% @TODO handle DOWN from supervisor? handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) -> case Reason of normal -> @@ -302,3 +297,7 @@ init_cb_state(#{zone := _Zone} = Map) -> serialize => undefined, is_resumed => false }. + +%% BUILD_WITHOUT_QUIC +-else. +-endif. diff --git a/apps/emqx/src/emqx_quic_data_stream.erl b/apps/emqx/src/emqx_quic_data_stream.erl index 2aa3ad4f7..e3f6b7adc 100644 --- a/apps/emqx/src/emqx_quic_data_stream.erl +++ b/apps/emqx/src/emqx_quic_data_stream.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,11 +21,14 @@ %% -module(emqx_quic_data_stream). + +-ifndef(BUILD_WITHOUT_QUIC). +-behaviour(quicer_remote_stream). + -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("quicer/include/quicer.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). --behaviour(quicer_remote_stream). %% Connection Callbacks -export([ @@ -37,12 +40,12 @@ peer_receive_aborted/3, send_shutdown_complete/3, stream_closed/3, - peer_accepted/3, passive/3 ]). -export([handle_stream_data/4]). +%% gen_server API -export([activate_data/2]). -export([ @@ -51,9 +54,19 @@ handle_continue/2 ]). +-type cb_ret() :: quicer_stream:cb_ret(). +-type cb_state() :: quicer_stream:cb_state(). +-type error_code() :: quicer:error_code(). +-type connection_handle() :: quicer:connection_handle(). +-type stream_handle() :: quicer:stream_handle(). +-type handoff_data() :: { + emqx_frame:parse_state() | undefined, + emqx_frame:serialize_opts() | undefined, + emqx_channel:channel() | undefined +}. %% %% @doc Activate the data handling. -%% Data handling is disabled before control stream allows the data processing. +%% Note, data handling is disabled before finishing the validation over control stream. -spec activate_data(pid(), { emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel() }) -> ok. @@ -61,9 +74,12 @@ activate_data(StreamPid, {PS, Serialize, Channel}) -> gen_server:call(StreamPid, {activate, {PS, Serialize, Channel}}, infinity). %% -%% @doc Handoff from previous owner, mostly from the connection owner. -%% @TODO parse_state doesn't look necessary since we have it in post_handoff -%% @TODO -spec +%% @doc Handoff from previous owner, from the connection owner. +%% Note, unlike control stream, there is no acceptor for data streams. +%% The connection owner get new stream, spawn new proc and then handover to it. +%% +-spec init_handoff(stream_handle(), map(), connection_handle(), quicer:new_stream_props()) -> + {ok, cb_state()}. init_handoff( Stream, _StreamOpts, @@ -75,10 +91,9 @@ init_handoff( %% %% @doc Post handoff data stream %% -%% @TODO -spec -%% +-spec post_handoff(stream_handle(), handoff_data(), cb_state()) -> cb_ret(). post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Channel}, S) -> - %% Channel isn't ready yet. + %% When the channel isn't ready yet. %% Data stream should wait for activate call with ?MODULE:activate_data/2 {ok, S}; post_handoff(Stream, {PS, Serialize, Channel}, S) -> @@ -86,53 +101,35 @@ post_handoff(Stream, {PS, Serialize, Channel}, S) -> quicer:setopt(Stream, active, 10), {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}. -%% -%% @doc for local initiated stream -%% -peer_accepted(_Stream, _Flags, S) -> - %% we just ignore it - {ok, S}. - -peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> +-spec peer_receive_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret(). +peer_receive_aborted(Stream, ErrorCode, #{is_unidir := _} = S) -> %% we abort send with same reason - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), - {ok, S}; -peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) -> quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), {ok, S}. -peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) -> +-spec peer_send_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret(). +peer_send_aborted(Stream, ErrorCode, #{is_unidir := _} = S) -> %% we abort receive with same reason - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), - {ok, S}; -peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) -> quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), {ok, S}. -peer_send_shutdown(Stream, _Flags, S) -> +-spec peer_send_shutdown(stream_handle(), undefined, cb_state()) -> cb_ret(). +peer_send_shutdown(Stream, undefined, S) -> ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0), {ok, S}. +-spec send_complete(stream_handle(), IsCanceled :: boolean(), cb_state()) -> cb_ret(). send_complete(_Stream, false, S) -> {ok, S}; send_complete(_Stream, true = _IsCanceled, S) -> {ok, S}. +-spec send_shutdown_complete(stream_handle(), error_code(), cb_state()) -> cb_ret(). send_shutdown_complete(_Stream, _Flags, S) -> {ok, S}. -handle_stream_data( - Stream, - Bin, - _Flags, - #{ - is_unidir := false, - channel := undefined, - data_queue := Queue, - stream := Stream - } = State -) when is_binary(Bin) -> - {ok, State#{data_queue := [Bin | Queue]}}; +-spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_state()) -> + cb_ret(). handle_stream_data( _Stream, Bin, @@ -145,6 +142,7 @@ handle_stream_data( task_queue := TQ } = State ) when + %% assert get stream data only after channel is created Channel =/= undefined -> {MQTTPackets, NewPS} = parse_incoming(list_to_binary(lists:reverse([Bin | QueuedData])), PS), @@ -157,25 +155,12 @@ handle_stream_data( ), {{continue, handle_appl_msg}, State#{parse_state := NewPS, task_queue := NewTQ}}. -%% Reserved for unidi streams -%% handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := PeerStream, conn := Conn} = State) -> -%% case PeerStream of -%% undefined -> -%% {ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn, -%% [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL} -%% , {is_local, true} -%% ]), -%% {ok, _} = quicer_stream:send(StreamProc, Bin), -%% {ok, State#{peer_stream := StreamProc}}; -%% StreamProc when is_pid(StreamProc) -> -%% {ok, _} = quicer_stream:send(StreamProc, Bin), -%% {ok, State} -%% end. - +-spec passive(stream_handle(), undefined, cb_state()) -> cb_ret(). passive(Stream, undefined, S) -> quicer:setopt(Stream, active, 10), {ok, S}. +-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_state()) -> cb_ret(). stream_closed( _Stream, #{ @@ -197,28 +182,20 @@ stream_closed( -> {stop, normal, S}. +-spec handle_call(Request :: term(), From :: {pid(), term()}, cb_state()) -> cb_ret(). handle_call(Call, _From, S) -> - case do_handle_call(Call, S) of - {ok, NewS} -> - {reply, ok, NewS}; - {error, Reason, NewS} -> - {reply, {error, Reason}, NewS}; - {{continue, _} = Cont, NewS} -> - {reply, ok, NewS, Cont}; - {hibernate, NewS} -> - {reply, ok, NewS, hibernate}; - {stop, Reason, NewS} -> - {stop, Reason, {stopped, Reason}, NewS} - end. + do_handle_call(Call, S). +-spec handle_continue(Continue :: term(), cb_state()) -> cb_ret(). handle_continue(handle_appl_msg, #{task_queue := Q} = S) -> case queue:out(Q) of {{value, Item}, Q2} -> do_handle_appl_msg(Item, S#{task_queue := Q2}); - {empty, Q} -> + {empty, _Q} -> {ok, S} end. +%%% Internals do_handle_appl_msg( {outgoing, Packets}, #{ @@ -248,7 +225,7 @@ do_handle_appl_msg({incoming, {frame_error, _} = FE}, #{channel := Channel} = S) -> with_channel(handle_in, [FE], S); do_handle_appl_msg({close, Reason}, S) -> - %% @TODO shall we abort shutdown or graceful shutdown? + %% @TODO shall we abort shutdown or graceful shutdown here? with_channel(handle_info, [{sock_closed, Reason}], S); do_handle_appl_msg({event, updated}, S) -> %% Data stream don't care about connection state changes. @@ -294,7 +271,6 @@ with_channel(Fun, Args, #{channel := Channel, task_queue := Q} = S) when }} end. -%%% Internals handle_outgoing(#mqtt_packet{} = P, S) -> handle_outgoing([P], S); handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir := false}) when @@ -373,7 +349,7 @@ init_state(Stream, Connection, OpenFlags, PS) -> task_queue => queue:new() }. --spec do_handle_call(term(), quicer_stream:cb_state()) -> quicer_stream:cb_ret(). +-spec do_handle_call(term(), cb_state()) -> cb_ret(). do_handle_call( {activate, {PS, Serialize, Channel}}, #{ @@ -386,7 +362,7 @@ do_handle_call( %% We use quic protocol for flow control, and we don't check return val case quicer:setopt(Stream, active, true) of ok -> - {ok, NewS}; + {reply, ok, NewS}; {error, E} -> ?SLOG(error, #{msg => "set stream active failed", error => E}), {stop, E, NewS} @@ -484,3 +460,6 @@ is_datastream_out_pkt(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) w true; is_datastream_out_pkt(_) -> false. +%% BUILD_WITHOUT_QUIC +-else. +-endif. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 88cf4b7c3..a8ef7d41d 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -17,8 +17,12 @@ %% MQTT/QUIC Stream -module(emqx_quic_stream). +-ifndef(BUILD_WITHOUT_QUIC). + -behaviour(quicer_remote_stream). +-include("logger.hrl"). + %% emqx transport Callbacks -export([ type/1, @@ -33,31 +37,14 @@ sockname/1, peercert/1 ]). - --include("logger.hrl"). --ifndef(BUILD_WITHOUT_QUIC). -include_lib("quicer/include/quicer.hrl"). --else. -%% STREAM SHUTDOWN FLAGS --define(QUIC_STREAM_SHUTDOWN_FLAG_NONE, 0). -% Cleanly closes the send path. --define(QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1). -% Abruptly closes the send path. --define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND, 2). -% Abruptly closes the receive path. --define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, 4). -% Abruptly closes both send and receive paths. --define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 6). --define(QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, 8). --endif. --type cb_ret() :: gen_statem:event_handler_result(). --type cb_data() :: emqtt_quic:cb_data(). +-type cb_ret() :: quicer_stream:cb_ret(). +-type cb_data() :: quicer_stream:cb_state(). -type connection_handle() :: quicer:connection_handle(). -type stream_handle() :: quicer:stream_handle(). -export([ - new_stream/3, send_complete/3, peer_send_shutdown/3, peer_send_aborted/3, @@ -79,13 +66,8 @@ }. %% for accepting --spec wait - ({pid(), connection_handle(), socket_info()}) -> - {ok, socket()} | {error, enotconn}; - %% For handover - ({pid(), connection_handle(), stream_handle(), socket_info()}) -> - {ok, socket()} | {error, any()}. - +-spec wait({pid(), connection_handle(), socket_info()}) -> + {ok, socket()} | {error, enotconn}. %%% For Accepting New Remote Stream wait({ConnOwner, Conn, ConnInfo}) -> {ok, Conn} = quicer:async_accept_stream(Conn, []), @@ -105,15 +87,8 @@ wait({ConnOwner, Conn, ConnInfo}) -> {'EXIT', ConnOwner, _Reason} -> {error, enotconn} end. -%% UNUSED, for ownership handover, -%% wait({PrevOwner, Conn, Stream, SocketInfo}) -> -%% case quicer:wait_for_handoff(PrevOwner, Stream) of -%% ok -> -%% {ok, socket(Conn, Stream, SocketInfo)}; -%% owner_down -> -%% {error, owner_down} -%% end. +-spec type(_) -> quic. type(_) -> quic. @@ -155,7 +130,7 @@ getopts(_Socket, _Opts) -> {buffer, 80000} ]}. -%% @TODO supply some App Error Code +%% @TODO supply some App Error Code from caller fast_close({ConnOwner, Conn, _ConnInfo}) when is_pid(ConnOwner) -> %% handshake aborted. quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), @@ -185,15 +160,13 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> async_send({quic, _Conn, Stream, _Info}, Data, _Options) -> case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of {ok, _Len} -> ok; + {error, X, Y} -> {error, {X, Y}}; Other -> Other end. %%% %%% quicer stream callbacks %%% --spec new_stream(stream_handle(), quicer:new_stream_props(), cb_data()) -> cb_ret(). -new_stream(_Stream, #{flags := _Flags, is_orphan := _IsOrphan}, _Conn) -> - {stop, unimpl}. -spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). peer_receive_aborted(Stream, ErrorCode, S) -> @@ -222,28 +195,12 @@ send_complete(_Stream, true = _IsCancelled, S) -> send_shutdown_complete(_Stream, _IsGraceful, S) -> {ok, S}. -%% Local stream, Unidir -%% -spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_data()) -%% -> cb_ret(). -%% handle_stream_data(Stream, Bin, Flags, #{ is_local := true -%% , parse_state := PS} = S) -> -%% ?SLOG(debug, #{data => Bin}, Flags), -%% case parse(Bin, PS, []) of -%% {keep_state, NewPS, Packets} -> -%% quicer:setopt(Stream, active, once), -%% {keep_state, S#{parse_state := NewPS}, -%% [{next_event, cast, P } || P <- lists:reverse(Packets)]}; -%% {stop, _} = Stop -> -%% Stop -%% end; -%% %% Remote stream -%% handle_stream_data(_Stream, _Bin, _Flags, -%% #{is_local := false, is_unidir := true, conn := _Conn} = _S) -> -%% {stop, unimpl}. - -spec passive(stream_handle(), undefined, cb_data()) -> cb_ret(). passive(Stream, undefined, S) -> - quicer:setopt(Stream, active, 10), + case quicer:setopt(Stream, active, 10) of + ok -> ok; + Error -> ?SLOG(error, #{message => "set active error", error => Error}) + end, {ok, S}. -spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) -> cb_ret(). @@ -277,3 +234,7 @@ stream_closed( -spec socket(connection_handle(), stream_handle(), socket_info()) -> socket(). socket(Conn, CtrlStream, Info) when is_map(Info) -> {quic, Conn, CtrlStream, Info}. + +%% BUILD_WITHOUT_QUIC +-else. +-endif. diff --git a/apps/emqx/test/emqtt_quic_SUITE.erl b/apps/emqx/test/emqtt_quic_SUITE.erl deleted file mode 100644 index f926c2f3e..000000000 --- a/apps/emqx/test/emqtt_quic_SUITE.erl +++ /dev/null @@ -1,1706 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqtt_quic_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). --include_lib("quicer/include/quicer.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). - -suite() -> - [{timetrap, {seconds, 30}}]. - -all() -> - [ - {group, mstream}, - {group, shutdown}, - {group, misc} - ]. - -groups() -> - [ - {mstream, [], [{group, profiles}]}, - - {profiles, [], [ - {group, profile_low_latency}, - {group, profile_max_throughput} - ]}, - {profile_low_latency, [], [ - {group, pub_qos0}, - {group, pub_qos1}, - {group, pub_qos2} - ]}, - {profile_max_throughput, [], [ - {group, pub_qos0}, - {group, pub_qos1}, - {group, pub_qos2} - ]}, - {pub_qos0, [], [ - {group, sub_qos0}, - {group, sub_qos1}, - {group, sub_qos2} - ]}, - {pub_qos1, [], [ - {group, sub_qos0}, - {group, sub_qos1}, - {group, sub_qos2} - ]}, - {pub_qos2, [], [ - {group, sub_qos0}, - {group, sub_qos1}, - {group, sub_qos2} - ]}, - {sub_qos0, [{group, qos}]}, - {sub_qos1, [{group, qos}]}, - {sub_qos2, [{group, qos}]}, - {qos, [ - t_multi_streams_sub, - t_multi_streams_pub_5x100, - t_multi_streams_pub_parallel, - t_multi_streams_pub_parallel_no_blocking, - t_multi_streams_sub_pub_async, - t_multi_streams_sub_pub_sync, - t_multi_streams_unsub, - t_multi_streams_corr_topic, - t_multi_streams_unsub_via_other, - t_multi_streams_dup_sub, - t_multi_streams_packet_boundary, - t_multi_streams_packet_malform, - t_multi_streams_kill_sub_stream, - t_multi_streams_packet_too_large, - t_multi_streams_sub_0_rtt, - t_multi_streams_sub_0_rtt_large_payload, - t_multi_streams_sub_0_rtt_stream_data_cont, - t_conn_change_client_addr - ]}, - - {shutdown, [ - {group, graceful_shutdown}, - {group, abort_recv_shutdown}, - {group, abort_send_shutdown}, - {group, abort_send_recv_shutdown} - ]}, - - {graceful_shutdown, [ - {group, ctrl_stream_shutdown}, - {group, data_stream_shutdown} - ]}, - {abort_recv_shutdown, [ - {group, ctrl_stream_shutdown}, - {group, data_stream_shutdown} - ]}, - {abort_send_shutdown, [ - {group, ctrl_stream_shutdown}, - {group, data_stream_shutdown} - ]}, - {abort_send_recv_shutdown, [ - {group, ctrl_stream_shutdown}, - {group, data_stream_shutdown} - ]}, - - {ctrl_stream_shutdown, [ - t_multi_streams_shutdown_ctrl_stream, - t_multi_streams_shutdown_ctrl_stream_then_reconnect, - t_multi_streams_remote_shutdown, - t_multi_streams_remote_shutdown_with_reconnect - ]}, - - {data_stream_shutdown, [t_multi_streams_shutdown_data_stream]}, - {misc, [ - t_conn_silent_close, - t_client_conn_bump_streams, - t_olp_true, - t_olp_reject, - t_conn_resume, - t_conn_without_ctrl_stream - ]} - ]. - -init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([]), - UdpPort = 14567, - start_emqx_quic(UdpPort), - %% dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}), - %% dbg:p(all, c), - %% dbg:tp(emqx_quic_connection, cx), - %% dbg:tp(emqx_quic_stream, cx), - %% dbg:tp(emqtt, cx), - %% dbg:tpl(emqtt_quic_stream, cx), - %% dbg:tpl(emqx_quic_stream, cx), - %% dbg:tpl(emqx_quic_data_stream, cx), - %% dbg:tpl(emqtt, cx), - [{port, UdpPort}, {pub_qos, 0}, {sub_qos, 0} | Config]. - -end_per_suite(_) -> - ok. - -init_per_group(pub_qos0, Config) -> - [{pub_qos, 0} | Config]; -init_per_group(sub_qos0, Config) -> - [{sub_qos, 0} | Config]; -init_per_group(pub_qos1, Config) -> - [{pub_qos, 1} | Config]; -init_per_group(sub_qos1, Config) -> - [{sub_qos, 1} | Config]; -init_per_group(pub_qos2, Config) -> - [{pub_qos, 2} | Config]; -init_per_group(sub_qos2, Config) -> - [{sub_qos, 2} | Config]; -init_per_group(abort_send_shutdown, Config) -> - [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND} | Config]; -init_per_group(abort_recv_shutdown, Config) -> - [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE} | Config]; -init_per_group(abort_send_recv_shutdown, Config) -> - [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT} | Config]; -init_per_group(graceful_shutdown, Config) -> - [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL} | Config]; -init_per_group(profile_max_throughput, Config) -> - quicer:reg_open(quic_execution_profile_type_max_throughput), - Config; -init_per_group(profile_low_latency, Config) -> - quicer:reg_open(quic_execution_profile_low_latency), - Config; -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - -init_per_testcase(_, Config) -> - emqx_common_test_helpers:start_apps([]), - Config. - -t_quic_sock(Config) -> - Port = 4567, - SslOpts = [ - {cert, certfile(Config)}, - {key, keyfile(Config)}, - {idle_timeout_ms, 10000}, - % QUIC_SERVER_RESUME_AND_ZERORTT - {server_resumption_level, 2}, - {peer_bidi_stream_count, 10}, - {alpn, ["mqtt"]} - ], - Server = quic_server:start_link(Port, SslOpts), - timer:sleep(500), - {ok, Sock} = emqtt_quic:connect( - "localhost", - Port, - [{alpn, ["mqtt"]}, {active, false}], - 3000 - ), - send_and_recv_with(Sock), - ok = emqtt_quic:close(Sock), - quic_server:stop(Server). - -t_quic_sock_fail(_Config) -> - Port = 4567, - Error1 = - {error, - {transport_down, #{ - error => 2, - status => connection_refused - }}}, - Error2 = {error, {transport_down, #{error => 1, status => unreachable}}}, - case - emqtt_quic:connect( - "localhost", - Port, - [{alpn, ["mqtt"]}, {active, false}], - 3000 - ) - of - Error1 -> - ok; - Error2 -> - ok; - Other -> - ct:fail("unexpected return ~p", [Other]) - end. - -t_0_rtt(Config) -> - Port = 4568, - SslOpts = [ - {cert, certfile(Config)}, - {key, keyfile(Config)}, - {idle_timeout_ms, 10000}, - % QUIC_SERVER_RESUME_AND_ZERORTT - {server_resumption_level, 2}, - {peer_bidi_stream_count, 10}, - {alpn, ["mqtt"]} - ], - Server = quic_server:start_link(Port, SslOpts), - timer:sleep(500), - {ok, {quic, Conn, _Stream} = Sock} = emqtt_quic:connect( - "localhost", - Port, - [ - {alpn, ["mqtt"]}, - {active, false}, - {quic_event_mask, 1} - ], - 3000 - ), - send_and_recv_with(Sock), - ok = emqtt_quic:close(Sock), - NST = - receive - {quic, nst_received, Conn, Ticket} -> - Ticket - end, - {ok, Sock2} = emqtt_quic:connect( - "localhost", - Port, - [ - {alpn, ["mqtt"]}, - {active, false}, - {nst, NST} - ], - 3000 - ), - send_and_recv_with(Sock2), - ok = emqtt_quic:close(Sock2), - quic_server:stop(Server). - -t_0_rtt_fail(Config) -> - Port = 4569, - SslOpts = [ - {cert, certfile(Config)}, - {key, keyfile(Config)}, - {idle_timeout_ms, 10000}, - % QUIC_SERVER_RESUME_AND_ZERORTT - {server_resumption_level, 2}, - {peer_bidi_stream_count, 10}, - {alpn, ["mqtt"]} - ], - Server = quic_server:start_link(Port, SslOpts), - timer:sleep(500), - {ok, {quic, Conn, _Stream} = Sock} = emqtt_quic:connect( - "localhost", - Port, - [ - {alpn, ["mqtt"]}, - {active, false}, - {quic_event_mask, 1} - ], - 3000 - ), - send_and_recv_with(Sock), - ok = emqtt_quic:close(Sock), - <<_Head:16, Left/binary>> = - receive - {quic, nst_received, Conn, Ticket} when is_binary(Ticket) -> - Ticket - end, - - Error = {error, {not_found, invalid_parameter}}, - Error = emqtt_quic:connect( - "localhost", - Port, - [ - {alpn, ["mqtt"]}, - {active, false}, - {nst, Left} - ], - 3000 - ), - quic_server:stop(Server). - -t_multi_streams_sub(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - case emqtt:publish(C, Topic, <<"qos 2 1">>, PubQos) of - ok when PubQos == 0 -> ok; - {ok, _} -> ok - end, - receive - {publish, #{ - client_pid := C, - payload := <<"qos 2 1">>, - qos := RecQos, - topic := Topic - }} -> - ok; - Other -> - ct:fail("unexpected recv ~p", [Other]) - after 100 -> - ct:fail("not received") - end, - ok = emqtt:disconnect(C). - -t_multi_streams_pub_5x100(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - - PubVias = lists:map( - fun(_N) -> - {ok, Via} = emqtt:start_data_stream(C, []), - Via - end, - lists:seq(1, 5) - ), - [ - begin - case emqtt:publish_via(C, PVia, Topic, #{}, <<"stream data ", N>>, [{qos, PubQos}]) of - ok when PubQos == 0 -> ok; - {ok, _} -> ok - end, - 0 == (N rem 10) andalso timer:sleep(10) - end - || N <- lists:seq(1, 100), PVia <- PubVias - ], - ?assert(timeout =/= recv_pub(500)), - ok = emqtt:disconnect(C). - -t_multi_streams_pub_parallel(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - PktId2 = calc_pkt_id(RecQos, 2), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic, - <<"stream data 1">>, - [{qos, PubQos}], - undefined - ), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic, - <<"stream data 2">>, - [{qos, PubQos}], - undefined - ), - PubRecvs = recv_pub(2), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data", _/binary>>, - qos := RecQos, - topic := Topic - }}, - {publish, #{ - client_pid := C, - packet_id := PktId2, - payload := <<"stream data", _/binary>>, - qos := RecQos, - topic := Topic - }} - ], - PubRecvs - ), - Payloads = [P || {publish, #{payload := P}} <- PubRecvs], - ?assert( - [<<"stream data 1">>, <<"stream data 2">>] == Payloads orelse - [<<"stream data 2">>, <<"stream data 1">>] == Payloads - ), - ok = emqtt:disconnect(C). - -%% @doc test two pub streams, one send incomplete MQTT packet() can not block another. -t_multi_streams_pub_parallel_no_blocking(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId2 = calc_pkt_id(RecQos, 1), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), - Drop = <<"stream data 1">>, - meck:new(emqtt_quic, [passthrough, no_history]), - meck:expect(emqtt_quic, send, fun(Sock, IoList) -> - case lists:last(IoList) == Drop of - true -> - ct:pal("meck droping ~p", [Drop]), - meck:passthrough([Sock, IoList -- [Drop]]); - false -> - meck:passthrough([Sock, IoList]) - end - end), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic, - Drop, - [{qos, PubQos}], - undefined - ), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic, - <<"stream data 2">>, - [{qos, PubQos}], - undefined - ), - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId2, - payload := <<"stream data 2">>, - qos := RecQos, - topic := Topic - }} - ], - PubRecvs - ), - meck:unload(emqtt_quic), - ?assertEqual(timeout, recv_pub(1)), - ok = emqtt:disconnect(C). - -t_multi_streams_packet_boundary(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - PktId2 = calc_pkt_id(RecQos, 2), - PktId3 = calc_pkt_id(RecQos, 3), - Topic = atom_to_binary(?FUNCTION_NAME), - - %% make quicer to batch job - quicer:reg_open(quic_execution_profile_type_max_throughput), - - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), - - {ok, PubVia} = emqtt:start_data_stream(C, []), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - <<"stream data 1">>, - [{qos, PubQos}], - undefined - ), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - <<"stream data 2">>, - [{qos, PubQos}], - undefined - ), - LargePart3 = binary:copy(<<"stream data3">>, 2000), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - LargePart3, - [{qos, PubQos}], - undefined - ), - PubRecvs = recv_pub(3), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data 1">>, - qos := RecQos, - topic := Topic - }}, - {publish, #{ - client_pid := C, - packet_id := PktId2, - payload := <<"stream data 2">>, - qos := RecQos, - topic := Topic - }}, - {publish, #{ - client_pid := C, - packet_id := PktId3, - payload := LargePart3, - qos := RecQos, - topic := Topic - }} - ], - PubRecvs - ), - ok = emqtt:disconnect(C). - -%% @doc test that one malformed stream will not close the entire connection -t_multi_streams_packet_malform(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - PktId2 = calc_pkt_id(RecQos, 2), - PktId3 = calc_pkt_id(RecQos, 3), - Topic = atom_to_binary(?FUNCTION_NAME), - - %% make quicer to batch job - quicer:reg_open(quic_execution_profile_type_max_throughput), - - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), - - {ok, PubVia} = emqtt:start_data_stream(C, []), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - <<"stream data 1">>, - [{qos, PubQos}], - undefined - ), - - {ok, {quic, _Conn, MalformStream}} = emqtt:start_data_stream(C, []), - {ok, _} = quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>), - - ok = emqtt:publish_async( - C, - PubVia, - Topic, - <<"stream data 2">>, - [{qos, PubQos}], - undefined - ), - LargePart3 = binary:copy(<<"stream data3">>, 2000), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - LargePart3, - [{qos, PubQos}], - undefined - ), - PubRecvs = recv_pub(3), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data 1">>, - qos := RecQos, - topic := Topic - }}, - {publish, #{ - client_pid := C, - packet_id := PktId2, - payload := <<"stream data 2">>, - qos := RecQos, - topic := Topic - }}, - {publish, #{ - client_pid := C, - packet_id := PktId3, - payload := LargePart3, - qos := RecQos, - topic := Topic - }} - ], - PubRecvs - ), - - case quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>) of - {ok, 10} -> ok; - {error, cancelled} -> ok; - {error, stm_send_error, aborted} -> ok - end, - - timer:sleep(200), - ?assert(is_list(emqtt:info(C))), - - {error, stm_send_error, aborted} = quicer:send(MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>), - - timer:sleep(200), - ?assert(is_list(emqtt:info(C))), - - ok = emqtt:disconnect(C). - -t_multi_streams_packet_too_large(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - Topic = atom_to_binary(?FUNCTION_NAME), - meck:new(emqx_frame, [passthrough, no_history]), - ok = meck:expect( - emqx_frame, - serialize_opts, - fun(#mqtt_packet_connect{proto_ver = ProtoVer}) -> - #{version => ProtoVer, max_size => 1024} - end - ), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), - - {ok, PubVia} = emqtt:start_data_stream(C, []), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - binary:copy(<<"stream data 1">>, 1024), - [{qos, PubQos}], - undefined - ), - timeout = recv_pub(1), - ?assert(is_list(emqtt:info(C))), - ok = meck:unload(emqx_frame), - ok = emqtt:disconnect(C). - -t_conn_change_client_addr(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), - - {ok, {quic, Conn, _} = PubVia} = emqtt:start_data_stream(C, []), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - <<"stream data 1">>, - [{qos, PubQos}], - undefined - ), - - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := _PktId1, - payload := <<"stream data 1">>, - qos := RecQos - }} - ], - recv_pub(1) - ), - NewPort = select_port(), - {ok, OldAddr} = quicer:sockname(Conn), - ?assertEqual( - ok, quicer:setopt(Conn, param_conn_local_address, "127.0.0.1:" ++ integer_to_list(NewPort)) - ), - {ok, NewAddr} = quicer:sockname(Conn), - ct:pal("NewAddr: ~p, Old Addr: ~p", [NewAddr, OldAddr]), - ?assertNotEqual(OldAddr, NewAddr), - ?assert(is_list(emqtt:info(C))), - ok = emqtt:disconnect(C). - -t_multi_streams_sub_pub_async(Config) -> - Topic = atom_to_binary(?FUNCTION_NAME), - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - Topic2 = <>, - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic, - <<"stream data 1">>, - [{qos, PubQos}], - undefined - ), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic2, - <<"stream data 2">>, - [{qos, PubQos}], - undefined - ), - PubRecvs = recv_pub(2), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data", _/binary>>, - qos := RecQos - }}, - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data", _/binary>>, - qos := RecQos - }} - ], - PubRecvs - ), - Payloads = [P || {publish, #{payload := P}} <- PubRecvs], - ?assert( - [<<"stream data 1">>, <<"stream data 2">>] == Payloads orelse - [<<"stream data 2">>, <<"stream data 1">>] == Payloads - ), - ok = emqtt:disconnect(C). - -t_multi_streams_sub_pub_sync(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SVia1}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<"stream data 3">>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> - Via1 = undefined, - ok; - {ok, #{reason_code := 0, via := Via1}} -> - ok - end, - case - emqtt:publish_via(C, {new_data_stream, []}, Topic2, #{}, <<"stream data 4">>, [ - {qos, PubQos} - ]) - of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := Via2}} -> - ?assert(Via1 =/= Via2), - ok - end, - ct:pal("SVia1: ~p, SVia2: ~p", [SVia1, SVia2]), - PubRecvs = recv_pub(2), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data 3">>, - qos := RecQos, - via := SVia1 - }}, - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data 4">>, - qos := RecQos, - via := SVia2 - }} - ], - lists:sort(PubRecvs) - ), - ok = emqtt:disconnect(C). - -t_multi_streams_dup_sub(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SVia1}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - - #{data_stream_socks := [{quic, _Conn, SubStream} | _]} = proplists:get_value( - extra, emqtt:info(C) - ), - ?assertEqual(2, length(emqx_broker:subscribers(Topic))), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<"stream data 3">>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> - ok; - {ok, #{reason_code := 0, via := _Via1}} -> - ok - end, - PubRecvs = recv_pub(2), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data 3">>, - qos := RecQos - }}, - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<"stream data 3">>, - qos := RecQos - }} - ], - lists:sort(PubRecvs) - ), - - RecvVias = [Via || {publish, #{via := Via}} <- PubRecvs], - - ct:pal("~p, ~p, ~n recv from: ~p~n", [SVia1, SVia2, PubRecvs]), - %% Can recv in any order - ?assert([SVia1, SVia2] == RecvVias orelse [SVia2, SVia1] == RecvVias), - - %% Shutdown one stream - quicer:async_shutdown_stream(SubStream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 500), - timer:sleep(100), - - ?assertEqual(1, length(emqx_broker:subscribers(Topic))), - - ok = emqtt:disconnect(C). - -t_multi_streams_corr_topic(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - PktId2 = calc_pkt_id(RecQos, 2), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SubVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> - ok; - {ok, #{reason_code := 0, via := _Via}} -> - ok - end, - - #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), - ?assert(PubVia =/= SubVia), - - case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := PubVia}} -> ok - end, - PubRecvs = recv_pub(2), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }}, - {publish, #{ - client_pid := C, - packet_id := PktId2, - payload := <<6, 7, 8, 9>>, - qos := RecQos - }} - ], - PubRecvs - ), - ok = emqtt:disconnect(C). - -t_multi_streams_unsub(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SubVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> - ok; - {ok, #{reason_code := 0, via := _PVia}} -> - ok - end, - - #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), - ?assert(PubVia =/= SubVia), - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - PubRecvs - ), - - emqtt:unsubscribe_via(C, SubVia, Topic), - - case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of - ok when PubQos == 0 -> - ok; - {ok, #{reason_code := 16, via := PubVia, reason_code_name := no_matching_subscribers}} -> - ok - end, - - timeout = recv_pub(1), - ok = emqtt:disconnect(C). - -t_multi_streams_kill_sub_stream(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := _SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - [TopicStreamOwner] = emqx_broker:subscribers(Topic), - exit(TopicStreamOwner, kill), - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> - ok; - {ok, #{reason_code := Code, via := _PVia}} when Code == 0 orelse Code == 16 -> - ok - end, - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic2, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> - ok; - {ok, #{reason_code := 0, via := _PVia2}} -> - ok - end, - - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - topic := Topic2, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - recv_pub(1) - ), - ?assertEqual(timeout, recv_pub(1)), - ok. - -t_multi_streams_unsub_via_other(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - PktId2 = calc_pkt_id(RecQos, 2), - - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := _PVia}} -> ok - end, - - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - PubRecvs - ), - - #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), - - %% Unsub topic1 via stream2 should fail with error code 17: "No subscription existed" - {ok, #{via := SVia2}, [17]} = emqtt:unsubscribe_via(C, SVia2, Topic), - - case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := _PVia2}} -> ok - end, - - PubRecvs2 = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId2, - payload := <<6, 7, 8, 9>>, - qos := RecQos - }} - ], - PubRecvs2 - ), - ok = emqtt:disconnect(C). - -t_multi_streams_shutdown_data_stream(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - - ?assert(SVia =/= SVia2), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := _PVia}} -> ok - end, - - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - PubRecvs - ), - - #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), - {quic, _Conn, DataStream} = PubVia, - quicer:shutdown_stream(DataStream, ?config(stream_shutdown_flag, Config), 500, 100), - timer:sleep(500), - %% Still alive - ?assert(is_list(emqtt:info(C))). - -t_multi_streams_shutdown_ctrl_stream(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - unlink(C), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := _SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := _PVia}} -> ok - end, - - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - PubRecvs - ), - - {quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), - quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 1000), - timer:sleep(500), - %% Client should be closed - ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). - -t_multi_streams_shutdown_ctrl_stream_then_reconnect(Config) -> - erlang:process_flag(trap_exit, true), - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {reconnect, true}, - %% speedup test - {connect_timeout, 5} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - - ?assert(SVia2 =/= SVia), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := _PVia}} -> ok - end, - - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - PubRecvs - ), - - {quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), - quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 100), - timer:sleep(200), - %% Client should be closed - ?assert(is_list(emqtt:info(C))). - -t_multi_streams_remote_shutdown(Config) -> - erlang:process_flag(trap_exit, true), - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {reconnect, false}, - %% speedup test - {connect_timeout, 5} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - - ?assert(SVia2 =/= SVia), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := _PVia}} -> ok - end, - - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - PubRecvs - ), - - {quic, _Conn, _Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), - - ok = stop_emqx(), - - timer:sleep(200), - start_emqx_quic(?config(port, Config)), - - %% Client should be closed - ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). - -t_multi_streams_remote_shutdown_with_reconnect(Config) -> - erlang:process_flag(trap_exit, true), - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - PktId1 = calc_pkt_id(RecQos, 1), - - Topic = atom_to_binary(?FUNCTION_NAME), - Topic2 = <>, - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {reconnect, true}, - %% speedup test - {connect_timeout, 5} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ - {Topic2, [{qos, SubQos}]} - ]), - - ?assert(SVia2 =/= SVia), - - case - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) - of - ok when PubQos == 0 -> ok; - {ok, #{reason_code := 0, via := _PVia}} -> ok - end, - - PubRecvs = recv_pub(1), - ?assertMatch( - [ - {publish, #{ - client_pid := C, - packet_id := PktId1, - payload := <<1, 2, 3, 4, 5>>, - qos := RecQos - }} - ], - PubRecvs - ), - - {quic, _Conn, _Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), - - ok = stop_emqx(), - - timer:sleep(200), - - start_emqx_quic(?config(port, Config)), - %% Client should be closed - ?assert(is_list(emqtt:info(C))). - -t_conn_silent_close(Config) -> - erlang:process_flag(trap_exit, true), - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - %% quic idle timeout + 1s - timer:sleep(16000), - Topic = atom_to_binary(?FUNCTION_NAME), - ?assertException( - exit, - noproc, - emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, 1}]) - ). - -t_client_conn_bump_streams(Config) -> - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - {quic, Conn, _Stream} = proplists:get_value(socket, emqtt:info(C)), - ok = quicer:setopt(Conn, param_conn_settings, #{peer_unidi_stream_count => 20}). - -t_olp_true(Config) -> - meck:new(emqx_olp, [passthrough, no_history]), - ok = meck:expect(emqx_olp, is_overloaded, fun() -> true end), - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - ok = meck:unload(emqx_olp). - -t_olp_reject(Config) -> - erlang:process_flag(trap_exit, true), - emqx_config:put_zone_conf(default, [overload_protection, enable], true), - meck:new(emqx_olp, [passthrough, no_history]), - ok = meck:expect(emqx_olp, is_overloaded, fun() -> true end), - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5} - | Config - ]), - ?assertEqual( - {error, - {transport_down, #{ - error => 346, - status => - user_canceled - }}}, - emqtt:quic_connect(C) - ), - ok = meck:unload(emqx_olp), - emqx_config:put_zone_conf(default, [overload_protection, enable], false). - -t_conn_resume(Config) -> - erlang:process_flag(trap_exit, true), - {ok, C0} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5} - | Config - ]), - - {ok, _} = emqtt:quic_connect(C0), - #{nst := NST} = proplists:get_value(extra, emqtt:info(C0)), - emqtt:disconnect(C0), - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5}, - {nst, NST} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - Cid = proplists:get_value(clientid, emqtt:info(C)), - ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]). - -t_conn_without_ctrl_stream(Config) -> - erlang:process_flag(trap_exit, true), - {ok, Conn} = quicer:connect( - {127, 0, 0, 1}, - ?config(port, Config), - [{alpn, ["mqtt"]}, {verify, none}], - 3000 - ), - receive - {quic, transport_shutdown, Conn, _} -> ok - end. - -t_data_stream_race_ctrl_stream(Config) -> - erlang:process_flag(trap_exit, true), - {ok, C0} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5} - | Config - ]), - {ok, _} = emqtt:quic_connect(C0), - #{nst := NST} = proplists:get_value(extra, emqtt:info(C0)), - emqtt:disconnect(C0), - {ok, C} = emqtt:start_link([ - {proto_ver, v5}, - {connect_timeout, 5}, - {nst, NST} - | Config - ]), - {ok, _} = emqtt:quic_connect(C), - Cid = proplists:get_value(clientid, emqtt:info(C)), - ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]). - -t_multi_streams_sub_0_rtt(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - Topic = atom_to_binary(?FUNCTION_NAME), - {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C0), - {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - ok = emqtt:open_quic_connection(C), - ok = emqtt:quic_mqtt_connect(C), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic, - #{}, - <<"qos 2 1">>, - [{qos, PubQos}], - infinity, - fun(_) -> ok end - ), - {ok, _} = emqtt:quic_connect(C), - receive - {publish, #{ - client_pid := C0, - payload := <<"qos 2 1">>, - qos := RecQos, - topic := Topic - }} -> - ok; - Other -> - ct:fail("unexpected recv ~p", [Other]) - after 100 -> - ct:fail("not received") - end, - ok = emqtt:disconnect(C), - ok = emqtt:disconnect(C0). - -t_multi_streams_sub_0_rtt_large_payload(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - Topic = atom_to_binary(?FUNCTION_NAME), - Payload = binary:copy(<<"qos 2 1">>, 1600), - {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C0), - {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - ok = emqtt:open_quic_connection(C), - ok = emqtt:quic_mqtt_connect(C), - ok = emqtt:publish_async( - C, - {new_data_stream, []}, - Topic, - #{}, - Payload, - [{qos, PubQos}], - infinity, - fun(_) -> ok end - ), - {ok, _} = emqtt:quic_connect(C), - receive - {publish, #{ - client_pid := C0, - payload := Payload, - qos := RecQos, - topic := Topic - }} -> - ok; - Other -> - ct:fail("unexpected recv ~p", [Other]) - after 100 -> - ct:fail("not received") - end, - ok = emqtt:disconnect(C), - ok = emqtt:disconnect(C0). - -%% @doc verify data stream can continue after 0-RTT handshake -t_multi_streams_sub_0_rtt_stream_data_cont(Config) -> - PubQos = ?config(pub_qos, Config), - SubQos = ?config(sub_qos, Config), - RecQos = calc_qos(PubQos, SubQos), - Topic = atom_to_binary(?FUNCTION_NAME), - Payload = binary:copy(<<"qos 2 1">>, 1600), - {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:quic_connect(C0), - {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ - {Topic, [{qos, SubQos}]} - ]), - {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), - ok = emqtt:open_quic_connection(C), - ok = emqtt:quic_mqtt_connect(C), - {ok, PubVia} = emqtt:start_data_stream(C, []), - ok = emqtt:publish_async( - C, - PubVia, - Topic, - #{}, - Payload, - [{qos, PubQos}], - infinity, - fun(_) -> ok end - ), - {ok, _} = emqtt:quic_connect(C), - receive - {publish, #{ - client_pid := C0, - payload := Payload, - qos := RecQos, - topic := Topic - }} -> - ok; - Other -> - ct:fail("unexpected recv ~p", [Other]) - after 100 -> - ct:fail("not received") - end, - Payload2 = <<"2nd part", Payload/binary>>, - ok = emqtt:publish_async( - C, - PubVia, - Topic, - #{}, - Payload2, - [{qos, PubQos}], - infinity, - fun(_) -> ok end - ), - receive - {publish, #{ - client_pid := C0, - payload := Payload2, - qos := RecQos, - topic := Topic - }} -> - ok; - Other2 -> - ct:fail("unexpected recv ~p", [Other2]) - after 100 -> - ct:fail("not received") - end, - ok = emqtt:disconnect(C), - ok = emqtt:disconnect(C0). - -%%-------------------------------------------------------------------- -%% Helper functions -%%-------------------------------------------------------------------- -send_and_recv_with(Sock) -> - {ok, {IP, _}} = emqtt_quic:sockname(Sock), - ?assert(lists:member(tuple_size(IP), [4, 8])), - ok = emqtt_quic:send(Sock, <<"ping">>), - emqtt_quic:setopts(Sock, [{active, false}]), - {ok, <<"pong">>} = emqtt_quic:recv(Sock, 0), - ok = emqtt_quic:setopts(Sock, [{active, 100}]), - {ok, Stats} = emqtt_quic:getstat(Sock, [send_cnt, recv_cnt]), - %% connection level counters, not stream level - [{send_cnt, _}, {recv_cnt, _}] = Stats. - -certfile(Config) -> - filename:join([test_dir(Config), "certs", "test.crt"]). - -keyfile(Config) -> - filename:join([test_dir(Config), "certs", "test.key"]). - -test_dir(Config) -> - filename:dirname(filename:dirname(proplists:get_value(data_dir, Config))). - -recv_pub(Count) -> - recv_pub(Count, []). - -recv_pub(0, Acc) -> - lists:reverse(Acc); -recv_pub(Count, Acc) -> - receive - {publish, _Prop} = Pub -> - recv_pub(Count - 1, [Pub | Acc]) - after 100 -> - timeout - end. - -all_tc() -> - code:add_patha(filename:join(code:lib_dir(emqx), "ebin/")), - emqx_common_test_helpers:all(?MODULE). - --spec calc_qos(0 | 1 | 2, 0 | 1 | 2) -> 0 | 1 | 2. -calc_qos(PubQos, SubQos) -> - if - PubQos > SubQos -> - SubQos; - SubQos > PubQos -> - PubQos; - true -> - PubQos - end. --spec calc_pkt_id(0 | 1 | 2, non_neg_integer()) -> undefined | non_neg_integer(). -calc_pkt_id(0, _Id) -> - undefined; -calc_pkt_id(1, Id) -> - Id; -calc_pkt_id(2, Id) -> - Id. - --spec start_emqx_quic(inet:port_number()) -> ok. -start_emqx_quic(UdpPort) -> - emqx_common_test_helpers:start_apps([]), - application:ensure_all_started(quicer), - emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort). - --spec stop_emqx() -> ok. -stop_emqx() -> - emqx_common_test_helpers:stop_apps([]). - -%% select a random port picked by OS --spec select_port() -> inet:port_number(). -select_port() -> - {ok, S} = gen_udp:open(0, [{reuseaddr, true}]), - {ok, {_, Port}} = inet:sockname(S), - gen_udp:close(S), - case os:type() of - {unix, darwin} -> - %% in MacOS, still get address_in_use after close port - timer:sleep(500); - _ -> - skip - end, - ct:pal("select port: ~p", [Port]), - Port. diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 5a9abc7f4..0199bbc10 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -79,19 +79,6 @@ end_per_group(_Group, _Config) -> init_per_suite(Config) -> %% Start Apps - dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}), - dbg:p(all, c), - dbg:tp(emqx_quic_connection, cx), - dbg:tp(quicer_connection, cx), - %% dbg:tp(emqx_quic_stream, cx), - %% dbg:tp(emqtt_quic, cx), - %% dbg:tp(emqtt, cx), - %% dbg:tp(emqtt_quic_stream, cx), - %% dbg:tp(emqtt_quic_connection, cx), - %% dbg:tp(emqx_cm, open_session, cx), - %% dbg:tpl(emqx_cm, lookup_channels, cx), - %% dbg:tpl(emqx_cm, register_channel, cx), - %% dbg:tpl(emqx_cm, unregister_channel, cx), emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), Config. diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index bb19092f7..b6d3c661c 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -13,178 +13,1748 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- - -module(emqx_quic_multistreams_SUITE). -compile(export_all). +-compile(nowarn_export_all). --include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("quicer/include/quicer.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). --define(TOPICS, [ - <<"TopicA">>, - <<"TopicA/B">>, - <<"Topic/C">>, - <<"TopicA/C">>, - <<"/TopicA">> -]). - -%%-------------------------------------------------------------------- -%% @spec suite() -> Info -%% Info = [tuple()] -%% @end -%%-------------------------------------------------------------------- suite() -> [{timetrap, {seconds, 30}}]. -%%-------------------------------------------------------------------- -%% @spec init_per_suite(Config0) -> -%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} -%% Config0 = Config1 = [tuple()] -%% Reason = term() -%% @end -%%-------------------------------------------------------------------- -init_per_suite(Config) -> - UdpPort = 1884, - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort), - %% @TODO remove - emqx_logger:set_log_level(debug), - - dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}), - dbg:p(all, c), - - %dbg:tp(emqx_quic_stream, cx), - %% dbg:tp(quicer_stream, cx), - %% dbg:tp(emqx_quic_data_stream, cx), - %% dbg:tp(emqx_channel, cx), - %% dbg:tp(emqx_packet,check,cx), - %% dbg:tp(emqx_frame,parse,cx), - %dbg:tp(emqx_quic_connection, cx), - [{port, UdpPort}, {conn_fun, quic_connect} | Config]. - -%%-------------------------------------------------------------------- -%% @spec end_per_suite(Config0) -> term() | {save_config,Config1} -%% Config0 = Config1 = [tuple()] -%% @end -%%-------------------------------------------------------------------- -end_per_suite(_Config) -> - ok. - -%%-------------------------------------------------------------------- -%% @spec init_per_group(GroupName, Config0) -> -%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} -%% GroupName = atom() -%% Config0 = Config1 = [tuple()] -%% Reason = term() -%% @end -%%-------------------------------------------------------------------- -init_per_group(_GroupName, Config) -> - Config. - -%%-------------------------------------------------------------------- -%% @spec end_per_group(GroupName, Config0) -> -%% term() | {save_config,Config1} -%% GroupName = atom() -%% Config0 = Config1 = [tuple()] -%% @end -%%-------------------------------------------------------------------- -end_per_group(_GroupName, _Config) -> - ok. - -%%-------------------------------------------------------------------- -%% @spec init_per_testcase(TestCase, Config0) -> -%% Config1 | {skip,Reason} | {skip_and_save,Reason,Config1} -%% TestCase = atom() -%% Config0 = Config1 = [tuple()] -%% Reason = term() -%% @end -%%-------------------------------------------------------------------- -init_per_testcase(_TestCase, Config) -> - Config. - -%%-------------------------------------------------------------------- -%% @spec end_per_testcase(TestCase, Config0) -> -%% term() | {save_config,Config1} | {fail,Reason} -%% TestCase = atom() -%% Config0 = Config1 = [tuple()] -%% Reason = term() -%% @end -%%-------------------------------------------------------------------- -end_per_testcase(_TestCase, _Config) -> - ok. - -%%-------------------------------------------------------------------- -%% @spec groups() -> [Group] -%% Group = {GroupName,Properties,GroupsAndTestCases} -%% GroupName = atom() -%% Properties = [parallel | sequence | Shuffle | {RepeatType,N}] -%% GroupsAndTestCases = [Group | {group,GroupName} | TestCase] -%% TestCase = atom() -%% Shuffle = shuffle | {shuffle,{integer(),integer(),integer()}} -%% RepeatType = repeat | repeat_until_all_ok | repeat_until_all_fail | -%% repeat_until_any_ok | repeat_until_any_fail -%% N = integer() | forever -%% @end -%%-------------------------------------------------------------------- -groups() -> - []. - -%%-------------------------------------------------------------------- -%% @spec all() -> GroupsAndTestCases | {skip,Reason} -%% GroupsAndTestCases = [{group,GroupName} | TestCase] -%% GroupName = atom() -%% TestCase = atom() -%% Reason = term() -%% @end -%%-------------------------------------------------------------------- all() -> [ - tc_data_stream_sub + {group, mstream}, + {group, shutdown}, + {group, misc} ]. -%%-------------------------------------------------------------------- -%% @spec TestCase(Config0) -> -%% ok | exit() | {skip,Reason} | {comment,Comment} | -%% {save_config,Config1} | {skip_and_save,Reason,Config1} -%% Config0 = Config1 = [tuple()] -%% Reason = term() -%% Comment = term() -%% @end -%%-------------------------------------------------------------------- +groups() -> + [ + {mstream, [], [{group, profiles}]}, -%% @doc Test MQTT Subscribe via data_stream -tc_data_stream_sub(Config) -> - Topic = lists:nth(1, ?TOPICS), + {profiles, [], [ + {group, profile_low_latency}, + {group, profile_max_throughput} + ]}, + {profile_low_latency, [], [ + {group, pub_qos0}, + {group, pub_qos1}, + {group, pub_qos2} + ]}, + {profile_max_throughput, [], [ + {group, pub_qos0}, + {group, pub_qos1}, + {group, pub_qos2} + ]}, + {pub_qos0, [], [ + {group, sub_qos0}, + {group, sub_qos1}, + {group, sub_qos2} + ]}, + {pub_qos1, [], [ + {group, sub_qos0}, + {group, sub_qos1}, + {group, sub_qos2} + ]}, + {pub_qos2, [], [ + {group, sub_qos0}, + {group, sub_qos1}, + {group, sub_qos2} + ]}, + {sub_qos0, [{group, qos}]}, + {sub_qos1, [{group, qos}]}, + {sub_qos2, [{group, qos}]}, + {qos, [ + t_multi_streams_sub, + t_multi_streams_pub_5x100, + t_multi_streams_pub_parallel, + t_multi_streams_pub_parallel_no_blocking, + t_multi_streams_sub_pub_async, + t_multi_streams_sub_pub_sync, + t_multi_streams_unsub, + t_multi_streams_corr_topic, + t_multi_streams_unsub_via_other, + t_multi_streams_dup_sub, + t_multi_streams_packet_boundary, + t_multi_streams_packet_malform, + t_multi_streams_kill_sub_stream, + t_multi_streams_packet_too_large, + t_multi_streams_sub_0_rtt, + t_multi_streams_sub_0_rtt_large_payload, + t_multi_streams_sub_0_rtt_stream_data_cont, + t_conn_change_client_addr + ]}, + + {shutdown, [ + {group, graceful_shutdown}, + {group, abort_recv_shutdown}, + {group, abort_send_shutdown}, + {group, abort_send_recv_shutdown} + ]}, + + {graceful_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, + {abort_recv_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, + {abort_send_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, + {abort_send_recv_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, + + {ctrl_stream_shutdown, [ + t_multi_streams_shutdown_ctrl_stream, + t_multi_streams_shutdown_ctrl_stream_then_reconnect, + t_multi_streams_remote_shutdown, + t_multi_streams_remote_shutdown_with_reconnect + ]}, + + {data_stream_shutdown, [ + t_multi_streams_shutdown_pub_data_stream, + t_multi_streams_shutdown_sub_data_stream + ]}, + {misc, [ + t_conn_silent_close, + t_client_conn_bump_streams, + t_olp_true, + t_olp_reject, + t_conn_resume, + t_conn_without_ctrl_stream + ]} + ]. + +init_per_suite(Config) -> + emqx_common_test_helpers:start_apps([]), + UdpPort = 14567, + start_emqx_quic(UdpPort), + %% dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}), + %% dbg:p(all, c), + %% dbg:tpl(quicer_stream, handle_info, c), + %% dbg:tp(emqx_quic_connection, cx), + %% dbg:tp(emqx_quic_stream, cx), + %% dbg:tp(emqtt, cx), + %% dbg:tpl(emqtt_quic_stream, cx), + %% dbg:tpl(emqx_quic_stream, cx), + %% dbg:tpl(emqx_quic_data_stream, cx), + %% dbg:tpl(emqtt, cx), + [{port, UdpPort}, {pub_qos, 0}, {sub_qos, 0} | Config]. + +end_per_suite(_) -> + ok. + +init_per_group(pub_qos0, Config) -> + [{pub_qos, 0} | Config]; +init_per_group(sub_qos0, Config) -> + [{sub_qos, 0} | Config]; +init_per_group(pub_qos1, Config) -> + [{pub_qos, 1} | Config]; +init_per_group(sub_qos1, Config) -> + [{sub_qos, 1} | Config]; +init_per_group(pub_qos2, Config) -> + [{pub_qos, 2} | Config]; +init_per_group(sub_qos2, Config) -> + [{sub_qos, 2} | Config]; +init_per_group(abort_send_shutdown, Config) -> + [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND} | Config]; +init_per_group(abort_recv_shutdown, Config) -> + [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE} | Config]; +init_per_group(abort_send_recv_shutdown, Config) -> + [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT} | Config]; +init_per_group(graceful_shutdown, Config) -> + [{stream_shutdown_flag, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL} | Config]; +init_per_group(profile_max_throughput, Config) -> + quicer:reg_open(quic_execution_profile_type_max_throughput), + Config; +init_per_group(profile_low_latency, Config) -> + quicer:reg_open(quic_execution_profile_low_latency), + Config; +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(_, Config) -> + emqx_common_test_helpers:start_apps([]), + Config. + +t_quic_sock(Config) -> + Port = 4567, + SslOpts = [ + {cert, certfile(Config)}, + {key, keyfile(Config)}, + {idle_timeout_ms, 10000}, + % QUIC_SERVER_RESUME_AND_ZERORTT + {server_resumption_level, 2}, + {peer_bidi_stream_count, 10}, + {alpn, ["mqtt"]} + ], + Server = quic_server:start_link(Port, SslOpts), + timer:sleep(500), + {ok, Sock} = emqtt_quic:connect( + "localhost", + Port, + [{alpn, ["mqtt"]}, {active, false}], + 3000 + ), + send_and_recv_with(Sock), + ok = emqtt_quic:close(Sock), + quic_server:stop(Server). + +t_quic_sock_fail(_Config) -> + Port = 4567, + Error1 = + {error, + {transport_down, #{ + error => 2, + status => connection_refused + }}}, + Error2 = {error, {transport_down, #{error => 1, status => unreachable}}}, + case + emqtt_quic:connect( + "localhost", + Port, + [{alpn, ["mqtt"]}, {active, false}], + 3000 + ) + of + Error1 -> + ok; + Error2 -> + ok; + Other -> + ct:fail("unexpected return ~p", [Other]) + end. + +t_0_rtt(Config) -> + Port = 4568, + SslOpts = [ + {cert, certfile(Config)}, + {key, keyfile(Config)}, + {idle_timeout_ms, 10000}, + % QUIC_SERVER_RESUME_AND_ZERORTT + {server_resumption_level, 2}, + {peer_bidi_stream_count, 10}, + {alpn, ["mqtt"]} + ], + Server = quic_server:start_link(Port, SslOpts), + timer:sleep(500), + {ok, {quic, Conn, _Stream} = Sock} = emqtt_quic:connect( + "localhost", + Port, + [ + {alpn, ["mqtt"]}, + {active, false}, + {quic_event_mask, 1} + ], + 3000 + ), + send_and_recv_with(Sock), + ok = emqtt_quic:close(Sock), + NST = + receive + {quic, nst_received, Conn, Ticket} -> + Ticket + end, + {ok, Sock2} = emqtt_quic:connect( + "localhost", + Port, + [ + {alpn, ["mqtt"]}, + {active, false}, + {nst, NST} + ], + 3000 + ), + send_and_recv_with(Sock2), + ok = emqtt_quic:close(Sock2), + quic_server:stop(Server). + +t_0_rtt_fail(Config) -> + Port = 4569, + SslOpts = [ + {cert, certfile(Config)}, + {key, keyfile(Config)}, + {idle_timeout_ms, 10000}, + % QUIC_SERVER_RESUME_AND_ZERORTT + {server_resumption_level, 2}, + {peer_bidi_stream_count, 10}, + {alpn, ["mqtt"]} + ], + Server = quic_server:start_link(Port, SslOpts), + timer:sleep(500), + {ok, {quic, Conn, _Stream} = Sock} = emqtt_quic:connect( + "localhost", + Port, + [ + {alpn, ["mqtt"]}, + {active, false}, + {quic_event_mask, 1} + ], + 3000 + ), + send_and_recv_with(Sock), + ok = emqtt_quic:close(Sock), + <<_Head:16, Left/binary>> = + receive + {quic, nst_received, Conn, Ticket} when is_binary(Ticket) -> + Ticket + end, + + Error = {error, {not_found, invalid_parameter}}, + Error = emqtt_quic:connect( + "localhost", + Port, + [ + {alpn, ["mqtt"]}, + {active, false}, + {nst, Left} + ], + 3000 + ), + quic_server:stop(Server). + +t_multi_streams_sub(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, _} = emqtt:quic_connect(C), - {ok, _, [1]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [{Topic, [{qos, qos1}]}]), - {ok, _, [2]} = emqtt:subscribe_via( - C, - {new_data_stream, []}, - #{}, - [{lists:nth(2, ?TOPICS), [{qos, qos2}]}] - ), - {ok, _} = emqtt:publish(C, Topic, <<"qos 2 1">>, 2), - {ok, _} = emqtt:publish(C, Topic, <<"qos 2 2">>, 2), - {ok, _} = emqtt:publish(C, Topic, <<"qos 2 3">>, 2), - Msgs = receive_messages(3), - ct:pal("recv msg: ~p", [Msgs]), - ?assertEqual(3, length(Msgs)), + {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + case emqtt:publish(C, Topic, <<"qos 2 1">>, PubQos) of + ok when PubQos == 0 -> ok; + {ok, _} -> ok + end, + receive + {publish, #{ + client_pid := C, + payload := <<"qos 2 1">>, + qos := RecQos, + topic := Topic + }} -> + ok; + Other -> + ct:fail("unexpected recv ~p", [Other]) + after 100 -> + ct:fail("not received") + end, ok = emqtt:disconnect(C). -receive_messages(Count) -> - receive_messages(Count, []). +t_multi_streams_pub_5x100(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), -receive_messages(0, Msgs) -> - Msgs; -receive_messages(Count, Msgs) -> + PubVias = lists:map( + fun(_N) -> + {ok, Via} = emqtt:start_data_stream(C, []), + Via + end, + lists:seq(1, 5) + ), + CtrlVia = proplists:get_value(socket, emqtt:info(C)), + [ + begin + case emqtt:publish_via(C, PVia, Topic, #{}, <<"stream data ", N>>, [{qos, PubQos}]) of + ok when PubQos == 0 -> ok; + {ok, _} -> ok + end, + 0 == (N rem 10) andalso timer:sleep(10) + end + || %% also publish on control stream + N <- lists:seq(1, 100), + PVia <- [CtrlVia | PubVias] + ], + ?assert(timeout =/= recv_pub(600)), + ok = emqtt:disconnect(C). + +t_multi_streams_pub_parallel(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + PktId2 = calc_pkt_id(RecQos, 2), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + <<"stream data 1">>, + [{qos, PubQos}], + undefined + ), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + <<"stream data 2">>, + [{qos, PubQos}], + undefined + ), + PubRecvs = recv_pub(2), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data", _/binary>>, + qos := RecQos, + topic := Topic + }}, + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<"stream data", _/binary>>, + qos := RecQos, + topic := Topic + }} + ], + PubRecvs + ), + Payloads = [P || {publish, #{payload := P}} <- PubRecvs], + ?assert( + [<<"stream data 1">>, <<"stream data 2">>] == Payloads orelse + [<<"stream data 2">>, <<"stream data 1">>] == Payloads + ), + ok = emqtt:disconnect(C). + +%% @doc test two pub streams, one send incomplete MQTT packet() can not block another. +t_multi_streams_pub_parallel_no_blocking(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId2 = calc_pkt_id(RecQos, 1), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + Drop = <<"stream data 1">>, + meck:new(emqtt_quic, [passthrough, no_history]), + meck:expect(emqtt_quic, send, fun(Sock, IoList) -> + case lists:last(IoList) == Drop of + true -> + ct:pal("meck droping ~p", [Drop]), + meck:passthrough([Sock, IoList -- [Drop]]); + false -> + meck:passthrough([Sock, IoList]) + end + end), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + Drop, + [{qos, PubQos}], + undefined + ), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + <<"stream data 2">>, + [{qos, PubQos}], + undefined + ), + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<"stream data 2">>, + qos := RecQos, + topic := Topic + }} + ], + PubRecvs + ), + meck:unload(emqtt_quic), + ?assertEqual(timeout, recv_pub(1)), + ok = emqtt:disconnect(C). + +t_multi_streams_packet_boundary(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + PktId2 = calc_pkt_id(RecQos, 2), + PktId3 = calc_pkt_id(RecQos, 3), + Topic = atom_to_binary(?FUNCTION_NAME), + + %% make quicer to batch job + quicer:reg_open(quic_execution_profile_type_max_throughput), + + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + + {ok, PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + <<"stream data 1">>, + [{qos, PubQos}], + undefined + ), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + <<"stream data 2">>, + [{qos, PubQos}], + undefined + ), + LargePart3 = binary:copy(<<"stream data3">>, 2000), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + LargePart3, + [{qos, PubQos}], + undefined + ), + PubRecvs = recv_pub(3), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data 1">>, + qos := RecQos, + topic := Topic + }}, + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<"stream data 2">>, + qos := RecQos, + topic := Topic + }}, + {publish, #{ + client_pid := C, + packet_id := PktId3, + payload := LargePart3, + qos := RecQos, + topic := Topic + }} + ], + PubRecvs + ), + ok = emqtt:disconnect(C). + +%% @doc test that one malformed stream will not close the entire connection +t_multi_streams_packet_malform(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + PktId2 = calc_pkt_id(RecQos, 2), + PktId3 = calc_pkt_id(RecQos, 3), + Topic = atom_to_binary(?FUNCTION_NAME), + + %% make quicer to batch job + quicer:reg_open(quic_execution_profile_type_max_throughput), + + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + + {ok, PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + <<"stream data 1">>, + [{qos, PubQos}], + undefined + ), + + {ok, {quic, _Conn, MalformStream}} = emqtt:start_data_stream(C, []), + {ok, _} = quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>), + + ok = emqtt:publish_async( + C, + PubVia, + Topic, + <<"stream data 2">>, + [{qos, PubQos}], + undefined + ), + LargePart3 = binary:copy(<<"stream data3">>, 2000), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + LargePart3, + [{qos, PubQos}], + undefined + ), + PubRecvs = recv_pub(3), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data 1">>, + qos := RecQos, + topic := Topic + }}, + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<"stream data 2">>, + qos := RecQos, + topic := Topic + }}, + {publish, #{ + client_pid := C, + packet_id := PktId3, + payload := LargePart3, + qos := RecQos, + topic := Topic + }} + ], + PubRecvs + ), + + case quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>) of + {ok, 10} -> ok; + {error, cancelled} -> ok; + {error, stm_send_error, aborted} -> ok + end, + + timer:sleep(200), + ?assert(is_list(emqtt:info(C))), + + {error, stm_send_error, aborted} = quicer:send(MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>), + + timer:sleep(200), + ?assert(is_list(emqtt:info(C))), + + ok = emqtt:disconnect(C). + +t_multi_streams_packet_too_large(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + Topic = atom_to_binary(?FUNCTION_NAME), + meck:new(emqx_frame, [passthrough, no_history]), + ok = meck:expect( + emqx_frame, + serialize_opts, + fun(#mqtt_packet_connect{proto_ver = ProtoVer}) -> + #{version => ProtoVer, max_size => 1024} + end + ), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + + {ok, PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + binary:copy(<<"stream data 1">>, 1024), + [{qos, PubQos}], + undefined + ), + timeout = recv_pub(1), + ?assert(is_list(emqtt:info(C))), + ok = meck:unload(emqx_frame), + ok = emqtt:disconnect(C). + +t_conn_change_client_addr(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + + {ok, {quic, Conn, _} = PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + <<"stream data 1">>, + [{qos, PubQos}], + undefined + ), + + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := _PktId1, + payload := <<"stream data 1">>, + qos := RecQos + }} + ], + recv_pub(1) + ), + NewPort = select_port(), + {ok, OldAddr} = quicer:sockname(Conn), + ?assertEqual( + ok, quicer:setopt(Conn, param_conn_local_address, "127.0.0.1:" ++ integer_to_list(NewPort)) + ), + {ok, NewAddr} = quicer:sockname(Conn), + ct:pal("NewAddr: ~p, Old Addr: ~p", [NewAddr, OldAddr]), + ?assertNotEqual(OldAddr, NewAddr), + ?assert(is_list(emqtt:info(C))), + ok = emqtt:disconnect(C). + +t_multi_streams_sub_pub_async(Config) -> + Topic = atom_to_binary(?FUNCTION_NAME), + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + <<"stream data 1">>, + [{qos, PubQos}], + undefined + ), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic2, + <<"stream data 2">>, + [{qos, PubQos}], + undefined + ), + PubRecvs = recv_pub(2), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data", _/binary>>, + qos := RecQos + }}, + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data", _/binary>>, + qos := RecQos + }} + ], + PubRecvs + ), + Payloads = [P || {publish, #{payload := P}} <- PubRecvs], + ?assert( + [<<"stream data 1">>, <<"stream data 2">>] == Payloads orelse + [<<"stream data 2">>, <<"stream data 1">>] == Payloads + ), + ok = emqtt:disconnect(C). + +t_multi_streams_sub_pub_sync(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia1}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<"stream data 3">>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + Via1 = undefined, + ok; + {ok, #{reason_code := 0, via := Via1}} -> + ok + end, + case + emqtt:publish_via(C, {new_data_stream, []}, Topic2, #{}, <<"stream data 4">>, [ + {qos, PubQos} + ]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := Via2}} -> + ?assert(Via1 =/= Via2), + ok + end, + ct:pal("SVia1: ~p, SVia2: ~p", [SVia1, SVia2]), + PubRecvs = recv_pub(2), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data 3">>, + qos := RecQos, + via := SVia1 + }}, + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data 4">>, + qos := RecQos, + via := SVia2 + }} + ], + lists:sort(PubRecvs) + ), + ok = emqtt:disconnect(C). + +t_multi_streams_dup_sub(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia1}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + + #{data_stream_socks := [{quic, _Conn, SubStream} | _]} = proplists:get_value( + extra, emqtt:info(C) + ), + ?assertEqual(2, length(emqx_broker:subscribers(Topic))), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<"stream data 3">>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := 0, via := _Via1}} -> + ok + end, + PubRecvs = recv_pub(2), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data 3">>, + qos := RecQos + }}, + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data 3">>, + qos := RecQos + }} + ], + lists:sort(PubRecvs) + ), + + RecvVias = [Via || {publish, #{via := Via}} <- PubRecvs], + + ct:pal("~p, ~p, ~n recv from: ~p~n", [SVia1, SVia2, PubRecvs]), + %% Can recv in any order + ?assert([SVia1, SVia2] == RecvVias orelse [SVia2, SVia1] == RecvVias), + + %% Shutdown one stream + quicer:async_shutdown_stream(SubStream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 500), + timer:sleep(100), + + ?assertEqual(1, length(emqx_broker:subscribers(Topic))), + + ok = emqtt:disconnect(C). + +t_multi_streams_corr_topic(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + PktId2 = calc_pkt_id(RecQos, 2), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SubVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := 0, via := _Via}} -> + ok + end, + + #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), + ?assert(PubVia =/= SubVia), + + case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := PubVia}} -> ok + end, + PubRecvs = recv_pub(2), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }}, + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<6, 7, 8, 9>>, + qos := RecQos + }} + ], + PubRecvs + ), + ok = emqtt:disconnect(C). + +t_multi_streams_unsub(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SubVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := 0, via := _PVia}} -> + ok + end, + + #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), + ?assert(PubVia =/= SubVia), + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + emqtt:unsubscribe_via(C, SubVia, Topic), + + case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := 16, via := PubVia, reason_code_name := no_matching_subscribers}} -> + ok + end, + + timeout = recv_pub(1), + ok = emqtt:disconnect(C). + +t_multi_streams_kill_sub_stream(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := _SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + [TopicStreamOwner] = emqx_broker:subscribers(Topic), + exit(TopicStreamOwner, kill), + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := Code, via := _PVia}} when Code == 0 orelse Code == 16 -> + ok + end, + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic2, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := 0, via := _PVia2}} -> + ok + end, + + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + topic := Topic2, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + recv_pub(1) + ), + ?assertEqual(timeout, recv_pub(1)), + ok. + +t_multi_streams_unsub_via_other(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + PktId2 = calc_pkt_id(RecQos, 2), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), + + %% Unsub topic1 via stream2 should fail with error code 17: "No subscription existed" + {ok, #{via := SVia2}, [17]} = emqtt:unsubscribe_via(C, SVia2, Topic), + + case emqtt:publish_via(C, PubVia, Topic, #{}, <<6, 7, 8, 9>>, [{qos, PubQos}]) of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia2}} -> ok + end, + + PubRecvs2 = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<6, 7, 8, 9>>, + qos := RecQos + }} + ], + PubRecvs2 + ), + ok = emqtt:disconnect(C). + +t_multi_streams_shutdown_pub_data_stream(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + ?assert(SVia =/= SVia2), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), + {quic, _Conn, DataStream} = PubVia, + quicer:shutdown_stream(DataStream, ?config(stream_shutdown_flag, Config), 500, 100), + timer:sleep(500), + %% Still alive + ?assert(is_list(emqtt:info(C))). + +t_multi_streams_shutdown_sub_data_stream(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + ?assert(SVia =/= SVia2), + {quic, _Conn, DataStream} = SVia2, + quicer:shutdown_stream(DataStream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, 500, 100), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + #{data_stream_socks := [_PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), + timer:sleep(500), + %% Still alive + ?assert(is_list(emqtt:info(C))). + +t_multi_streams_shutdown_ctrl_stream(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + unlink(C), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := _SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + {quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), + quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 1000), + timer:sleep(500), + %% Client should be closed + ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). + +t_multi_streams_shutdown_ctrl_stream_then_reconnect(Config) -> + erlang:process_flag(trap_exit, true), + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {reconnect, true}, + %% speedup test + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + ?assert(SVia2 =/= SVia), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + {quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), + quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 100), + timer:sleep(200), + %% Client should be closed + ?assert(is_list(emqtt:info(C))). + +t_multi_streams_remote_shutdown(Config) -> + erlang:process_flag(trap_exit, true), + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {reconnect, false}, + %% speedup test + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + ?assert(SVia2 =/= SVia), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + {quic, _Conn, _Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), + + ok = stop_emqx(), + + timer:sleep(200), + start_emqx_quic(?config(port, Config)), + + %% Client should be closed + ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). + +t_multi_streams_remote_shutdown_with_reconnect(Config) -> + erlang:process_flag(trap_exit, true), + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {reconnect, true}, + %% speedup test + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + ?assert(SVia2 =/= SVia), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + {quic, _Conn, _Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), + + ok = stop_emqx(), + + timer:sleep(200), + + start_emqx_quic(?config(port, Config)), + %% Client should be closed + ?assert(is_list(emqtt:info(C))). + +t_conn_silent_close(Config) -> + erlang:process_flag(trap_exit, true), + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + %% quic idle timeout + 1s + timer:sleep(16000), + Topic = atom_to_binary(?FUNCTION_NAME), + ?assertException( + exit, + noproc, + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, 1}]) + ). + +t_client_conn_bump_streams(Config) -> + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + {quic, Conn, _Stream} = proplists:get_value(socket, emqtt:info(C)), + ok = quicer:setopt(Conn, param_conn_settings, #{peer_unidi_stream_count => 20}). + +t_olp_true(Config) -> + meck:new(emqx_olp, [passthrough, no_history]), + ok = meck:expect(emqx_olp, is_overloaded, fun() -> true end), + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + ok = meck:unload(emqx_olp). + +t_olp_reject(Config) -> + erlang:process_flag(trap_exit, true), + emqx_config:put_zone_conf(default, [overload_protection, enable], true), + meck:new(emqx_olp, [passthrough, no_history]), + ok = meck:expect(emqx_olp, is_overloaded, fun() -> true end), + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5} + | Config + ]), + ?assertEqual( + {error, + {transport_down, #{ + error => 346, + status => + user_canceled + }}}, + emqtt:quic_connect(C) + ), + ok = meck:unload(emqx_olp), + emqx_config:put_zone_conf(default, [overload_protection, enable], false). + +t_conn_resume(Config) -> + erlang:process_flag(trap_exit, true), + {ok, C0} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5} + | Config + ]), + + {ok, _} = emqtt:quic_connect(C0), + #{nst := NST} = proplists:get_value(extra, emqtt:info(C0)), + emqtt:disconnect(C0), + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5}, + {nst, NST} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + Cid = proplists:get_value(clientid, emqtt:info(C)), + ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]). + +t_conn_without_ctrl_stream(Config) -> + erlang:process_flag(trap_exit, true), + {ok, Conn} = quicer:connect( + {127, 0, 0, 1}, + ?config(port, Config), + [{alpn, ["mqtt"]}, {verify, none}], + 3000 + ), receive - {publish, Msg} -> - receive_messages(Count - 1, [Msg | Msgs]); - _Other -> - receive_messages(Count, Msgs) - after 1000 -> - Msgs + {quic, transport_shutdown, Conn, _} -> ok end. + +t_data_stream_race_ctrl_stream(Config) -> + erlang:process_flag(trap_exit, true), + {ok, C0} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C0), + #{nst := NST} = proplists:get_value(extra, emqtt:info(C0)), + emqtt:disconnect(C0), + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5}, + {nst, NST} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + Cid = proplists:get_value(clientid, emqtt:info(C)), + ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]). + +t_multi_streams_sub_0_rtt(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C0), + {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + ok = emqtt:open_quic_connection(C), + ok = emqtt:quic_mqtt_connect(C), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + #{}, + <<"qos 2 1">>, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + {ok, _} = emqtt:quic_connect(C), + receive + {publish, #{ + client_pid := C0, + payload := <<"qos 2 1">>, + qos := RecQos, + topic := Topic + }} -> + ok; + Other -> + ct:fail("unexpected recv ~p", [Other]) + after 100 -> + ct:fail("not received") + end, + ok = emqtt:disconnect(C), + ok = emqtt:disconnect(C0). + +t_multi_streams_sub_0_rtt_large_payload(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + Payload = binary:copy(<<"qos 2 1">>, 1600), + {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C0), + {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + ok = emqtt:open_quic_connection(C), + ok = emqtt:quic_mqtt_connect(C), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + #{}, + Payload, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + {ok, _} = emqtt:quic_connect(C), + receive + {publish, #{ + client_pid := C0, + payload := Payload, + qos := RecQos, + topic := Topic + }} -> + ok; + Other -> + ct:fail("unexpected recv ~p", [Other]) + after 100 -> + ct:fail("not received") + end, + ok = emqtt:disconnect(C), + ok = emqtt:disconnect(C0). + +%% @doc verify data stream can continue after 0-RTT handshake +t_multi_streams_sub_0_rtt_stream_data_cont(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + Payload = binary:copy(<<"qos 2 1">>, 1600), + {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C0), + {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + ok = emqtt:open_quic_connection(C), + ok = emqtt:quic_mqtt_connect(C), + {ok, PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + #{}, + Payload, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + {ok, _} = emqtt:quic_connect(C), + receive + {publish, #{ + client_pid := C0, + payload := Payload, + qos := RecQos, + topic := Topic + }} -> + ok; + Other -> + ct:fail("unexpected recv ~p", [Other]) + after 100 -> + ct:fail("not received") + end, + Payload2 = <<"2nd part", Payload/binary>>, + ok = emqtt:publish_async( + C, + PubVia, + Topic, + #{}, + Payload2, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + receive + {publish, #{ + client_pid := C0, + payload := Payload2, + qos := RecQos, + topic := Topic + }} -> + ok; + Other2 -> + ct:fail("unexpected recv ~p", [Other2]) + after 100 -> + ct:fail("not received") + end, + ok = emqtt:disconnect(C), + ok = emqtt:disconnect(C0). + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- +send_and_recv_with(Sock) -> + {ok, {IP, _}} = emqtt_quic:sockname(Sock), + ?assert(lists:member(tuple_size(IP), [4, 8])), + ok = emqtt_quic:send(Sock, <<"ping">>), + emqtt_quic:setopts(Sock, [{active, false}]), + {ok, <<"pong">>} = emqtt_quic:recv(Sock, 0), + ok = emqtt_quic:setopts(Sock, [{active, 100}]), + {ok, Stats} = emqtt_quic:getstat(Sock, [send_cnt, recv_cnt]), + %% connection level counters, not stream level + [{send_cnt, _}, {recv_cnt, _}] = Stats. + +certfile(Config) -> + filename:join([test_dir(Config), "certs", "test.crt"]). + +keyfile(Config) -> + filename:join([test_dir(Config), "certs", "test.key"]). + +test_dir(Config) -> + filename:dirname(filename:dirname(proplists:get_value(data_dir, Config))). + +recv_pub(Count) -> + recv_pub(Count, []). + +recv_pub(0, Acc) -> + lists:reverse(Acc); +recv_pub(Count, Acc) -> + receive + {publish, _Prop} = Pub -> + recv_pub(Count - 1, [Pub | Acc]) + after 100 -> + timeout + end. + +all_tc() -> + code:add_patha(filename:join(code:lib_dir(emqx), "ebin/")), + emqx_common_test_helpers:all(?MODULE). + +-spec calc_qos(0 | 1 | 2, 0 | 1 | 2) -> 0 | 1 | 2. +calc_qos(PubQos, SubQos) -> + if + PubQos > SubQos -> + SubQos; + SubQos > PubQos -> + PubQos; + true -> + PubQos + end. +-spec calc_pkt_id(0 | 1 | 2, non_neg_integer()) -> undefined | non_neg_integer(). +calc_pkt_id(0, _Id) -> + undefined; +calc_pkt_id(1, Id) -> + Id; +calc_pkt_id(2, Id) -> + Id. + +-spec start_emqx_quic(inet:port_number()) -> ok. +start_emqx_quic(UdpPort) -> + emqx_common_test_helpers:start_apps([]), + application:ensure_all_started(quicer), + emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort). + +-spec stop_emqx() -> ok. +stop_emqx() -> + emqx_common_test_helpers:stop_apps([]). + +%% select a random port picked by OS +-spec select_port() -> inet:port_number(). +select_port() -> + {ok, S} = gen_udp:open(0, [{reuseaddr, true}]), + {ok, {_, Port}} = inet:sockname(S), + gen_udp:close(S), + case os:type() of + {unix, darwin} -> + %% in MacOS, still get address_in_use after close port + timer:sleep(500); + _ -> + skip + end, + ct:pal("select port: ~p", [Port]), + Port.