feat(quic): 0-RTT multi-streams data

This commit is contained in:
William Yang 2023-01-06 13:09:25 +01:00
parent 71d3148544
commit 1e8b2e247e
4 changed files with 104 additions and 59 deletions

View File

@ -174,7 +174,8 @@ new_stream(
limiter => Limiter, limiter => Limiter,
parse_state => PS, parse_state => PS,
channel => Channel, channel => Channel,
serialize => Serialize serialize => Serialize,
quic_event_mask => ?QUICER_STREAM_EVENT_MASK_START_COMPLETE
}, },
{ok, NewStreamOwner} = quicer_stream:start_link( {ok, NewStreamOwner} = quicer_stream:start_link(
emqx_quic_data_stream, emqx_quic_data_stream,

View File

@ -25,14 +25,12 @@
-include_lib("quicer/include/quicer.hrl"). -include_lib("quicer/include/quicer.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
-behaviour(quicer_stream). -behaviour(quicer_remote_stream).
%% Connection Callbacks %% Connection Callbacks
-export([ -export([
init_handoff/4, init_handoff/4,
post_handoff/3, post_handoff/3,
new_stream/3,
start_completed/3,
send_complete/3, send_complete/3,
peer_send_shutdown/3, peer_send_shutdown/3,
peer_send_aborted/3, peer_send_aborted/3,
@ -79,17 +77,15 @@ init_handoff(
%% %%
%% @TODO -spec %% @TODO -spec
%% %%
post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Channel}, S) ->
%% Channel isn't ready yet.
%% Data stream should wait for activate call with ?MODULE:activate_data/2
{ok, S};
post_handoff(Stream, {PS, Serialize, Channel}, S) -> post_handoff(Stream, {PS, Serialize, Channel}, S) ->
?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}), ?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}),
quicer:setopt(Stream, active, true), quicer:setopt(Stream, active, true),
{ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}. {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}.
%%
%% @doc when this proc is assigned to the owner of new stream
%%
new_stream(Stream, #{flags := Flags}, Connection) ->
{ok, init_state(Stream, Connection, Flags)}.
%% %%
%% @doc for local initiated stream %% @doc for local initiated stream
%% %%
@ -125,12 +121,6 @@ send_complete(_Stream, true = _IsCanceled, S) ->
send_shutdown_complete(_Stream, _Flags, S) -> send_shutdown_complete(_Stream, _Flags, S) ->
{ok, S}. {ok, S}.
start_completed(_Stream, #{status := success, stream_id := StreamId}, S) ->
{ok, S#{stream_id => StreamId}};
start_completed(_Stream, #{status := Other}, S) ->
%% or we could retry
{stop, {start_fail, Other}, S}.
handle_stream_data( handle_stream_data(
Stream, Stream,
Bin, Bin,
@ -208,7 +198,18 @@ stream_closed(
{stop, normal, S}. {stop, normal, S}.
handle_call(Call, _From, S) -> handle_call(Call, _From, S) ->
do_handle_call(Call, S). case do_handle_call(Call, S) of
{ok, NewS} ->
{reply, ok, NewS};
{error, Reason, NewS} ->
{reply, {error, Reason}, NewS};
{{continue, _} = Cont, NewS} ->
{reply, ok, NewS, Cont};
{hibernate, NewS} ->
{reply, ok, NewS, hibernate};
{stop, Reason, NewS} ->
{stop, Reason, {stopped, Reason}, NewS}
end.
handle_continue(handle_appl_msg, #{task_queue := Q} = S) -> handle_continue(handle_appl_msg, #{task_queue := Q} = S) ->
case queue:out(Q) of case queue:out(Q) of
@ -390,8 +391,8 @@ do_handle_call(
?SLOG(error, #{msg => "set stream active failed", error => E}), ?SLOG(error, #{msg => "set stream active failed", error => E}),
{stop, E, NewS} {stop, E, NewS}
end; end;
do_handle_call(_Call, S) -> do_handle_call(_Call, _S) ->
{reply, {error, unimpl}, S}. {error, unimpl}.
%% @doc return reserved order of Packets %% @doc return reserved order of Packets
parse_incoming(Data, PS) -> parse_incoming(Data, PS) ->

View File

@ -17,7 +17,7 @@
%% MQTT/QUIC Stream %% MQTT/QUIC Stream
-module(emqx_quic_stream). -module(emqx_quic_stream).
-behaviour(quicer_stream). -behaviour(quicer_remote_stream).
%% emqx transport Callbacks %% emqx transport Callbacks
-export([ -export([
@ -57,18 +57,14 @@
-type stream_handle() :: quicer:stream_handle(). -type stream_handle() :: quicer:stream_handle().
-export([ -export([
init_handoff/4,
new_stream/3, new_stream/3,
start_completed/3,
send_complete/3, send_complete/3,
peer_send_shutdown/3, peer_send_shutdown/3,
peer_send_aborted/3, peer_send_aborted/3,
peer_receive_aborted/3, peer_receive_aborted/3,
send_shutdown_complete/3, send_shutdown_complete/3,
stream_closed/3, stream_closed/3,
peer_accepted/3, passive/3
passive/3,
handle_call/4
]). ]).
-export_type([socket/0]). -export_type([socket/0]).
@ -195,21 +191,10 @@ async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
%%% %%%
%%% quicer stream callbacks %%% quicer stream callbacks
%%% %%%
-spec init_handoff(stream_handle(), #{}, quicer:connection_handle(), #{}) -> cb_ret().
init_handoff(_Stream, _StreamOpts, _Conn, _Flags) ->
%% stream owner already set while starts.
{stop, unimpl}.
-spec new_stream(stream_handle(), quicer:new_stream_props(), cb_data()) -> cb_ret(). -spec new_stream(stream_handle(), quicer:new_stream_props(), cb_data()) -> cb_ret().
new_stream(_Stream, #{flags := _Flags, is_orphan := _IsOrphan}, _Conn) -> new_stream(_Stream, #{flags := _Flags, is_orphan := _IsOrphan}, _Conn) ->
{stop, unimpl}. {stop, unimpl}.
-spec peer_accepted(stream_handle(), undefined, cb_data()) -> cb_ret().
peer_accepted(_Stream, undefined, S) ->
%% We just ignore it
{ok, S}.
-spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). -spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, S) -> peer_receive_aborted(Stream, ErrorCode, S) ->
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
@ -237,19 +222,6 @@ send_complete(_Stream, true = _IsCancelled, S) ->
send_shutdown_complete(_Stream, _IsGraceful, S) -> send_shutdown_complete(_Stream, _IsGraceful, S) ->
{ok, S}. {ok, S}.
-spec start_completed(stream_handle(), quicer:stream_start_completed_props(), cb_data()) ->
cb_ret().
start_completed(_Stream, #{status := success, stream_id := StreamId} = Prop, S) ->
?SLOG(debug, Prop),
{ok, S#{stream_id => StreamId}};
start_completed(_Stream, #{status := stream_limit_reached, stream_id := _StreamId} = Prop, _S) ->
?SLOG(error, #{message => start_completed}, Prop),
{stop, stream_limit_reached};
start_completed(_Stream, #{status := Other} = Prop, S) ->
?SLOG(error, Prop),
%% or we could retry?
{stop, {start_fail, Other}, S}.
%% Local stream, Unidir %% Local stream, Unidir
%% -spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_data()) %% -spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_data())
%% -> cb_ret(). %% -> cb_ret().
@ -299,9 +271,6 @@ stream_closed(
%% a msg to be processed %% a msg to be processed
{ok, {sock_closed, Status}, S}. {ok, {sock_closed, Status}, S}.
handle_call(_Stream, _Request, _Opts, S) ->
{error, unimpl, S}.
%%% %%%
%%% Internals %%% Internals
%%% %%%

View File

@ -77,12 +77,12 @@ groups() ->
t_multi_streams_unsub, t_multi_streams_unsub,
t_multi_streams_corr_topic, t_multi_streams_corr_topic,
t_multi_streams_unsub_via_other, t_multi_streams_unsub_via_other,
t_multi_streams_shutdown_data_stream_abortive,
t_multi_streams_dup_sub, t_multi_streams_dup_sub,
t_multi_streams_packet_boundary, t_multi_streams_packet_boundary,
t_multi_streams_packet_malform, t_multi_streams_packet_malform,
t_multi_streams_kill_sub_stream, t_multi_streams_kill_sub_stream,
t_multi_streams_packet_too_large, t_multi_streams_packet_too_large,
t_multi_streams_sub_0_rtt,
t_conn_change_client_addr t_conn_change_client_addr
]}, ]},
@ -93,10 +93,22 @@ groups() ->
{group, abort_send_recv_shutdown} {group, abort_send_recv_shutdown}
]}, ]},
{graceful_shutdown, [{group, ctrl_stream_shutdown}]}, {graceful_shutdown, [
{abort_recv_shutdown, [{group, ctrl_stream_shutdown}]}, {group, ctrl_stream_shutdown},
{abort_send_shutdown, [{group, ctrl_stream_shutdown}]}, {group, data_stream_shutdown}
{abort_send_recv_shutdown, [{group, ctrl_stream_shutdown}]}, ]},
{abort_recv_shutdown, [
{group, ctrl_stream_shutdown},
{group, data_stream_shutdown}
]},
{abort_send_shutdown, [
{group, ctrl_stream_shutdown},
{group, data_stream_shutdown}
]},
{abort_send_recv_shutdown, [
{group, ctrl_stream_shutdown},
{group, data_stream_shutdown}
]},
{ctrl_stream_shutdown, [ {ctrl_stream_shutdown, [
t_multi_streams_shutdown_ctrl_stream, t_multi_streams_shutdown_ctrl_stream,
@ -104,6 +116,8 @@ groups() ->
t_multi_streams_remote_shutdown, t_multi_streams_remote_shutdown,
t_multi_streams_remote_shutdown_with_reconnect t_multi_streams_remote_shutdown_with_reconnect
]}, ]},
{data_stream_shutdown, [t_multi_streams_shutdown_data_stream]},
{misc, [ {misc, [
t_conn_silent_close, t_conn_silent_close,
t_client_conn_bump_streams, t_client_conn_bump_streams,
@ -1004,7 +1018,7 @@ t_multi_streams_unsub_via_other(Config) ->
), ),
ok = emqtt:disconnect(C). ok = emqtt:disconnect(C).
t_multi_streams_shutdown_data_stream_abortive(Config) -> t_multi_streams_shutdown_data_stream(Config) ->
PubQos = ?config(pub_qos, Config), PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config), SubQos = ?config(sub_qos, Config),
RecQos = calc_qos(PubQos, SubQos), RecQos = calc_qos(PubQos, SubQos),
@ -1045,7 +1059,7 @@ t_multi_streams_shutdown_data_stream_abortive(Config) ->
#{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)), #{data_stream_socks := [PubVia | _]} = proplists:get_value(extra, emqtt:info(C)),
{quic, _Conn, DataStream} = PubVia, {quic, _Conn, DataStream} = PubVia,
quicer:shutdown_stream(DataStream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND, 500, 100), quicer:shutdown_stream(DataStream, ?config(stream_shutdown_flag, Config), 500, 100),
timer:sleep(500), timer:sleep(500),
%% Still alive %% Still alive
?assert(is_list(emqtt:info(C))). ?assert(is_list(emqtt:info(C))).
@ -1351,6 +1365,66 @@ t_conn_without_ctrl_stream(Config) ->
{quic, transport_shutdown, Conn, _} -> ok {quic, transport_shutdown, Conn, _} -> ok
end. end.
t_data_stream_race_ctrl_stream(Config) ->
erlang:process_flag(trap_exit, true),
{ok, C0} = emqtt:start_link([
{proto_ver, v5},
{connect_timeout, 5}
| Config
]),
{ok, _} = emqtt:quic_connect(C0),
#{nst := NST} = proplists:get_value(extra, emqtt:info(C0)),
emqtt:disconnect(C0),
{ok, C} = emqtt:start_link([
{proto_ver, v5},
{connect_timeout, 5},
{nst, NST}
| Config
]),
{ok, _} = emqtt:quic_connect(C),
Cid = proplists:get_value(clientid, emqtt:info(C)),
ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]).
t_multi_streams_sub_0_rtt(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
RecQos = calc_qos(PubQos, SubQos),
Topic = atom_to_binary(?FUNCTION_NAME),
{ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C0),
{ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [
{Topic, [{qos, SubQos}]}
]),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
ok = emqtt:open_quic_connection(C),
ok = emqtt:quic_mqtt_connect(C),
ok = emqtt:publish_async(
C,
{new_data_stream, []},
Topic,
#{},
<<"qos 2 1">>,
[{qos, PubQos}],
infinity,
fun(_) -> ok end
),
{ok, _} = emqtt:quic_connect(C),
receive
{publish, #{
client_pid := C0,
payload := <<"qos 2 1">>,
qos := RecQos,
topic := Topic
}} ->
ok;
Other ->
ct:fail("unexpected recv ~p", [Other])
after 100 ->
ct:fail("not received")
end,
ok = emqtt:disconnect(C),
ok = emqtt:disconnect(C0).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------