chore(quic): clean code

This commit is contained in:
William Yang 2023-01-09 09:17:03 +01:00
parent 22dcf5907e
commit 00f615a1e3
8 changed files with 1817 additions and 2037 deletions

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -119,10 +119,7 @@
limiter_timer :: undefined | reference(),
%% QUIC conn pid if is a pid
quic_conn_pid :: maybe(pid()),
%% QUIC control stream callback state
quic_ctrl_state :: map()
quic_conn_pid :: maybe(pid())
}).
-record(retry, {
@ -378,8 +375,7 @@ init_state(
limiter_buffer = queue:new(),
limiter_timer = undefined,
%% for quic streams to inherit
quic_conn_pid = maps:get(conn_pid, Opts, undefined),
quic_ctrl_state = #{}
quic_conn_pid = maps:get(conn_pid, Opts, undefined)
}.
run_loop(
@ -928,12 +924,6 @@ handle_info({sock_error, Reason}, State) ->
handle_info({sock_closed, Reason}, close_socket(State));
handle_info({quic, Event, Handle, Prop}, State) ->
emqx_quic_stream:Event(Handle, Prop, State);
%% handle_info({quic, peer_send_shutdown, _Stream}, State) ->
%% handle_info({sock_closed, force}, close_socket(State));
%% handle_info({quic, closed, _Channel, ReasonFlag}, State) ->
%% handle_info({sock_closed, ReasonFlag}, State);
%% handle_info({quic, closed, _Stream}, State) ->
%% handle_info({sock_closed, force}, State);
handle_info(Info, State) ->
with_channel(handle_info, [Info], State).

View File

@ -17,13 +17,11 @@
%% @doc impl. the quic connection owner process.
-module(emqx_quic_connection).
-include("logger.hrl").
-ifndef(BUILD_WITHOUT_QUIC).
-include("logger.hrl").
-include_lib("quicer/include/quicer.hrl").
-include_lib("emqx/include/emqx_quic.hrl").
-else.
-define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0).
-endif.
-behavior(quicer_connection).
@ -55,10 +53,9 @@
%% Pid of ctrl stream
ctrl_pid := undefined | pid(),
%% quic connecion handle
conn := undefined | quicer:conneciton_hanlder(),
%% streams that handoff from this process, excluding control stream
%% these streams could die/closed without effecting the connecion/session.
conn := undefined | quicer:conneciton_handle(),
%% Data streams that handoff from this process
%% these streams could die/close without effecting the connecion/session.
%@TODO type?
streams := [{pid(), quicer:stream_handle()}],
%% New stream opts
@ -82,22 +79,20 @@ activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
%% @doc conneciton owner init callback
-spec init(map() | list()) -> {ok, cb_state()}.
init(ConnOpts) when is_list(ConnOpts) ->
init(maps:from_list(ConnOpts));
-spec init(map()) -> {ok, cb_state()}.
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});
init(ConnOpts) when is_map(ConnOpts) ->
{ok, init_cb_state(ConnOpts)}.
-spec closed(quicer:conneciton_hanlder(), quicer:conn_closed_props(), cb_state()) ->
-spec closed(quicer:conneciton_handle(), quicer:conn_closed_props(), cb_state()) ->
{stop, normal, cb_state()}.
closed(_Conn, #{is_peer_acked := _} = Prop, S) ->
?SLOG(debug, Prop),
{stop, normal, S}.
%% @doc handle the new incoming connecion as the connecion acceptor.
-spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) ->
-spec new_conn(quicer:connection_handle(), quicer:new_conn_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}.
new_conn(
Conn,
@ -133,7 +128,7 @@ new_conn(
end.
%% @doc callback when connection is connected.
-spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) ->
-spec connected(quicer:connection_handle(), quicer:connected_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}.
connected(_Conn, Props, S) ->
?SLOG(debug, Props),
@ -185,21 +180,21 @@ new_stream(
Props
),
quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}),
%% @TODO keep them in ``inactive_streams'
%% @TODO maybe keep them in `inactive_streams'
{ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}.
%% @doc callback for handling for remote connecion shutdown.
%% @doc callback for handling remote connecion shutdown.
-spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret().
shutdown(Conn, _ErrorCode, S) ->
%% @TODO check spec what to set for the ErrorCode?
shutdown(Conn, ErrorCode, S) ->
ErrorCode =/= 0 andalso ?SLOG(debug, #{error_code => ErrorCode, state => S}),
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
{ok, S}.
%% @doc callback for handling for transport error, such as idle timeout
%% @doc callback for handling transport error, such as idle timeout
-spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) ->
cb_ret().
transport_shutdown(_C, _DownInfo, S) ->
%% @TODO some counter
transport_shutdown(_C, DownInfo, S) when is_map(DownInfo) ->
?SLOG(debug, DownInfo),
{ok, S}.
%% @doc callback for handling for peer addr changed.
@ -238,6 +233,7 @@ peer_needs_streams(_C, undefined, S) ->
{ok, S}.
%% @doc handle API calls
-spec handle_call(Req :: term(), gen_server:from(), cb_state()) -> cb_ret().
handle_call(
{activate_data_streams, {PS, Serialize, Channel} = ActivateData},
_From,
@ -256,7 +252,6 @@ handle_call(_Req, _From, S) ->
{reply, {error, unimpl}, S}.
%% @doc handle DOWN messages from streams.
%% @TODO handle DOWN from supervisor?
handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) ->
case Reason of
normal ->
@ -302,3 +297,7 @@ init_cb_state(#{zone := _Zone} = Map) ->
serialize => undefined,
is_resumed => false
}.
%% BUILD_WITHOUT_QUIC
-else.
-endif.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -21,11 +21,14 @@
%%
-module(emqx_quic_data_stream).
-ifndef(BUILD_WITHOUT_QUIC).
-behaviour(quicer_remote_stream).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("quicer/include/quicer.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-behaviour(quicer_remote_stream).
%% Connection Callbacks
-export([
@ -37,12 +40,12 @@
peer_receive_aborted/3,
send_shutdown_complete/3,
stream_closed/3,
peer_accepted/3,
passive/3
]).
-export([handle_stream_data/4]).
%% gen_server API
-export([activate_data/2]).
-export([
@ -51,9 +54,19 @@
handle_continue/2
]).
-type cb_ret() :: quicer_stream:cb_ret().
-type cb_state() :: quicer_stream:cb_state().
-type error_code() :: quicer:error_code().
-type connection_handle() :: quicer:connection_handle().
-type stream_handle() :: quicer:stream_handle().
-type handoff_data() :: {
emqx_frame:parse_state() | undefined,
emqx_frame:serialize_opts() | undefined,
emqx_channel:channel() | undefined
}.
%%
%% @doc Activate the data handling.
%% Data handling is disabled before control stream allows the data processing.
%% Note, data handling is disabled before finishing the validation over control stream.
-spec activate_data(pid(), {
emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel()
}) -> ok.
@ -61,9 +74,12 @@ activate_data(StreamPid, {PS, Serialize, Channel}) ->
gen_server:call(StreamPid, {activate, {PS, Serialize, Channel}}, infinity).
%%
%% @doc Handoff from previous owner, mostly from the connection owner.
%% @TODO parse_state doesn't look necessary since we have it in post_handoff
%% @TODO -spec
%% @doc Handoff from previous owner, from the connection owner.
%% Note, unlike control stream, there is no acceptor for data streams.
%% The connection owner get new stream, spawn new proc and then handover to it.
%%
-spec init_handoff(stream_handle(), map(), connection_handle(), quicer:new_stream_props()) ->
{ok, cb_state()}.
init_handoff(
Stream,
_StreamOpts,
@ -75,10 +91,9 @@ init_handoff(
%%
%% @doc Post handoff data stream
%%
%% @TODO -spec
%%
-spec post_handoff(stream_handle(), handoff_data(), cb_state()) -> cb_ret().
post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Channel}, S) ->
%% Channel isn't ready yet.
%% When the channel isn't ready yet.
%% Data stream should wait for activate call with ?MODULE:activate_data/2
{ok, S};
post_handoff(Stream, {PS, Serialize, Channel}, S) ->
@ -86,53 +101,35 @@ post_handoff(Stream, {PS, Serialize, Channel}, S) ->
quicer:setopt(Stream, active, 10),
{ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}.
%%
%% @doc for local initiated stream
%%
peer_accepted(_Stream, _Flags, S) ->
%% we just ignore it
{ok, S}.
peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
-spec peer_receive_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, #{is_unidir := _} = S) ->
%% we abort send with same reason
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S};
peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) ->
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}.
peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
-spec peer_send_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_send_aborted(Stream, ErrorCode, #{is_unidir := _} = S) ->
%% we abort receive with same reason
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
{ok, S};
peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) ->
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
{ok, S}.
peer_send_shutdown(Stream, _Flags, S) ->
-spec peer_send_shutdown(stream_handle(), undefined, cb_state()) -> cb_ret().
peer_send_shutdown(Stream, undefined, S) ->
ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
{ok, S}.
-spec send_complete(stream_handle(), IsCanceled :: boolean(), cb_state()) -> cb_ret().
send_complete(_Stream, false, S) ->
{ok, S};
send_complete(_Stream, true = _IsCanceled, S) ->
{ok, S}.
-spec send_shutdown_complete(stream_handle(), error_code(), cb_state()) -> cb_ret().
send_shutdown_complete(_Stream, _Flags, S) ->
{ok, S}.
handle_stream_data(
Stream,
Bin,
_Flags,
#{
is_unidir := false,
channel := undefined,
data_queue := Queue,
stream := Stream
} = State
) when is_binary(Bin) ->
{ok, State#{data_queue := [Bin | Queue]}};
-spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_state()) ->
cb_ret().
handle_stream_data(
_Stream,
Bin,
@ -145,6 +142,7 @@ handle_stream_data(
task_queue := TQ
} = State
) when
%% assert get stream data only after channel is created
Channel =/= undefined
->
{MQTTPackets, NewPS} = parse_incoming(list_to_binary(lists:reverse([Bin | QueuedData])), PS),
@ -157,25 +155,12 @@ handle_stream_data(
),
{{continue, handle_appl_msg}, State#{parse_state := NewPS, task_queue := NewTQ}}.
%% Reserved for unidi streams
%% handle_stream_data(Stream, Bin, _Flags, #{is_unidir := true, peer_stream := PeerStream, conn := Conn} = State) ->
%% case PeerStream of
%% undefined ->
%% {ok, StreamProc} = quicer_stream:start_link(?MODULE, Conn,
%% [ {open_flag, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}
%% , {is_local, true}
%% ]),
%% {ok, _} = quicer_stream:send(StreamProc, Bin),
%% {ok, State#{peer_stream := StreamProc}};
%% StreamProc when is_pid(StreamProc) ->
%% {ok, _} = quicer_stream:send(StreamProc, Bin),
%% {ok, State}
%% end.
-spec passive(stream_handle(), undefined, cb_state()) -> cb_ret().
passive(Stream, undefined, S) ->
quicer:setopt(Stream, active, 10),
{ok, S}.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_state()) -> cb_ret().
stream_closed(
_Stream,
#{
@ -197,28 +182,20 @@ stream_closed(
->
{stop, normal, S}.
-spec handle_call(Request :: term(), From :: {pid(), term()}, cb_state()) -> cb_ret().
handle_call(Call, _From, S) ->
case do_handle_call(Call, S) of
{ok, NewS} ->
{reply, ok, NewS};
{error, Reason, NewS} ->
{reply, {error, Reason}, NewS};
{{continue, _} = Cont, NewS} ->
{reply, ok, NewS, Cont};
{hibernate, NewS} ->
{reply, ok, NewS, hibernate};
{stop, Reason, NewS} ->
{stop, Reason, {stopped, Reason}, NewS}
end.
do_handle_call(Call, S).
-spec handle_continue(Continue :: term(), cb_state()) -> cb_ret().
handle_continue(handle_appl_msg, #{task_queue := Q} = S) ->
case queue:out(Q) of
{{value, Item}, Q2} ->
do_handle_appl_msg(Item, S#{task_queue := Q2});
{empty, Q} ->
{empty, _Q} ->
{ok, S}
end.
%%% Internals
do_handle_appl_msg(
{outgoing, Packets},
#{
@ -248,7 +225,7 @@ do_handle_appl_msg({incoming, {frame_error, _} = FE}, #{channel := Channel} = S)
->
with_channel(handle_in, [FE], S);
do_handle_appl_msg({close, Reason}, S) ->
%% @TODO shall we abort shutdown or graceful shutdown?
%% @TODO shall we abort shutdown or graceful shutdown here?
with_channel(handle_info, [{sock_closed, Reason}], S);
do_handle_appl_msg({event, updated}, S) ->
%% Data stream don't care about connection state changes.
@ -294,7 +271,6 @@ with_channel(Fun, Args, #{channel := Channel, task_queue := Q} = S) when
}}
end.
%%% Internals
handle_outgoing(#mqtt_packet{} = P, S) ->
handle_outgoing([P], S);
handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir := false}) when
@ -373,7 +349,7 @@ init_state(Stream, Connection, OpenFlags, PS) ->
task_queue => queue:new()
}.
-spec do_handle_call(term(), quicer_stream:cb_state()) -> quicer_stream:cb_ret().
-spec do_handle_call(term(), cb_state()) -> cb_ret().
do_handle_call(
{activate, {PS, Serialize, Channel}},
#{
@ -386,7 +362,7 @@ do_handle_call(
%% We use quic protocol for flow control, and we don't check return val
case quicer:setopt(Stream, active, true) of
ok ->
{ok, NewS};
{reply, ok, NewS};
{error, E} ->
?SLOG(error, #{msg => "set stream active failed", error => E}),
{stop, E, NewS}
@ -484,3 +460,6 @@ is_datastream_out_pkt(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) w
true;
is_datastream_out_pkt(_) ->
false.
%% BUILD_WITHOUT_QUIC
-else.
-endif.

View File

@ -17,8 +17,12 @@
%% MQTT/QUIC Stream
-module(emqx_quic_stream).
-ifndef(BUILD_WITHOUT_QUIC).
-behaviour(quicer_remote_stream).
-include("logger.hrl").
%% emqx transport Callbacks
-export([
type/1,
@ -33,31 +37,14 @@
sockname/1,
peercert/1
]).
-include("logger.hrl").
-ifndef(BUILD_WITHOUT_QUIC).
-include_lib("quicer/include/quicer.hrl").
-else.
%% STREAM SHUTDOWN FLAGS
-define(QUIC_STREAM_SHUTDOWN_FLAG_NONE, 0).
% Cleanly closes the send path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1).
% Abruptly closes the send path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND, 2).
% Abruptly closes the receive path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, 4).
% Abruptly closes both send and receive paths.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 6).
-define(QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, 8).
-endif.
-type cb_ret() :: gen_statem:event_handler_result().
-type cb_data() :: emqtt_quic:cb_data().
-type cb_ret() :: quicer_stream:cb_ret().
-type cb_data() :: quicer_stream:cb_state().
-type connection_handle() :: quicer:connection_handle().
-type stream_handle() :: quicer:stream_handle().
-export([
new_stream/3,
send_complete/3,
peer_send_shutdown/3,
peer_send_aborted/3,
@ -79,13 +66,8 @@
}.
%% for accepting
-spec wait
({pid(), connection_handle(), socket_info()}) ->
{ok, socket()} | {error, enotconn};
%% For handover
({pid(), connection_handle(), stream_handle(), socket_info()}) ->
{ok, socket()} | {error, any()}.
-spec wait({pid(), connection_handle(), socket_info()}) ->
{ok, socket()} | {error, enotconn}.
%%% For Accepting New Remote Stream
wait({ConnOwner, Conn, ConnInfo}) ->
{ok, Conn} = quicer:async_accept_stream(Conn, []),
@ -105,15 +87,8 @@ wait({ConnOwner, Conn, ConnInfo}) ->
{'EXIT', ConnOwner, _Reason} ->
{error, enotconn}
end.
%% UNUSED, for ownership handover,
%% wait({PrevOwner, Conn, Stream, SocketInfo}) ->
%% case quicer:wait_for_handoff(PrevOwner, Stream) of
%% ok ->
%% {ok, socket(Conn, Stream, SocketInfo)};
%% owner_down ->
%% {error, owner_down}
%% end.
-spec type(_) -> quic.
type(_) ->
quic.
@ -155,7 +130,7 @@ getopts(_Socket, _Opts) ->
{buffer, 80000}
]}.
%% @TODO supply some App Error Code
%% @TODO supply some App Error Code from caller
fast_close({ConnOwner, Conn, _ConnInfo}) when is_pid(ConnOwner) ->
%% handshake aborted.
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
@ -185,15 +160,13 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of
{ok, _Len} -> ok;
{error, X, Y} -> {error, {X, Y}};
Other -> Other
end.
%%%
%%% quicer stream callbacks
%%%
-spec new_stream(stream_handle(), quicer:new_stream_props(), cb_data()) -> cb_ret().
new_stream(_Stream, #{flags := _Flags, is_orphan := _IsOrphan}, _Conn) ->
{stop, unimpl}.
-spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, S) ->
@ -222,28 +195,12 @@ send_complete(_Stream, true = _IsCancelled, S) ->
send_shutdown_complete(_Stream, _IsGraceful, S) ->
{ok, S}.
%% Local stream, Unidir
%% -spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_data())
%% -> cb_ret().
%% handle_stream_data(Stream, Bin, Flags, #{ is_local := true
%% , parse_state := PS} = S) ->
%% ?SLOG(debug, #{data => Bin}, Flags),
%% case parse(Bin, PS, []) of
%% {keep_state, NewPS, Packets} ->
%% quicer:setopt(Stream, active, once),
%% {keep_state, S#{parse_state := NewPS},
%% [{next_event, cast, P } || P <- lists:reverse(Packets)]};
%% {stop, _} = Stop ->
%% Stop
%% end;
%% %% Remote stream
%% handle_stream_data(_Stream, _Bin, _Flags,
%% #{is_local := false, is_unidir := true, conn := _Conn} = _S) ->
%% {stop, unimpl}.
-spec passive(stream_handle(), undefined, cb_data()) -> cb_ret().
passive(Stream, undefined, S) ->
quicer:setopt(Stream, active, 10),
case quicer:setopt(Stream, active, 10) of
ok -> ok;
Error -> ?SLOG(error, #{message => "set active error", error => Error})
end,
{ok, S}.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) -> cb_ret().
@ -277,3 +234,7 @@ stream_closed(
-spec socket(connection_handle(), stream_handle(), socket_info()) -> socket().
socket(Conn, CtrlStream, Info) when is_map(Info) ->
{quic, Conn, CtrlStream, Info}.
%% BUILD_WITHOUT_QUIC
-else.
-endif.

File diff suppressed because it is too large Load Diff

View File

@ -79,19 +79,6 @@ end_per_group(_Group, _Config) ->
init_per_suite(Config) ->
%% Start Apps
dbg:tracer(process, {fun dbg:dhandler/2, group_leader()}),
dbg:p(all, c),
dbg:tp(emqx_quic_connection, cx),
dbg:tp(quicer_connection, cx),
%% dbg:tp(emqx_quic_stream, cx),
%% dbg:tp(emqtt_quic, cx),
%% dbg:tp(emqtt, cx),
%% dbg:tp(emqtt_quic_stream, cx),
%% dbg:tp(emqtt_quic_connection, cx),
%% dbg:tp(emqx_cm, open_session, cx),
%% dbg:tpl(emqx_cm, lookup_channels, cx),
%% dbg:tpl(emqx_cm, register_channel, cx),
%% dbg:tpl(emqx_cm, unregister_channel, cx),
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.

File diff suppressed because it is too large Load Diff