chore(quic): fix dialyzer

This commit is contained in:
William Yang 2023-01-13 10:02:21 +01:00
parent 282d1a6829
commit 381eb8ec68
5 changed files with 34 additions and 29 deletions

View File

@ -929,7 +929,12 @@ handle_info({sock_error, Reason}, State) ->
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
%% handle QUIC control stream events %% handle QUIC control stream events
handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) -> handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) ->
emqx_quic_stream:Event(Handle, Prop, State); case emqx_quic_stream:Event(Handle, Prop, State) of
{{continue, Msgs}, NewState} ->
{ok, Msgs, NewState};
Other ->
Other
end;
handle_info(Info, State) -> handle_info(Info, State) ->
with_channel(handle_info, [Info], State). with_channel(handle_info, [Info], State).

View File

@ -386,13 +386,16 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
listener => {quic, ListenerName}, listener => {quic, ListenerName},
limiter => limiter(Opts) limiter => limiter(Opts)
}, },
StreamOpts = [{stream_callback, emqx_quic_stream}], StreamOpts = #{
stream_callback => emqx_quic_stream,
active => 1
},
Id = listener_id(quic, ListenerName), Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Opts), add_limiter_bucket(Id, Opts),
quicer:start_listener( quicer:start_listener(
Id, Id,
ListenOn, ListenOn,
{ListenOpts, ConnectionOpts, StreamOpts} {maps:from_list(ListenOpts), ConnectionOpts, StreamOpts}
); );
[] -> [] ->
{ok, {skipped, quic_app_missing}} {ok, {skipped, quic_app_missing}}

View File

@ -93,7 +93,7 @@ closed(_Conn, #{is_peer_acked := _} = Prop, S) ->
%% @doc handle the new incoming connecion as the connecion acceptor. %% @doc handle the new incoming connecion as the connecion acceptor.
-spec new_conn(quicer:connection_handle(), quicer:new_conn_props(), cb_state()) -> -spec new_conn(quicer:connection_handle(), quicer:new_conn_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}. {ok, cb_state()} | {error, any(), cb_state()}.
new_conn( new_conn(
Conn, Conn,
#{version := _Vsn} = ConnInfo, #{version := _Vsn} = ConnInfo,
@ -119,7 +119,7 @@ new_conn(
end; end;
true -> true ->
emqx_metrics:inc('olp.new_conn'), emqx_metrics:inc('olp.new_conn'),
quicer:async_shutdown_connection( _ = quicer:async_shutdown_connection(
Conn, Conn,
?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
?MQTT_QUIC_CONN_ERROR_OVERLOADED ?MQTT_QUIC_CONN_ERROR_OVERLOADED
@ -129,7 +129,7 @@ new_conn(
%% @doc callback when connection is connected. %% @doc callback when connection is connected.
-spec connected(quicer:connection_handle(), quicer:connected_props(), cb_state()) -> -spec connected(quicer:connection_handle(), quicer:connected_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}. {ok, cb_state()} | {error, any(), cb_state()}.
connected(_Conn, Props, S) -> connected(_Conn, Props, S) ->
?SLOG(debug, Props), ?SLOG(debug, Props),
{ok, S}. {ok, S}.
@ -193,7 +193,7 @@ new_stream(
-spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret(). -spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret().
shutdown(Conn, ErrorCode, S) -> shutdown(Conn, ErrorCode, S) ->
ErrorCode =/= 0 andalso ?SLOG(debug, #{error_code => ErrorCode, state => S}), ErrorCode =/= 0 andalso ?SLOG(debug, #{error_code => ErrorCode, state => S}),
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), _ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
{ok, S}. {ok, S}.
%% @doc callback for handling transport error, such as idle timeout %% @doc callback for handling transport error, such as idle timeout
@ -245,7 +245,7 @@ handle_call(
_From, _From,
#{streams := Streams} = S #{streams := Streams} = S
) -> ) ->
[ _ = [
%% Try to activate streams individually if failed, stream will shutdown on its own. %% Try to activate streams individually if failed, stream will shutdown on its own.
%% we dont care about the return val here. %% we dont care about the return val here.
%% note, this is only used after control stream pass the validation. The data streams %% note, this is only used after control stream pass the validation. The data streams
@ -263,18 +263,14 @@ handle_call(_Req, _From, S) ->
%% @doc handle DOWN messages from streams. %% @doc handle DOWN messages from streams.
handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) -> handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) ->
case Reason of Code =
normal -> case Reason of
quicer:async_shutdown_connection( normal ->
Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, ?MQTT_QUIC_CONN_NOERROR ?MQTT_QUIC_CONN_NOERROR;
); _ ->
_ ->
quicer:async_shutdown_connection(
Conn,
?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
?MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN ?MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN
) end,
end, _ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, Code),
{ok, S}; {ok, S};
handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) -> handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) ->
case proplists:is_defined(Pid, Streams) of case proplists:is_defined(Pid, Streams) of

View File

@ -98,19 +98,19 @@ post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Cha
{ok, S}; {ok, S};
post_handoff(Stream, {PS, Serialize, Channel}, S) -> post_handoff(Stream, {PS, Serialize, Channel}, S) ->
?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}), ?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}),
quicer:setopt(Stream, active, 10), _ = quicer:setopt(Stream, active, 10),
{ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}. {ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}.
-spec peer_receive_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret(). -spec peer_receive_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, #{is_unidir := _} = S) -> peer_receive_aborted(Stream, ErrorCode, #{is_unidir := _} = S) ->
%% we abort send with same reason %% we abort send with same reason
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}. {ok, S}.
-spec peer_send_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret(). -spec peer_send_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_send_aborted(Stream, ErrorCode, #{is_unidir := _} = S) -> peer_send_aborted(Stream, ErrorCode, #{is_unidir := _} = S) ->
%% we abort receive with same reason %% we abort receive with same reason
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode), _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
{ok, S}. {ok, S}.
-spec peer_send_shutdown(stream_handle(), undefined, cb_state()) -> cb_ret(). -spec peer_send_shutdown(stream_handle(), undefined, cb_state()) -> cb_ret().
@ -157,7 +157,7 @@ handle_stream_data(
-spec passive(stream_handle(), undefined, cb_state()) -> cb_ret(). -spec passive(stream_handle(), undefined, cb_state()) -> cb_ret().
passive(Stream, undefined, S) -> passive(Stream, undefined, S) ->
quicer:setopt(Stream, active, 10), _ = quicer:setopt(Stream, active, 10),
{ok, S}. {ok, S}.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_state()) -> cb_ret(). -spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_state()) -> cb_ret().

