diff --git a/apps/emqx/src/emqx_quic_data_stream.erl b/apps/emqx/src/emqx_quic_data_stream.erl index bea0d37e1..2aa3ad4f7 100644 --- a/apps/emqx/src/emqx_quic_data_stream.erl +++ b/apps/emqx/src/emqx_quic_data_stream.erl @@ -83,7 +83,7 @@ post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Cha {ok, S}; post_handoff(Stream, {PS, Serialize, Channel}, S) -> ?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}), - quicer:setopt(Stream, active, true), + quicer:setopt(Stream, active, 10), {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}. %% @@ -301,8 +301,8 @@ handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir : is_list(Packets) -> OutBin = [serialize_packet(P, Serialize) || P <- filter_disallowed_out(Packets)], - %% @TODO in which case shall we use sync send? - Res = quicer:async_send(Stream, OutBin), + %% Send data async but still want send feedback via {quic, send_complete, ...} + Res = quicer:async_send(Stream, OutBin, ?QUICER_SEND_FLAG_SYNC), ?TRACE("MQTT", "mqtt_packet_sent", #{packets => Packets}), [ok = inc_outgoing_stats(P) || P <- Packets], Res. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index ee764cdc5..88cf4b7c3 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -183,7 +183,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> end. async_send({quic, _Conn, Stream, _Info}, Data, _Options) -> - case quicer:send(Stream, Data) of + case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of {ok, _Len} -> ok; Other -> Other end. diff --git a/apps/emqx/test/emqtt_quic_SUITE.erl b/apps/emqx/test/emqtt_quic_SUITE.erl index cfb9d4ae4..f926c2f3e 100644 --- a/apps/emqx/test/emqtt_quic_SUITE.erl +++ b/apps/emqx/test/emqtt_quic_SUITE.erl @@ -71,7 +71,9 @@ groups() -> {sub_qos2, [{group, qos}]}, {qos, [ t_multi_streams_sub, + t_multi_streams_pub_5x100, t_multi_streams_pub_parallel, + t_multi_streams_pub_parallel_no_blocking, t_multi_streams_sub_pub_async, t_multi_streams_sub_pub_sync, t_multi_streams_unsub, @@ -83,6 +85,8 @@ groups() -> t_multi_streams_kill_sub_stream, t_multi_streams_packet_too_large, t_multi_streams_sub_0_rtt, + t_multi_streams_sub_0_rtt_large_payload, + t_multi_streams_sub_0_rtt_stream_data_cont, t_conn_change_client_addr ]}, @@ -347,6 +351,36 @@ t_multi_streams_sub(Config) -> end, ok = emqtt:disconnect(C). +t_multi_streams_pub_5x100(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + + PubVias = lists:map( + fun(_N) -> + {ok, Via} = emqtt:start_data_stream(C, []), + Via + end, + lists:seq(1, 5) + ), + [ + begin + case emqtt:publish_via(C, PVia, Topic, #{}, <<"stream data ", N>>, [{qos, PubQos}]) of + ok when PubQos == 0 -> ok; + {ok, _} -> ok + end, + 0 == (N rem 10) andalso timer:sleep(10) + end + || N <- lists:seq(1, 100), PVia <- PubVias + ], + ?assert(timeout =/= recv_pub(500)), + ok = emqtt:disconnect(C). + t_multi_streams_pub_parallel(Config) -> PubQos = ?config(pub_qos, Config), SubQos = ?config(sub_qos, Config), @@ -400,6 +434,60 @@ t_multi_streams_pub_parallel(Config) -> ), ok = emqtt:disconnect(C). +%% @doc test two pub streams, one send incomplete MQTT packet() can not block another. +t_multi_streams_pub_parallel_no_blocking(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId2 = calc_pkt_id(RecQos, 1), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + Drop = <<"stream data 1">>, + meck:new(emqtt_quic, [passthrough, no_history]), + meck:expect(emqtt_quic, send, fun(Sock, IoList) -> + case lists:last(IoList) == Drop of + true -> + ct:pal("meck droping ~p", [Drop]), + meck:passthrough([Sock, IoList -- [Drop]]); + false -> + meck:passthrough([Sock, IoList]) + end + end), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + Drop, + [{qos, PubQos}], + undefined + ), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + <<"stream data 2">>, + [{qos, PubQos}], + undefined + ), + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<"stream data 2">>, + qos := RecQos, + topic := Topic + }} + ], + PubRecvs + ), + meck:unload(emqtt_quic), + ?assertEqual(timeout, recv_pub(1)), + ok = emqtt:disconnect(C). + t_multi_streams_packet_boundary(Config) -> PubQos = ?config(pub_qos, Config), SubQos = ?config(sub_qos, Config), @@ -1425,6 +1513,114 @@ t_multi_streams_sub_0_rtt(Config) -> ok = emqtt:disconnect(C), ok = emqtt:disconnect(C0). +t_multi_streams_sub_0_rtt_large_payload(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + Payload = binary:copy(<<"qos 2 1">>, 1600), + {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C0), + {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + ok = emqtt:open_quic_connection(C), + ok = emqtt:quic_mqtt_connect(C), + ok = emqtt:publish_async( + C, + {new_data_stream, []}, + Topic, + #{}, + Payload, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + {ok, _} = emqtt:quic_connect(C), + receive + {publish, #{ + client_pid := C0, + payload := Payload, + qos := RecQos, + topic := Topic + }} -> + ok; + Other -> + ct:fail("unexpected recv ~p", [Other]) + after 100 -> + ct:fail("not received") + end, + ok = emqtt:disconnect(C), + ok = emqtt:disconnect(C0). + +%% @doc verify data stream can continue after 0-RTT handshake +t_multi_streams_sub_0_rtt_stream_data_cont(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + Payload = binary:copy(<<"qos 2 1">>, 1600), + {ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C0), + {ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + ok = emqtt:open_quic_connection(C), + ok = emqtt:quic_mqtt_connect(C), + {ok, PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + #{}, + Payload, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + {ok, _} = emqtt:quic_connect(C), + receive + {publish, #{ + client_pid := C0, + payload := Payload, + qos := RecQos, + topic := Topic + }} -> + ok; + Other -> + ct:fail("unexpected recv ~p", [Other]) + after 100 -> + ct:fail("not received") + end, + Payload2 = <<"2nd part", Payload/binary>>, + ok = emqtt:publish_async( + C, + PubVia, + Topic, + #{}, + Payload2, + [{qos, PubQos}], + infinity, + fun(_) -> ok end + ), + receive + {publish, #{ + client_pid := C0, + payload := Payload2, + qos := RecQos, + topic := Topic + }} -> + ok; + Other2 -> + ct:fail("unexpected recv ~p", [Other2]) + after 100 -> + ct:fail("not received") + end, + ok = emqtt:disconnect(C), + ok = emqtt:disconnect(C0). + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------