From 1692a16778731711db58ac17eea2a400f810e6d6 Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 11 Jan 2023 16:24:06 +0100 Subject: [PATCH] feat(quic): handle ctrl stream normal shutdown --- apps/emqx/include/emqx_quic.hrl | 1 + apps/emqx/src/emqx_connection.erl | 3 +- apps/emqx/src/emqx_quic_connection.erl | 27 +++- apps/emqx/src/emqx_quic_stream.erl | 14 ++- .../test/emqx_quic_multistreams_SUITE.erl | 118 +++++++++++++++++- 5 files changed, 152 insertions(+), 11 deletions(-) diff --git a/apps/emqx/include/emqx_quic.hrl b/apps/emqx/include/emqx_quic.hrl index 3366b8938..a16784d5d 100644 --- a/apps/emqx/include/emqx_quic.hrl +++ b/apps/emqx/include/emqx_quic.hrl @@ -19,6 +19,7 @@ %% MQTT Over QUIC Shutdown Error code. -define(MQTT_QUIC_CONN_NOERROR, 0). +-define(MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN, 1). -define(MQTT_QUIC_CONN_ERROR_OVERLOADED, 2). -endif. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index ff3ee81a9..2916f37bb 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -921,7 +921,8 @@ handle_info({sock_error, Reason}, State) -> false -> ok end, handle_info({sock_closed, Reason}, close_socket(State)); -handle_info({quic, Event, Handle, Prop}, State) -> +%% handle QUIC control stream events +handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) -> emqx_quic_stream:Event(Handle, Prop, State); handle_info(Info, State) -> with_channel(handle_info, [Info], State). diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index 69d16cbc3..7538307e8 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -179,7 +179,13 @@ new_stream( SOpts1, Props ), - quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}), + case quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}) of + ok -> + ok; + E -> + %% Only log, keep connecion alive. + ?SLOG(error, #{message => "new stream handoff failed", stream => Stream, error => E}) + end, %% @TODO maybe keep them in `inactive_streams' {ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}. @@ -200,7 +206,7 @@ transport_shutdown(_C, DownInfo, S) when is_map(DownInfo) -> %% @doc callback for handling for peer addr changed. -spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret(). peer_address_changed(_C, _NewAddr, S) -> - %% @TODO update session info? + %% @TODO update conn info in emqx_quic_stream {ok, S}. %% @doc callback for handling local addr change, currently unused @@ -224,7 +230,7 @@ streams_available(_C, {BidirCnt, UnidirCnt}, S) -> %% @doc callback for handling request when remote wants for more streams %% should cope with rate limiting %% @TODO this is not going to get triggered in current version -%% for https://github.com/microsoft/msquic/issues/3120 +%% ref: https://github.com/microsoft/msquic/issues/3120 -spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret(). peer_needs_streams(_C, undefined, S) -> ?SLOG(info, #{ @@ -240,6 +246,10 @@ handle_call( #{streams := Streams} = S ) -> [ + %% Try to activate streams individually if failed, stream will shutdown on its own. + %% we dont care about the return val here. + %% note, this is only used after control stream pass the validation. The data streams + %% that are called here are assured to be inactived (data processing hasn't been started). catch emqx_quic_data_stream:activate_data(OwnerPid, ActivateData) || {OwnerPid, _Stream} <- Streams ], @@ -255,10 +265,15 @@ handle_call(_Req, _From, S) -> handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) -> case Reason of normal -> - quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); + quicer:async_shutdown_connection( + Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, ?MQTT_QUIC_CONN_NOERROR + ); _ -> - %% @TODO have some reasons mappings here. - quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 1) + quicer:async_shutdown_connection( + Conn, + ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, + ?MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN + ) end, {ok, S}; handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) -> diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index a8ef7d41d..d1b205cf0 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% MQTT/QUIC Stream +%% MQTT/QUIC control Stream -module(emqx_quic_stream). -ifndef(BUILD_WITHOUT_QUIC). @@ -38,6 +38,7 @@ peercert/1 ]). -include_lib("quicer/include/quicer.hrl"). +-include_lib("emqx/include/emqx_quic.hrl"). -type cb_ret() :: quicer_stream:cb_ret(). -type cb_data() :: quicer_stream:cb_state(). @@ -223,10 +224,17 @@ stream_closed( is_atom(Status) andalso is_integer(Code) -> - %% @TODO for now we fake a sock_closed for + %% For now we fake a sock_closed for %% emqx_connection:process_msg to append %% a msg to be processed - {ok, {sock_closed, Status}, S}. + Reason = + case Code of + ?MQTT_QUIC_CONN_NOERROR -> + normal; + _ -> + Status + end, + {ok, {sock_closed, Reason}, S}. %%% %%% Internals diff --git a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl index 025790ef7..17f4cbbc2 100644 --- a/apps/emqx/test/emqx_quic_multistreams_SUITE.erl +++ b/apps/emqx/test/emqx_quic_multistreams_SUITE.erl @@ -118,6 +118,8 @@ groups() -> t_multi_streams_shutdown_ctrl_stream, t_multi_streams_shutdown_ctrl_stream_then_reconnect, t_multi_streams_remote_shutdown, + t_multi_streams_emqx_ctrl_kill, + t_multi_streams_emqx_ctrl_exit_normal, t_multi_streams_remote_shutdown_with_reconnect ]}, @@ -1327,7 +1329,13 @@ t_multi_streams_shutdown_ctrl_stream(Config) -> ), {quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)), - quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 1000), + Flag = ?config(stream_shutdown_flag, Config), + AppErrorCode = + case Flag of + ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL -> 0; + _ -> 500 + end, + quicer:shutdown_stream(Ctrlstream, Flag, AppErrorCode, 1000), timer:sleep(500), %% Client should be closed ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). @@ -1384,6 +1392,114 @@ t_multi_streams_shutdown_ctrl_stream_then_reconnect(Config) -> %% Client should be closed ?assert(is_list(emqtt:info(C))). +t_multi_streams_emqx_ctrl_kill(Config) -> + erlang:process_flag(trap_exit, true), + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {reconnect, false}, + %% speedup test + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + ?assert(SVia2 =/= SVia), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + ClientId = proplists:get_value(clientid, emqtt:info(C)), + [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), + exit(TransPid, kill), + + timer:sleep(200), + %% Client should be closed + ?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). + +t_multi_streams_emqx_ctrl_exit_normal(Config) -> + erlang:process_flag(trap_exit, true), + PubQos = ?config(pub_qos, Config), + SubQos = ?config(sub_qos, Config), + RecQos = calc_qos(PubQos, SubQos), + PktId1 = calc_pkt_id(RecQos, 1), + + Topic = atom_to_binary(?FUNCTION_NAME), + Topic2 = <>, + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {reconnect, false}, + %% speedup test + {connect_timeout, 5} + | Config + ]), + {ok, _} = emqtt:quic_connect(C), + {ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic, [{qos, SubQos}]} + ]), + {ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [ + {Topic2, [{qos, SubQos}]} + ]), + + ?assert(SVia2 =/= SVia), + + case + emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}]) + of + ok when PubQos == 0 -> ok; + {ok, #{reason_code := 0, via := _PVia}} -> ok + end, + + PubRecvs = recv_pub(1), + ?assertMatch( + [ + {publish, #{ + client_pid := C, + packet_id := PktId1, + payload := <<1, 2, 3, 4, 5>>, + qos := RecQos + }} + ], + PubRecvs + ), + + ClientId = proplists:get_value(clientid, emqtt:info(C)), + [{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId), + + emqx_connection:stop(TransPid), + timer:sleep(200), + %% Client exit normal. + ?assertMatch({'EXIT', {normal, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)). + t_multi_streams_remote_shutdown(Config) -> erlang:process_flag(trap_exit, true), PubQos = ?config(pub_qos, Config),