From 0173121a309adea318e06799bd71508d4643aa60 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 22 Dec 2022 22:20:29 +0100 Subject: [PATCH] feat(quic): improve coverage and remove unused code --- apps/emqx/src/emqx_quic_connection.erl | 19 ++- apps/emqx/src/emqx_quic_data_stream.erl | 18 ++- apps/emqx/test/emqtt_quic_SUITE.erl | 151 +++++++++++++++++++++++- 3 files changed, 173 insertions(+), 15 deletions(-) diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 8cdd9d5e6..588648483 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -135,20 +135,17 @@ new_conn( %% @doc callback when connection is connected. -spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) -> {ok, cb_state()} | {error, any()}. -connected(Conn, Props, #{slow_start := false} = S) -> - ?SLOG(debug, Props), - {ok, Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S), - {ok, S#{ctrl_pid => Pid}}; connected(_Conn, Props, S) -> ?SLOG(debug, Props), {ok, S}. %% @doc callback when connection is resumed from 0-RTT -spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret(). -resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when - is_function(ResumeFun) --> - ResumeFun(Conn, Data, S); +%% reserve resume conn with callback. +%% resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when +%% is_function(ResumeFun) +%% -> +%% ResumeFun(Conn, Data, S); resumed(_Conn, _Data, S) -> {ok, S#{is_resumed := true}}. @@ -245,9 +242,11 @@ handle_call( _From, #{streams := Streams} = S ) -> - [emqx_quic_data_stream:activate_data(OwnerPid, ActivateData) || {OwnerPid, _Stream} <- Streams], + [ + catch emqx_quic_data_stream:activate_data(OwnerPid, ActivateData) + || {OwnerPid, _Stream} <- Streams + ], {reply, ok, S#{ - %streams := [], %% @FIXME what ?????? channel := Channel, serialize := Serialize, parse_state := PS diff --git a/apps/emqx/src/emqx_quic_data_stream.erl b/apps/emqx/src/emqx_quic_data_stream.erl index 094680b19..24dd71c29 100644 --- a/apps/emqx/src/emqx_quic_data_stream.erl +++ b/apps/emqx/src/emqx_quic_data_stream.erl @@ -68,11 +68,11 @@ activate_data(StreamPid, {PS, Serialize, Channel}) -> %% @TODO -spec init_handoff( Stream, - #{parse_state := PS} = _StreamOpts, + _StreamOpts, Connection, #{is_orphan := true, flags := Flags} ) -> - {ok, init_state(Stream, Connection, Flags, PS)}. + {ok, init_state(Stream, Connection, Flags)}. %% %% @doc Post handoff data stream @@ -239,6 +239,7 @@ do_handle_appl_msg( do_handle_appl_msg({incoming, #mqtt_packet{} = Packet}, #{channel := Channel} = S) when Channel =/= undefined -> + ok = inc_incoming_stats(Packet), with_channel(handle_in, [Packet], S); do_handle_appl_msg({incoming, {frame_error, _} = FE}, #{channel := Channel} = S) when Channel =/= undefined @@ -422,6 +423,19 @@ do_parse_incoming(Data, Packets, ParseState) -> end. %% followings are copied from emqx_connection +-compile({inline, [inc_incoming_stats/1]}). +inc_incoming_stats(Packet = ?PACKET(Type)) -> + inc_counter(recv_pkt, 1), + case Type =:= ?PUBLISH of + true -> + inc_counter(recv_msg, 1), + inc_qos_stats(recv_msg, Packet), + inc_counter(incoming_pubs, 1); + false -> + ok + end, + emqx_metrics:inc_recv(Packet). + -compile({inline, [inc_outgoing_stats/1]}). inc_outgoing_stats({error, message_too_large}) -> inc_counter('send_msg.dropped', 1), diff --git a/apps/emqx/test/emqtt_quic_SUITE.erl b/apps/emqx/test/emqtt_quic_SUITE.erl index 28e9bcd7b..6c19ecdad 100644 --- a/apps/emqx/test/emqtt_quic_SUITE.erl +++ b/apps/emqx/test/emqtt_quic_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("quicer/include/quicer.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). suite() -> [{timetrap, {seconds, 30}}]. @@ -79,7 +80,10 @@ groups() -> t_multi_streams_shutdown_data_stream_abortive, t_multi_streams_dup_sub, t_multi_streams_packet_boundary, - t_multi_streams_packet_malform + t_multi_streams_packet_malform, + t_multi_streams_kill_sub_stream, + t_multi_streams_packet_too_large, + t_conn_change_client_addr ]}, {shutdown, [ @@ -537,12 +541,84 @@ t_multi_streams_packet_malform(Config) -> timer:sleep(200), ?assert(is_list(emqtt:info(C))), - {error, stm_send_error, aborted} = quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>), + {error, stm_send_error, aborted} = quicer:send(MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>), + timer:sleep(200), ?assert(is_list(emqtt:info(C))), ok = emqtt:disconnect(C). +t_multi_streams_packet_too_large(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + Topic = atom_to_binary(?FUNCTION_NAME), + meck:new(emqx_frame, [passthrough, no_history]), + ok = meck:expect( + emqx_frame, + serialize_opts, + fun(#mqtt_packet_connect{proto_ver = ProtoVer}) -> + #{version => ProtoVer, max_size => 1024} + end + ), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + + {ok, PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + binary:copy(<<"stream data 1">>, 1024), + [{qos, PubQos}], + undefined + ), + timeout = recv_pub(1), + ?assert(is_list(emqtt:info(C))), + ok = meck:unload(emqx_frame), + ok = emqtt:disconnect(C). + +t_conn_change_client_addr(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + Topic = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]), + + {ok, {quic, Conn, _} = PubVia} = emqtt:start_data_stream(C, []), + ok = emqtt:publish_async( + C, + PubVia, + Topic, + <<"stream data 1">>, + [{qos, PubQos}], + undefined + ), + + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := _PktId1, + payload := <<"stream data 1">>, + qos := RecQos + }} + ], + recv_pub(1) + ), + NewPort = select_port(), + {ok, OldAddr} = quicer:sockname(Conn), + ?assertEqual( + ok, quicer:setopt(Conn, param_conn_local_address, "127.0.0.1:" ++ integer_to_list(NewPort)) + ), + {ok, NewAddr} = quicer:sockname(Conn), + ct:pal("NewAddr: ~p, Old Addr: ~p", [NewAddr, OldAddr]), + ?assertNotEqual(OldAddr, NewAddr), + ?assert(is_list(emqtt:info(C))), + ok = emqtt:disconnect(C). + t_multi_streams_sub_pub_async(Config) -> Topic = atom_to_binary(?FUNCTION_NAME), PubQos = ?config(pub_qos, Config), @@ -815,6 +891,57 @@ t_multi_streams_unsub(Config) -> timeout = recv_pub(1), ok = emqtt:disconnect(C). +t_multi_streams_kill_sub_stream(Config) -> + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := _SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + [TopicStreamOwner] = emqx_broker:subscribers(Topic), + exit(TopicStreamOwner, kill), + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := Code, via := _PVia}} when Code == 0 orelse Code == 16 -> + ok + end, + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic2, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> + ok; + {ok, #{reason_code := 0, via := _PVia2}} -> + ok + end, + + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + topic := Topic2, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + recv_pub(1) + ), + ?assertEqual(timeout, recv_pub(1)), + ok. + t_multi_streams_unsub_via_other(Config) -> PubQos = ?config(pub_qos, Config), SubQos = ?config(sub_qos, Config), @@ -1208,7 +1335,9 @@ t_conn_resume(Config) -> {nst, NST} | Config ]), - {ok, _} = emqtt:quic_connect(C). + {ok, _} = emqtt:quic_connect(C), + Cid = proplists:get_value(clientid, emqtt:info(C)), + ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]). t_conn_without_ctrl_stream(Config) -> erlang:process_flag(trap_exit, true), @@ -1289,3 +1418,19 @@ start_emqx_quic(UdpPort) -> -spec stop_emqx() -> ok. stop_emqx() -> emqx_common_test_helpers:stop_apps([]). + +%% select a random port picked by OS +-spec select_port() -> inet:port_number(). +select_port() -> + {ok, S} = gen_udp:open(0, [{reuseaddr, true}]), + {ok, {_, Port}} = inet:sockname(S), + gen_udp:close(S), + case os:type() of + {unix, darwin} -> + %% in MacOS, still get address_in_use after close port + timer:sleep(500); + _ -> + skip + end, + ct:pal("select port: ~p", [Port]), + Port.