View File

@ -136,11 +136,11 @@ getopts(_Socket, _Opts) ->
%% @TODO supply some App Error Code from caller %% @TODO supply some App Error Code from caller
fast_close({ConnOwner, Conn, _ConnInfo}) when is_pid(ConnOwner) -> fast_close({ConnOwner, Conn, _ConnInfo}) when is_pid(ConnOwner) ->
%% handshake aborted. %% handshake aborted.
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), _ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
ok; ok;
fast_close({quic, _Conn, Stream, _Info}) -> fast_close({quic, _Conn, Stream, _Info}) ->
%% Force flush %% Force flush
quicer:async_shutdown_stream(Stream), _ = quicer:async_shutdown_stream(Stream),
%% @FIXME Since we shutdown the control stream, we shutdown the connection as well %% @FIXME Since we shutdown the control stream, we shutdown the connection as well
%% *BUT* Msquic does not flush the send buffer if we shutdown the connection after %% *BUT* Msquic does not flush the send buffer if we shutdown the connection after
%% gracefully shutdown the stream. %% gracefully shutdown the stream.
@ -173,13 +173,13 @@ async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
-spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). -spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, S) -> peer_receive_aborted(Stream, ErrorCode, S) ->
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}. {ok, S}.
-spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret(). -spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_send_aborted(Stream, ErrorCode, S) -> peer_send_aborted(Stream, ErrorCode, S) ->
%% we abort receive with same reason %% we abort receive with same reason
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode), _ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}. {ok, S}.
-spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret(). -spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret().
@ -206,7 +206,8 @@ passive(Stream, undefined, S) ->
end, end,
{ok, S}. {ok, S}.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) -> cb_ret(). -spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) ->
{{continue, term()}, cb_data()}.
stream_closed( stream_closed(
_Stream, _Stream,
#{ #{
@ -236,7 +237,7 @@ stream_closed(
_ -> _ ->
Status Status
end, end,
{ok, {sock_closed, Reason}, S}. {{continue, {sock_closed, Reason}}, S}.
%%% %%%
%%% Internals %%% Internals