From 381eb8ec682ae9740398333aae0069a5ef1d8840 Mon Sep 17 00:00:00 2001 From: William Yang Date: Fri, 13 Jan 2023 10:02:21 +0100 Subject: [PATCH] chore(quic): fix dialyzer --- apps/emqx/src/emqx_connection.erl | 7 ++++++- apps/emqx/src/emqx_listeners.erl | 7 +++++-- apps/emqx/src/emqx_quic_connection.erl | 28 +++++++++++-------------- apps/emqx/src/emqx_quic_data_stream.erl | 8 +++---- apps/emqx/src/emqx_quic_stream.erl | 13 ++++++------ 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 88c7d28e2..9e0099414 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -929,7 +929,12 @@ handle_info({sock_error, Reason}, State) -> handle_info({sock_closed, Reason}, close_socket(State)); %% handle QUIC control stream events handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) -> - emqx_quic_stream:Event(Handle, Prop, State); + case emqx_quic_stream:Event(Handle, Prop, State) of + {{continue, Msgs}, NewState} -> + {ok, Msgs, NewState}; + Other -> + Other + end; handle_info(Info, State) -> with_channel(handle_info, [Info], State). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 45f3b2cfd..ccf6a667a 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -386,13 +386,16 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> listener => {quic, ListenerName}, limiter => limiter(Opts) }, - StreamOpts = [{stream_callback, emqx_quic_stream}], + StreamOpts = #{ + stream_callback => emqx_quic_stream, + active => 1 + }, Id = listener_id(quic, ListenerName), add_limiter_bucket(Id, Opts), quicer:start_listener( Id, ListenOn, - {ListenOpts, ConnectionOpts, StreamOpts} + {maps:from_list(ListenOpts), ConnectionOpts, StreamOpts} ); [] -> {ok, {skipped, quic_app_missing}} diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index ae195cd6b..39d6a2c2f 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -93,7 +93,7 @@ closed(_Conn, #{is_peer_acked := _} = Prop, S) -> %% @doc handle the new incoming connecion as the connecion acceptor. -spec new_conn(quicer:connection_handle(), quicer:new_conn_props(), cb_state()) -> - {ok, cb_state()} | {error, any()}. + {ok, cb_state()} | {error, any(), cb_state()}. new_conn( Conn, #{version := _Vsn} = ConnInfo, @@ -119,7 +119,7 @@ new_conn( end; true -> emqx_metrics:inc('olp.new_conn'), - quicer:async_shutdown_connection( + _ = quicer:async_shutdown_connection( Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, ?MQTT_QUIC_CONN_ERROR_OVERLOADED @@ -129,7 +129,7 @@ new_conn( %% @doc callback when connection is connected. -spec connected(quicer:connection_handle(), quicer:connected_props(), cb_state()) -> - {ok, cb_state()} | {error, any()}. + {ok, cb_state()} | {error, any(), cb_state()}. connected(_Conn, Props, S) -> ?SLOG(debug, Props), {ok, S}. @@ -193,7 +193,7 @@ new_stream( -spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret(). shutdown(Conn, ErrorCode, S) -> ErrorCode =/= 0 andalso ?SLOG(debug, #{error_code => ErrorCode, state => S}), - quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), + _ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), {ok, S}. %% @doc callback for handling transport error, such as idle timeout @@ -245,7 +245,7 @@ handle_call( _From, #{streams := Streams} = S ) -> - [ + _ = [ %% Try to activate streams individually if failed, stream will shutdown on its own. %% we dont care about the return val here. %% note, this is only used after control stream pass the validation. The data streams @@ -263,18 +263,14 @@ handle_call(_Req, _From, S) -> %% @doc handle DOWN messages from streams. handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) -> - case Reason of - normal -> - quicer:async_shutdown_connection( - Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, ?MQTT_QUIC_CONN_NOERROR - ); - _ -> - quicer:async_shutdown_connection( - Conn, - ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, + Code = + case Reason of + normal -> + ?MQTT_QUIC_CONN_NOERROR; + _ -> ?MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN - ) - end, + end, + _ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, Code), {ok, S}; handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) -> case proplists:is_defined(Pid, Streams) of diff --git a/apps/emqx/src/emqx_quic_data_stream.erl b/apps/emqx/src/emqx_quic_data_stream.erl index e3f6b7adc..2e90edcfb 100644 --- a/apps/emqx/src/emqx_quic_data_stream.erl +++ b/apps/emqx/src/emqx_quic_data_stream.erl @@ -98,19 +98,19 @@ post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Cha {ok, S}; post_handoff(Stream, {PS, Serialize, Channel}, S) -> ?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}), - quicer:setopt(Stream, active, 10), + _ = quicer:setopt(Stream, active, 10), {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}. -spec peer_receive_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret(). peer_receive_aborted(Stream, ErrorCode, #{is_unidir := _} = S) -> %% we abort send with same reason - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), + _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), {ok, S}. -spec peer_send_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret(). peer_send_aborted(Stream, ErrorCode, #{is_unidir := _} = S) -> %% we abort receive with same reason - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), + _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), {ok, S}. -spec peer_send_shutdown(stream_handle(), undefined, cb_state()) -> cb_ret(). @@ -157,7 +157,7 @@ handle_stream_data( -spec passive(stream_handle(), undefined, cb_state()) -> cb_ret(). passive(Stream, undefined, S) -> - quicer:setopt(Stream, active, 10), + _ = quicer:setopt(Stream, active, 10), {ok, S}. -spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_state()) -> cb_ret(). diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 5f7f93866..f60345fe9 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -136,11 +136,11 @@ getopts(_Socket, _Opts) -> %% @TODO supply some App Error Code from caller fast_close({ConnOwner, Conn, _ConnInfo}) when is_pid(ConnOwner) -> %% handshake aborted. - quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), + _ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), ok; fast_close({quic, _Conn, Stream, _Info}) -> %% Force flush - quicer:async_shutdown_stream(Stream), + _ = quicer:async_shutdown_stream(Stream), %% @FIXME Since we shutdown the control stream, we shutdown the connection as well %% *BUT* Msquic does not flush the send buffer if we shutdown the connection after %% gracefully shutdown the stream. @@ -173,13 +173,13 @@ async_send({quic, _Conn, Stream, _Info}, Data, _Options) -> -spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). peer_receive_aborted(Stream, ErrorCode, S) -> - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), + _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), {ok, S}. -spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). peer_send_aborted(Stream, ErrorCode, S) -> %% we abort receive with same reason - quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), + _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), {ok, S}. -spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret(). @@ -206,7 +206,8 @@ passive(Stream, undefined, S) -> end, {ok, S}. --spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) -> cb_ret(). +-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) -> + {{continue, term()}, cb_data()}. stream_closed( _Stream, #{ @@ -236,7 +237,7 @@ stream_closed( _ -> Status end, - {ok, {sock_closed, Reason}, S}. + {{continue, {sock_closed, Reason}}, S}. %%% %%% Internals