diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index a12df9c64..e82adc786 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1136,7 +1136,6 @@ do_deliver(Publishes, Channel) when is_list(Publishes) -> {Packets, NChannel} = lists:foldl( fun(Publish, {Acc, Chann}) -> - %% @FIXME perf: list append with copy left list {Packets, NChann} = do_deliver(Publish, Chann), {Packets ++ Acc, NChann} end, diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index be420d65e..ff3ee81a9 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -118,7 +118,7 @@ %% limiter timers limiter_timer :: undefined | reference(), - %% QUIC conn pid if is a pid + %% QUIC conn owner pid if in use. quic_conn_pid :: maybe(pid()) }). @@ -336,7 +336,6 @@ init_state( Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg), FrameOpts = #{ - %% @TODO:q what is strict_mode? strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]), max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size]) }, diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index b6d3c661c..025790ef7 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -661,14 +661,14 @@ t_multi_streams_packet_too_large(Config) -> PubQos = ?config(pub_qos, Config), SubQos = ?config(sub_qos, Config), Topic = atom_to_binary(?FUNCTION_NAME), - meck:new(emqx_frame, [passthrough, no_history]), - ok = meck:expect( - emqx_frame, - serialize_opts, - fun(#mqtt_packet_connect{proto_ver = ProtoVer}) -> - #{version => ProtoVer, max_size => 1024} - end - ), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + PktId2 = calc_pkt_id(RecQos, 2), + PktId3 = calc_pkt_id(RecQos, 3), + + OldMax = emqx_config:get_zone_conf(default, [mqtt, max_packet_size]), + emqx_config:put_zone_conf(default, [mqtt, max_packet_size], 1000), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, _} = emqtt:quic_connect(C), {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), @@ -678,13 +678,95 @@ t_multi_streams_packet_too_large(Config) -> C, PubVia, Topic, - binary:copy(<<"stream data 1">>, 1024), + <<"stream data 1">>, [{qos, PubQos}], undefined ), + + ok = emqtt:publish_async( + C, + PubVia, + Topic, + <<"stream data 2">>, + [{qos, PubQos}], + undefined + ), + + PubRecvs = recv_pub(2), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<"stream data 1">>, + qos := RecQos, + topic := Topic + }}, + {publish, #{ + client_pid := C, + packet_id := PktId2, + payload := <<"stream data 2">>, + qos := RecQos, + topic := Topic + }} + ], + PubRecvs + ), + + {ok, PubVia2} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia2, + Topic, + binary:copy(<<"too large">>, 200), + [{qos, PubQos}], + undefined + ), + timer:sleep(200), + ?assert(is_list(emqtt:info(C))), + + timeout = recv_pub(1), + + %% send large payload on stream 1 + ok = emqtt:publish_async( + C, + PubVia, + Topic, + binary:copy(<<"too large">>, 200), + [{qos, PubQos}], + undefined + ), + timer:sleep(200), timeout = recv_pub(1), ?assert(is_list(emqtt:info(C))), - ok = meck:unload(emqx_frame), + + %% Connection could be kept + {error, stm_send_error, _} = quicer:send(via_stream(PubVia), <<1>>), + {error, stm_send_error, _} = quicer:send(via_stream(PubVia2), <<1>>), + %% We could send data over new stream + {ok, PubVia3} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia3, + Topic, + <<"stream data 3">>, + [{qos, PubQos}], + undefined + ), + [ + {publish, #{ + client_pid := C, + packet_id := PktId3, + payload := <<"stream data 3">>, + qos := RecQos, + topic := Topic + }} + ] = recv_pub(1), + timer:sleep(200), + + ?assert(is_list(emqtt:info(C))), + + emqx_config:put_zone_conf(default, [mqtt, max_packet_size], OldMax), ok = emqtt:disconnect(C). t_conn_change_client_addr(Config) -> @@ -1758,3 +1840,8 @@ select_port() -> end, ct:pal("select port: ~p", [Port]), Port. + +-spec via_stream({quic, quicer:connection_handle(), quicer:stream_handle()}) -> + quicer:stream_handle(). +via_stream({quic, _Conn, Stream}) -> + Stream.