test(quic): enhance large payload test
This commit is contained in:
parent
00f615a1e3
commit
2a6cdd9da6
|
@ -1136,7 +1136,6 @@ do_deliver(Publishes, Channel) when is_list(Publishes) ->
|
||||||
{Packets, NChannel} =
|
{Packets, NChannel} =
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(Publish, {Acc, Chann}) ->
|
fun(Publish, {Acc, Chann}) ->
|
||||||
%% @FIXME perf: list append with copy left list
|
|
||||||
{Packets, NChann} = do_deliver(Publish, Chann),
|
{Packets, NChann} = do_deliver(Publish, Chann),
|
||||||
{Packets ++ Acc, NChann}
|
{Packets ++ Acc, NChann}
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -118,7 +118,7 @@
|
||||||
%% limiter timers
|
%% limiter timers
|
||||||
limiter_timer :: undefined | reference(),
|
limiter_timer :: undefined | reference(),
|
||||||
|
|
||||||
%% QUIC conn pid if is a pid
|
%% QUIC conn owner pid if in use.
|
||||||
quic_conn_pid :: maybe(pid())
|
quic_conn_pid :: maybe(pid())
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -336,7 +336,6 @@ init_state(
|
||||||
Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg),
|
Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg),
|
||||||
|
|
||||||
FrameOpts = #{
|
FrameOpts = #{
|
||||||
%% @TODO:q what is strict_mode?
|
|
||||||
strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
|
strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
|
||||||
max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
|
max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
|
||||||
},
|
},
|
||||||
|
|
|
@ -661,14 +661,14 @@ t_multi_streams_packet_too_large(Config) ->
|
||||||
PubQos = ?config(pub_qos, Config),
|
PubQos = ?config(pub_qos, Config),
|
||||||
SubQos = ?config(sub_qos, Config),
|
SubQos = ?config(sub_qos, Config),
|
||||||
Topic = atom_to_binary(?FUNCTION_NAME),
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
||||||
meck:new(emqx_frame, [passthrough, no_history]),
|
RecQos = calc_qos(PubQos, SubQos),
|
||||||
ok = meck:expect(
|
PktId1 = calc_pkt_id(RecQos, 1),
|
||||||
emqx_frame,
|
PktId2 = calc_pkt_id(RecQos, 2),
|
||||||
serialize_opts,
|
PktId3 = calc_pkt_id(RecQos, 3),
|
||||||
fun(#mqtt_packet_connect{proto_ver = ProtoVer}) ->
|
|
||||||
#{version => ProtoVer, max_size => 1024}
|
OldMax = emqx_config:get_zone_conf(default, [mqtt, max_packet_size]),
|
||||||
end
|
emqx_config:put_zone_conf(default, [mqtt, max_packet_size], 1000),
|
||||||
),
|
|
||||||
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
|
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
|
||||||
{ok, _} = emqtt:quic_connect(C),
|
{ok, _} = emqtt:quic_connect(C),
|
||||||
{ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]),
|
{ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]),
|
||||||
|
@ -678,13 +678,95 @@ t_multi_streams_packet_too_large(Config) ->
|
||||||
C,
|
C,
|
||||||
PubVia,
|
PubVia,
|
||||||
Topic,
|
Topic,
|
||||||
binary:copy(<<"stream data 1">>, 1024),
|
<<"stream data 1">>,
|
||||||
[{qos, PubQos}],
|
[{qos, PubQos}],
|
||||||
undefined
|
undefined
|
||||||
),
|
),
|
||||||
|
|
||||||
|
ok = emqtt:publish_async(
|
||||||
|
C,
|
||||||
|
PubVia,
|
||||||
|
Topic,
|
||||||
|
<<"stream data 2">>,
|
||||||
|
[{qos, PubQos}],
|
||||||
|
undefined
|
||||||
|
),
|
||||||
|
|
||||||
|
PubRecvs = recv_pub(2),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
{publish, #{
|
||||||
|
client_pid := C,
|
||||||
|
packet_id := PktId1,
|
||||||
|
payload := <<"stream data 1">>,
|
||||||
|
qos := RecQos,
|
||||||
|
topic := Topic
|
||||||
|
}},
|
||||||
|
{publish, #{
|
||||||
|
client_pid := C,
|
||||||
|
packet_id := PktId2,
|
||||||
|
payload := <<"stream data 2">>,
|
||||||
|
qos := RecQos,
|
||||||
|
topic := Topic
|
||||||
|
}}
|
||||||
|
],
|
||||||
|
PubRecvs
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, PubVia2} = emqtt:start_data_stream(C, []),
|
||||||
|
ok = emqtt:publish_async(
|
||||||
|
C,
|
||||||
|
PubVia2,
|
||||||
|
Topic,
|
||||||
|
binary:copy(<<"too large">>, 200),
|
||||||
|
[{qos, PubQos}],
|
||||||
|
undefined
|
||||||
|
),
|
||||||
|
timer:sleep(200),
|
||||||
|
?assert(is_list(emqtt:info(C))),
|
||||||
|
|
||||||
|
timeout = recv_pub(1),
|
||||||
|
|
||||||
|
%% send large payload on stream 1
|
||||||
|
ok = emqtt:publish_async(
|
||||||
|
C,
|
||||||
|
PubVia,
|
||||||
|
Topic,
|
||||||
|
binary:copy(<<"too large">>, 200),
|
||||||
|
[{qos, PubQos}],
|
||||||
|
undefined
|
||||||
|
),
|
||||||
|
timer:sleep(200),
|
||||||
timeout = recv_pub(1),
|
timeout = recv_pub(1),
|
||||||
?assert(is_list(emqtt:info(C))),
|
?assert(is_list(emqtt:info(C))),
|
||||||
ok = meck:unload(emqx_frame),
|
|
||||||
|
%% Connection could be kept
|
||||||
|
{error, stm_send_error, _} = quicer:send(via_stream(PubVia), <<1>>),
|
||||||
|
{error, stm_send_error, _} = quicer:send(via_stream(PubVia2), <<1>>),
|
||||||
|
%% We could send data over new stream
|
||||||
|
{ok, PubVia3} = emqtt:start_data_stream(C, []),
|
||||||
|
ok = emqtt:publish_async(
|
||||||
|
C,
|
||||||
|
PubVia3,
|
||||||
|
Topic,
|
||||||
|
<<"stream data 3">>,
|
||||||
|
[{qos, PubQos}],
|
||||||
|
undefined
|
||||||
|
),
|
||||||
|
[
|
||||||
|
{publish, #{
|
||||||
|
client_pid := C,
|
||||||
|
packet_id := PktId3,
|
||||||
|
payload := <<"stream data 3">>,
|
||||||
|
qos := RecQos,
|
||||||
|
topic := Topic
|
||||||
|
}}
|
||||||
|
] = recv_pub(1),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
?assert(is_list(emqtt:info(C))),
|
||||||
|
|
||||||
|
emqx_config:put_zone_conf(default, [mqtt, max_packet_size], OldMax),
|
||||||
ok = emqtt:disconnect(C).
|
ok = emqtt:disconnect(C).
|
||||||
|
|
||||||
t_conn_change_client_addr(Config) ->
|
t_conn_change_client_addr(Config) ->
|
||||||
|
@ -1758,3 +1840,8 @@ select_port() ->
|
||||||
end,
|
end,
|
||||||
ct:pal("select port: ~p", [Port]),
|
ct:pal("select port: ~p", [Port]),
|
||||||
Port.
|
Port.
|
||||||
|
|
||||||
|
-spec via_stream({quic, quicer:connection_handle(), quicer:stream_handle()}) ->
|
||||||
|
quicer:stream_handle().
|
||||||
|
via_stream({quic, _Conn, Stream}) ->
|
||||||
|
Stream.
|
||||||
|
|
Loading…
Reference in New Issue