test(quic): improve coverage

This commit is contained in:
William Yang 2023-01-08 22:24:32 +01:00
parent 5764994436
commit f65ac5422e
3 changed files with 200 additions and 4 deletions

View File

@ -83,7 +83,7 @@ post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Cha
{ok, S};
post_handoff(Stream, {PS, Serialize, Channel}, S) ->
?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}),
quicer:setopt(Stream, active, true),
quicer:setopt(Stream, active, 10),
{ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}.
%%
@ -301,8 +301,8 @@ handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir :
is_list(Packets)
->
OutBin = [serialize_packet(P, Serialize) || P <- filter_disallowed_out(Packets)],
%% @TODO in which case shall we use sync send?
Res = quicer:async_send(Stream, OutBin),
%% Send data async but still want send feedback via {quic, send_complete, ...}
Res = quicer:async_send(Stream, OutBin, ?QUICER_SEND_FLAG_SYNC),
?TRACE("MQTT", "mqtt_packet_sent", #{packets => Packets}),
[ok = inc_outgoing_stats(P) || P <- Packets],
Res.

View File

@ -183,7 +183,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
end.
async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
case quicer:send(Stream, Data) of
case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of
{ok, _Len} -> ok;
Other -> Other
end.

View File

@ -71,7 +71,9 @@ groups() ->
{sub_qos2, [{group, qos}]},
{qos, [
t_multi_streams_sub,
t_multi_streams_pub_5x100,
t_multi_streams_pub_parallel,
t_multi_streams_pub_parallel_no_blocking,
t_multi_streams_sub_pub_async,
t_multi_streams_sub_pub_sync,
t_multi_streams_unsub,
@ -83,6 +85,8 @@ groups() ->
t_multi_streams_kill_sub_stream,
t_multi_streams_packet_too_large,
t_multi_streams_sub_0_rtt,
t_multi_streams_sub_0_rtt_large_payload,
t_multi_streams_sub_0_rtt_stream_data_cont,
t_conn_change_client_addr
]},
@ -347,6 +351,36 @@ t_multi_streams_sub(Config) ->
end,
ok = emqtt:disconnect(C).
t_multi_streams_pub_5x100(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
Topic = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C),
{ok, _, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
{Topic, [{qos, SubQos}]}
]),
PubVias = lists:map(
fun(_N) ->
{ok, Via} = emqtt:start_data_stream(C, []),
Via
end,
lists:seq(1, 5)
),
[
begin
case emqtt:publish_via(C, PVia, Topic, #{}, <<"stream data ", N>>, [{qos, PubQos}]) of
ok when PubQos == 0 -> ok;
{ok, _} -> ok
end,
0 == (N rem 10) andalso timer:sleep(10)
end
|| N <- lists:seq(1, 100), PVia <- PubVias
],
?assert(timeout =/= recv_pub(500)),
ok = emqtt:disconnect(C).
t_multi_streams_pub_parallel(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
@ -400,6 +434,60 @@ t_multi_streams_pub_parallel(Config) ->
),
ok = emqtt:disconnect(C).
%% @doc test two pub streams, one send incomplete MQTT packet() can not block another.
t_multi_streams_pub_parallel_no_blocking(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
RecQos = calc_qos(PubQos, SubQos),
PktId2 = calc_pkt_id(RecQos, 1),
Topic = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C),
{ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]),
Drop = <<"stream data 1">>,
meck:new(emqtt_quic, [passthrough, no_history]),
meck:expect(emqtt_quic, send, fun(Sock, IoList) ->
case lists:last(IoList) == Drop of
true ->
ct:pal("meck droping ~p", [Drop]),
meck:passthrough([Sock, IoList -- [Drop]]);
false ->
meck:passthrough([Sock, IoList])
end
end),
ok = emqtt:publish_async(
C,
{new_data_stream, []},
Topic,
Drop,
[{qos, PubQos}],
undefined
),
ok = emqtt:publish_async(
C,
{new_data_stream, []},
Topic,
<<"stream data 2">>,
[{qos, PubQos}],
undefined
),
PubRecvs = recv_pub(1),
?assertMatch(
[
{publish, #{
client_pid := C,
packet_id := PktId2,
payload := <<"stream data 2">>,
qos := RecQos,
topic := Topic
}}
],
PubRecvs
),
meck:unload(emqtt_quic),
?assertEqual(timeout, recv_pub(1)),
ok = emqtt:disconnect(C).
t_multi_streams_packet_boundary(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
@ -1425,6 +1513,114 @@ t_multi_streams_sub_0_rtt(Config) ->
ok = emqtt:disconnect(C),
ok = emqtt:disconnect(C0).
t_multi_streams_sub_0_rtt_large_payload(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
RecQos = calc_qos(PubQos, SubQos),
Topic = atom_to_binary(?FUNCTION_NAME),
Payload = binary:copy(<<"qos 2 1">>, 1600),
{ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C0),
{ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [
{Topic, [{qos, SubQos}]}
]),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
ok = emqtt:open_quic_connection(C),
ok = emqtt:quic_mqtt_connect(C),
ok = emqtt:publish_async(
C,
{new_data_stream, []},
Topic,
#{},
Payload,
[{qos, PubQos}],
infinity,
fun(_) -> ok end
),
{ok, _} = emqtt:quic_connect(C),
receive
{publish, #{
client_pid := C0,
payload := Payload,
qos := RecQos,
topic := Topic
}} ->
ok;
Other ->
ct:fail("unexpected recv ~p", [Other])
after 100 ->
ct:fail("not received")
end,
ok = emqtt:disconnect(C),
ok = emqtt:disconnect(C0).
%% @doc verify data stream can continue after 0-RTT handshake
t_multi_streams_sub_0_rtt_stream_data_cont(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
RecQos = calc_qos(PubQos, SubQos),
Topic = atom_to_binary(?FUNCTION_NAME),
Payload = binary:copy(<<"qos 2 1">>, 1600),
{ok, C0} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C0),
{ok, _, [SubQos]} = emqtt:subscribe_via(C0, {new_data_stream, []}, #{}, [
{Topic, [{qos, SubQos}]}
]),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
ok = emqtt:open_quic_connection(C),
ok = emqtt:quic_mqtt_connect(C),
{ok, PubVia} = emqtt:start_data_stream(C, []),
ok = emqtt:publish_async(
C,
PubVia,
Topic,
#{},
Payload,
[{qos, PubQos}],
infinity,
fun(_) -> ok end
),
{ok, _} = emqtt:quic_connect(C),
receive
{publish, #{
client_pid := C0,
payload := Payload,
qos := RecQos,
topic := Topic
}} ->
ok;
Other ->
ct:fail("unexpected recv ~p", [Other])
after 100 ->
ct:fail("not received")
end,
Payload2 = <<"2nd part", Payload/binary>>,
ok = emqtt:publish_async(
C,
PubVia,
Topic,
#{},
Payload2,
[{qos, PubQos}],
infinity,
fun(_) -> ok end
),
receive
{publish, #{
client_pid := C0,
payload := Payload2,
qos := RecQos,
topic := Topic
}} ->
ok;
Other2 ->
ct:fail("unexpected recv ~p", [Other2])
after 100 ->
ct:fail("not received")
end,
ok = emqtt:disconnect(C),
ok = emqtt:disconnect(C0).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------