feat(quic): improve coverage and remove unused code

This commit is contained in:
William Yang 2022-12-22 22:20:29 +01:00
parent 0544a3ca0c
commit 0173121a30
3 changed files with 173 additions and 15 deletions

View File

@ -135,20 +135,17 @@ new_conn(
%% @doc callback when connection is connected. %% @doc callback when connection is connected.
-spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) -> -spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}. {ok, cb_state()} | {error, any()}.
connected(Conn, Props, #{slow_start := false} = S) ->
?SLOG(debug, Props),
{ok, Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S),
{ok, S#{ctrl_pid => Pid}};
connected(_Conn, Props, S) -> connected(_Conn, Props, S) ->
?SLOG(debug, Props), ?SLOG(debug, Props),
{ok, S}. {ok, S}.
%% @doc callback when connection is resumed from 0-RTT %% @doc callback when connection is resumed from 0-RTT
-spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret(). -spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret().
resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when %% reserve resume conn with callback.
is_function(ResumeFun) %% resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when
-> %% is_function(ResumeFun)
ResumeFun(Conn, Data, S); %% ->
%% ResumeFun(Conn, Data, S);
resumed(_Conn, _Data, S) -> resumed(_Conn, _Data, S) ->
{ok, S#{is_resumed := true}}. {ok, S#{is_resumed := true}}.
@ -245,9 +242,11 @@ handle_call(
_From, _From,
#{streams := Streams} = S #{streams := Streams} = S
) -> ) ->
[emqx_quic_data_stream:activate_data(OwnerPid, ActivateData) || {OwnerPid, _Stream} <- Streams], [
catch emqx_quic_data_stream:activate_data(OwnerPid, ActivateData)
|| {OwnerPid, _Stream} <- Streams
],
{reply, ok, S#{ {reply, ok, S#{
%streams := [], %% @FIXME what ??????
channel := Channel, channel := Channel,
serialize := Serialize, serialize := Serialize,
parse_state := PS parse_state := PS

View File

@ -68,11 +68,11 @@ activate_data(StreamPid, {PS, Serialize, Channel}) ->
%% @TODO -spec %% @TODO -spec
init_handoff( init_handoff(
Stream, Stream,
#{parse_state := PS} = _StreamOpts, _StreamOpts,
Connection, Connection,
#{is_orphan := true, flags := Flags} #{is_orphan := true, flags := Flags}
) -> ) ->
{ok, init_state(Stream, Connection, Flags, PS)}. {ok, init_state(Stream, Connection, Flags)}.
%% %%
%% @doc Post handoff data stream %% @doc Post handoff data stream
@ -239,6 +239,7 @@ do_handle_appl_msg(
do_handle_appl_msg({incoming, #mqtt_packet{} = Packet}, #{channel := Channel} = S) when do_handle_appl_msg({incoming, #mqtt_packet{} = Packet}, #{channel := Channel} = S) when
Channel =/= undefined Channel =/= undefined
-> ->
ok = inc_incoming_stats(Packet),
with_channel(handle_in, [Packet], S); with_channel(handle_in, [Packet], S);
do_handle_appl_msg({incoming, {frame_error, _} = FE}, #{channel := Channel} = S) when do_handle_appl_msg({incoming, {frame_error, _} = FE}, #{channel := Channel} = S) when
Channel =/= undefined Channel =/= undefined
@ -422,6 +423,19 @@ do_parse_incoming(Data, Packets, ParseState) ->
end. end.
%% followings are copied from emqx_connection %% followings are copied from emqx_connection
-compile({inline, [inc_incoming_stats/1]}).
inc_incoming_stats(Packet = ?PACKET(Type)) ->
inc_counter(recv_pkt, 1),
case Type =:= ?PUBLISH of
true ->
inc_counter(recv_msg, 1),
inc_qos_stats(recv_msg, Packet),
inc_counter(incoming_pubs, 1);
false ->
ok
end,
emqx_metrics:inc_recv(Packet).
-compile({inline, [inc_outgoing_stats/1]}). -compile({inline, [inc_outgoing_stats/1]}).
inc_outgoing_stats({error, message_too_large}) -> inc_outgoing_stats({error, message_too_large}) ->
inc_counter('send_msg.dropped', 1), inc_counter('send_msg.dropped', 1),

View File

@ -21,6 +21,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("quicer/include/quicer.hrl"). -include_lib("quicer/include/quicer.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
suite() -> suite() ->
[{timetrap, {seconds, 30}}]. [{timetrap, {seconds, 30}}].
@ -79,7 +80,10 @@ groups() ->
t_multi_streams_shutdown_data_stream_abortive, t_multi_streams_shutdown_data_stream_abortive,
t_multi_streams_dup_sub, t_multi_streams_dup_sub,
t_multi_streams_packet_boundary, t_multi_streams_packet_boundary,
t_multi_streams_packet_malform t_multi_streams_packet_malform,
t_multi_streams_kill_sub_stream,
t_multi_streams_packet_too_large,
t_conn_change_client_addr
]}, ]},
{shutdown, [ {shutdown, [
@ -537,12 +541,84 @@ t_multi_streams_packet_malform(Config) ->
timer:sleep(200), timer:sleep(200),
?assert(is_list(emqtt:info(C))), ?assert(is_list(emqtt:info(C))),
{error, stm_send_error, aborted} = quicer:send(MalformStream, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>), {error, stm_send_error, aborted} = quicer:send(MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>),
timer:sleep(200), timer:sleep(200),
?assert(is_list(emqtt:info(C))), ?assert(is_list(emqtt:info(C))),
ok = emqtt:disconnect(C). ok = emqtt:disconnect(C).
t_multi_streams_packet_too_large(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
Topic = atom_to_binary(?FUNCTION_NAME),
meck:new(emqx_frame, [passthrough, no_history]),
ok = meck:expect(
emqx_frame,
serialize_opts,
fun(#mqtt_packet_connect{proto_ver = ProtoVer}) ->
#{version => ProtoVer, max_size => 1024}
end
),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C),
{ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]),
{ok, PubVia} = emqtt:start_data_stream(C, []),
ok = emqtt:publish_async(
C,
PubVia,
Topic,
binary:copy(<<"stream data 1">>, 1024),
[{qos, PubQos}],
undefined
),
timeout = recv_pub(1),
?assert(is_list(emqtt:info(C))),
ok = meck:unload(emqx_frame),
ok = emqtt:disconnect(C).
t_conn_change_client_addr(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
RecQos = calc_qos(PubQos, SubQos),
Topic = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C),
{ok, _, [SubQos]} = emqtt:subscribe(C, #{}, [{Topic, [{qos, SubQos}]}]),
{ok, {quic, Conn, _} = PubVia} = emqtt:start_data_stream(C, []),
ok = emqtt:publish_async(
C,
PubVia,
Topic,
<<"stream data 1">>,
[{qos, PubQos}],
undefined
),
?assertMatch(
[
{publish, #{
client_pid := C,
packet_id := _PktId1,
payload := <<"stream data 1">>,
qos := RecQos
}}
],
recv_pub(1)
),
NewPort = select_port(),
{ok, OldAddr} = quicer:sockname(Conn),
?assertEqual(
ok, quicer:setopt(Conn, param_conn_local_address, "127.0.0.1:" ++ integer_to_list(NewPort))
),
{ok, NewAddr} = quicer:sockname(Conn),
ct:pal("NewAddr: ~p, Old Addr: ~p", [NewAddr, OldAddr]),
?assertNotEqual(OldAddr, NewAddr),
?assert(is_list(emqtt:info(C))),
ok = emqtt:disconnect(C).
t_multi_streams_sub_pub_async(Config) -> t_multi_streams_sub_pub_async(Config) ->
Topic = atom_to_binary(?FUNCTION_NAME), Topic = atom_to_binary(?FUNCTION_NAME),
PubQos = ?config(pub_qos, Config), PubQos = ?config(pub_qos, Config),
@ -815,6 +891,57 @@ t_multi_streams_unsub(Config) ->
timeout = recv_pub(1), timeout = recv_pub(1),
ok = emqtt:disconnect(C). ok = emqtt:disconnect(C).
t_multi_streams_kill_sub_stream(Config) ->
PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config),
RecQos = calc_qos(PubQos, SubQos),
PktId1 = calc_pkt_id(RecQos, 1),
Topic = atom_to_binary(?FUNCTION_NAME),
Topic2 = <<Topic/binary, "two">>,
{ok, C} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:quic_connect(C),
{ok, #{via := _SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
{Topic, [{qos, SubQos}]}
]),
{ok, #{via := _SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
{Topic2, [{qos, SubQos}]}
]),
[TopicStreamOwner] = emqx_broker:subscribers(Topic),
exit(TopicStreamOwner, kill),
case
emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}])
of
ok when PubQos == 0 ->
ok;
{ok, #{reason_code := Code, via := _PVia}} when Code == 0 orelse Code == 16 ->
ok
end,
case
emqtt:publish_via(C, {new_data_stream, []}, Topic2, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}])
of
ok when PubQos == 0 ->
ok;
{ok, #{reason_code := 0, via := _PVia2}} ->
ok
end,
?assertMatch(
[
{publish, #{
client_pid := C,
packet_id := PktId1,
topic := Topic2,
payload := <<1, 2, 3, 4, 5>>,
qos := RecQos
}}
],
recv_pub(1)
),
?assertEqual(timeout, recv_pub(1)),
ok.
t_multi_streams_unsub_via_other(Config) -> t_multi_streams_unsub_via_other(Config) ->
PubQos = ?config(pub_qos, Config), PubQos = ?config(pub_qos, Config),
SubQos = ?config(sub_qos, Config), SubQos = ?config(sub_qos, Config),
@ -1208,7 +1335,9 @@ t_conn_resume(Config) ->
{nst, NST} {nst, NST}
| Config | Config
]), ]),
{ok, _} = emqtt:quic_connect(C). {ok, _} = emqtt:quic_connect(C),
Cid = proplists:get_value(clientid, emqtt:info(C)),
ct:pal("~p~n", [emqx_cm:get_chan_info(Cid)]).
t_conn_without_ctrl_stream(Config) -> t_conn_without_ctrl_stream(Config) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
@ -1289,3 +1418,19 @@ start_emqx_quic(UdpPort) ->
-spec stop_emqx() -> ok. -spec stop_emqx() -> ok.
stop_emqx() -> stop_emqx() ->
emqx_common_test_helpers:stop_apps([]). emqx_common_test_helpers:stop_apps([]).
%% select a random port picked by OS
-spec select_port() -> inet:port_number().
select_port() ->
{ok, S} = gen_udp:open(0, [{reuseaddr, true}]),
{ok, {_, Port}} = inet:sockname(S),
gen_udp:close(S),
case os:type() of
{unix, darwin} ->
%% in MacOS, still get address_in_use after close port
timer:sleep(500);
_ ->
skip
end,
ct:pal("select port: ~p", [Port]),
Port.