From 1e8b2e247e9d31c754412622ce8cfcf4eb0f4bea Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 6 Jan 2023 13:09:25 +0100 Subject: [PATCH] feat(quic): 0-RTT multi-streams data --- apps/emqx/src/emqx_quic_connection.erl | 3 +- apps/emqx/src/emqx_quic_data_stream.erl | 37 ++++++----- apps/emqx/src/emqx_quic_stream.erl | 35 +--------- apps/emqx/test/emqtt_quic_SUITE.erl | 88 +++++++++++++++++++++++-- 4 files changed, 104 insertions(+), 59 deletions(-) diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 588648483..ef0d9b2e3 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -174,7 +174,8 @@ new_stream( limiter => Limiter, parse_state => PS, channel => Channel, - serialize => Serialize + serialize => Serialize, + quic_event_mask => ?QUICER_STREAM_EVENT_MASK_START_COMPLETE }, {ok, NewStreamOwner} = quicer_stream:start_link( emqx_quic_data_stream, diff --git a/apps/emqx/src/emqx_quic_data_stream.erl b/apps/emqx/src/emqx_quic_data_stream.erl index 61c13bdee..bea0d37e1 100644 --- a/apps/emqx/src/emqx_quic_data_stream.erl +++ b/apps/emqx/src/emqx_quic_data_stream.erl @@ -25,14 +25,12 @@ -include_lib("quicer/include/quicer.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). --behaviour(quicer_stream). +-behaviour(quicer_remote_stream). %% Connection Callbacks -export([ init_handoff/4, post_handoff/3, - new_stream/3, - start_completed/3, send_complete/3, peer_send_shutdown/3, peer_send_aborted/3, @@ -79,17 +77,15 @@ init_handoff( %% %% @TODO -spec %% +post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Channel}, S) -> + %% Channel isn't ready yet. + %% Data stream should wait for activate call with ?MODULE:activate_data/2 + {ok, S}; post_handoff(Stream, {PS, Serialize, Channel}, S) -> ?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}), quicer:setopt(Stream, active, true), {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}. -%% -%% @doc when this proc is assigned to the owner of new stream -%% -new_stream(Stream, #{flags := Flags}, Connection) -> - {ok, init_state(Stream, Connection, Flags)}. - %% %% @doc for local initiated stream %% @@ -125,12 +121,6 @@ send_complete(_Stream, true = _IsCanceled, S) -> send_shutdown_complete(_Stream, _Flags, S) -> {ok, S}. -start_completed(_Stream, #{status := success, stream_id := StreamId}, S) -> - {ok, S#{stream_id => StreamId}}; -start_completed(_Stream, #{status := Other}, S) -> - %% or we could retry - {stop, {start_fail, Other}, S}. - handle_stream_data( Stream, Bin, @@ -208,7 +198,18 @@ stream_closed( {stop, normal, S}. handle_call(Call, _From, S) -> - do_handle_call(Call, S). + case do_handle_call(Call, S) of + {ok, NewS} -> + {reply, ok, NewS}; + {error, Reason, NewS} -> + {reply, {error, Reason}, NewS}; + {{continue, _} = Cont, NewS} -> + {reply, ok, NewS, Cont}; + {hibernate, NewS} -> + {reply, ok, NewS, hibernate}; + {stop, Reason, NewS} -> + {stop, Reason, {stopped, Reason}, NewS} + end. handle_continue(handle_appl_msg, #{task_queue := Q} = S) -> case queue:out(Q) of @@ -390,8 +391,8 @@ do_handle_call( ?SLOG(error, #{msg => "set stream active failed", error => E}), {stop, E, NewS} end; -do_handle_call(_Call, S) -> - {reply, {error, unimpl}, S}. +do_handle_call(_Call, _S) -> + {error, unimpl}. %% @doc return reserved order of Packets parse_incoming(Data, PS) -> diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 667ddb2b0..ee764cdc5 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -17,7 +17,7 @@ %% MQTT/QUIC Stream -module(emqx_quic_stream). --behaviour(quicer_stream). +-behaviour(quicer_remote_stream). %% emqx transport Callbacks -export([ @@ -57,18 +57,14 @@ -type stream_handle() :: quicer:stream_handle(). -export([ - init_handoff/4, new_stream/3, - start_completed/3, send_complete/3, peer_send_shutdown/3, peer_send_aborted/3, peer_receive_aborted/3, send_shutdown_complete/3, stream_closed/3, - peer_accepted/3, - passive/3, - handle_call/4 + passive/3 ]). -export_type([socket/0]). @@ -195,21 +191,10 @@ async_send({quic, _Conn, Stream, _Info}, Data, _Options) -> %%% %%% quicer stream callbacks %%% - --spec init_handoff(stream_handle(), #{}, quicer:connection_handle(), #{}) -> cb_ret(). -init_handoff(_Stream, _StreamOpts, _Conn, _Flags) -> - %% stream owner already set while starts. - {stop, unimpl}. - -spec new_stream(stream_handle(), quicer:new_stream_props(), cb_data()) -> cb_ret(). new_stream(_Stream, #{flags := _Flags, is_orphan := _IsOrphan}, _Conn) -> {stop, unimpl}. --spec peer_accepted(stream_handle(), undefined, cb_data()) -> cb_ret(). -peer_accepted(_Stream, undefined, S) -> - %% We just ignore it - {ok, S}. - -spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). peer_receive_aborted(Stream, ErrorCode, S) -> quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), @@ -237,19 +222,6 @@ send_complete(_Stream, true = _IsCancelled, S) -> send_shutdown_complete(_Stream, _IsGraceful, S) -> {ok, S}. --spec start_completed(stream_handle(), quicer:stream_start_completed_props(), cb_data()) -> - cb_ret(). -start_completed(_Stream, #{status := success, stream_id := StreamId} = Prop, S) -> - ?SLOG(debug, Prop), - {ok, S#{stream_id => StreamId}}; -start_completed(_Stream, #{status := stream_limit_reached, stream_id := _StreamId} = Prop, _S) -> - ?SLOG(error, #{message => start_completed}, Prop), - {stop, stream_limit_reached}; -start_completed(_Stream, #{status := Other} = Prop, S) -> - ?SLOG(error, Prop), - %% or we could retry? - {stop, {start_fail, Other}, S}. - %% Local stream, Unidir %% -spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_data()) %% -> cb_ret(). @@ -299,9 +271,6 @@ stream_closed( %% a msg to be processed {ok, {sock_closed, Status}, S}. -handle_call(_Stream, _Request, _Opts, S) -> - {error, unimpl, S}. - %%% %%% Internals %%% diff --git a/apps/emqx/test/emqtt_quic_SUITE.erl b/apps/emqx/test/emqtt_quic_SUITE.erl index 6c19ecdad..cfb9d4ae4 100644 --- a/apps/emqx/test/emqtt_quic_SUITE.erl +++ b/apps/emqx/test/emqtt_quic_SUITE.erl @@ -77,12 +77,12 @@ groups() -> t_multi_streams_unsub, t_multi_streams_corr_topic, t_multi_streams_unsub_via_other, - t_multi_streams_shutdown_data_stream_abortive, t_multi_streams_dup_sub, t_multi_streams_packet_boundary, t_multi_streams_packet_malform, t_multi_streams_kill_sub_stream, t_multi_streams_packet_too_large, + t_multi_streams_sub_0_rtt, t_conn_change_client_addr ]}, @@ -93,10 +93,22 @@ groups() -> {group, abort_send_recv_shutdown} ]}, - {graceful_shutdown, [{group, ctrl_stream_shutdown}]}, - {abort_recv_shutdown, [{group, ctrl_stream_shutdown}]}, - {abort_send_shutdown, [{group, ctrl_stream_shutdown}]}, - {abort_send_recv_shutdown, [{group, ctrl_stream_shutdown}]}, + {graceful_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, + {abort_recv_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, + {abort_send_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, + {abort_send_recv_shutdown, [ + {group, ctrl_stream_shutdown}, + {group, data_stream_shutdown} + ]}, {ctrl_stream_shutdown, [ t_multi_streams_shutdown_ctrl_stream, @@ -104,6 +116,8 @@ groups() -> t_multi_streams_remote_shutdown, t_multi_streams_remote_shutdown_with_reconnect ]}, + + {data_stream_shutdown, [t_multi_streams_shutdown_data_stream]}, {misc, [ t_conn_silent_close, t_client_conn_bump_streams, @@ -1004,7 +1018,7 @@ t_multi_streams_unsub_via_other(Config) -> ), ok = emqtt:disconnect(C). -t_multi_streams_shutdown_data_stream_abortive(Config) -> +t_multi_streams_shutdown_data_stream(Config) -> PubQos = ?config(pub_qos, Config), SubQos = ?config(sub_qos, Config), RecQos = calc_qos(PubQos, SubQos), @@ -1045,7 +1059,7 @@ t_multi_streams_shutdown_data_stream_abortive(Config) -> #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), {quic, _Conn, DataStream} = PubVia, - quicer:shutdown_stream(DataStream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND, 500, 100), + quicer:shutdown_stream(DataStream, ?config(stream_shutdown_flag, Config), 500, 100), timer:sleep(500), %% Still alive ?assert(is_list(emqtt:info(C))). @@ -1351,6 +1365,66 @@ t_conn_without_ctrl_stream(Config) -> {quic, transport_shutdown, Conn, _} -> ok end. +t_data_stream_race_ctrl_stream(Config) -> + erlang:process_flag(trap_exit, true), + {ok, C0} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C0), + #{nst := NST} = proplists:get_value(extra, emqtt:info(C0)), + emqtt:disconnect(C0), + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {connect_timeout, 5}, + {nst, NST} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + Cid = proplists:get_value(clientid, emqtt:info(C)), + ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]). + +t_multi_streams_sub_0_rtt(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C0), + {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + ok = emqtt:open_quic_connection(C), + ok = emqtt:quic_mqtt_connect(C), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + #{}, + <<"qos 2 1">>, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + {ok, _} = emqtt:quic_connect(C), + receive + {publish, #{ + client_pid := C0, + payload := <<"qos 2 1">>, + qos := RecQos, + topic := Topic + }} -> + ok; + Other -> + ct:fail("unexpected recv ~p", [Other]) + after 100 -> + ct:fail("not received") + end, + ok = emqtt:disconnect(C), + ok = emqtt:disconnect(C0). + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------