fix: prepare for multi stream

This commit is contained in:
William Yang 2022-11-02 14:20:17 +01:00
parent 2d09a054e3
commit a51c886908
3 changed files with 263 additions and 37 deletions

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% MQTT/TCP|TLS Connection
%% MQTT/TCP|TLS Connection|QUIC Stream
-module(emqx_connection).
-include("emqx.hrl").
@ -189,12 +189,16 @@
]}
).
-spec start_link(
esockd:transport(),
esockd:socket() | {pid(), quicer:connection_handler()},
emqx_channel:opts()
) ->
{ok, pid()}.
-spec start_link
(esockd:transport(), esockd:socket(), emqx_channel:opts()) ->
{ok, pid()};
(
emqx_quic_stream,
{ConnOwner :: pid(), quicer:connection_handler(), quicer:new_conn_props()},
emqx_quic_connection:cb_state()
) ->
{ok, pid()}.
start_link(Transport, Socket, Options) ->
Args = [self(), Transport, Socket, Options],
CPid = proc_lib:spawn_link(?MODULE, init, Args),
@ -324,6 +328,7 @@ init_state(
Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg),
FrameOpts = #{
%% @TODO:q what is strict_mode?
strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]),
max_size => emqx_config:get_zone_conf(Zone, [mqtt, max_packet_size])
},
@ -476,7 +481,9 @@ process_msg([Msg | More], State) ->
{ok, Msgs, NState} ->
process_msg(append_msg(More, Msgs), NState);
{stop, Reason, NState} ->
{stop, Reason, NState}
{stop, Reason, NState};
{stop, Reason} ->
{stop, Reason, State}
end
catch
exit:normal ->
@ -507,7 +514,6 @@ append_msg(Q, Msg) ->
%%--------------------------------------------------------------------
%% Handle a Msg
handle_msg({'$gen_call', From, Req}, State) ->
case handle_call(From, Req, State) of
{reply, Reply, NState} ->
@ -747,6 +753,7 @@ when_bytes_in(Oct, Data, State) ->
NState
).
%% @doc: return a reversed Msg list
-compile({inline, [next_incoming_msgs/3]}).
next_incoming_msgs([Packet], Msgs, State) ->
{ok, [{incoming, Packet} | Msgs], State};
@ -892,6 +899,8 @@ handle_info({sock_error, Reason}, State) ->
false -> ok
end,
handle_info({sock_closed, Reason}, close_socket(State));
handle_info({quic, Event, Handle, Prop}, State) ->
emqx_quic_stream:Event(Handle, Prop, State);
%% handle_info({quic, peer_send_shutdown, _Stream}, State) ->
%% handle_info({sock_closed, force}, close_socket(State));
%% handle_info({quic, closed, _Channel, ReasonFlag}, State) ->

View File

@ -16,6 +16,7 @@
-module(emqx_quic_connection).
-include("logger.hrl").
-ifndef(BUILD_WITHOUT_QUIC).
-include_lib("quicer/include/quicer.hrl").
-else.
@ -40,37 +41,50 @@
new_stream/3
]).
-type cb_state() :: map().
-type cb_ret() :: ok.
-spec init(map() | list()) -> cb_state().
-type cb_state() :: #{
ctrl_pid := undefined | pid(),
conn := undefined | quicer:conneciton_hanlder(),
stream_opts := map(),
is_resumed => boolean(),
_ => _
}.
-type cb_ret() :: quicer_lib:cb_ret().
-spec init(map() | list()) -> {ok, cb_state()}.
init(ConnOpts) when is_list(ConnOpts) ->
init(maps:from_list(ConnOpts));
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});
init(ConnOpts) when is_map(ConnOpts) ->
{ok, ConnOpts}.
{ok, init_cb_state(ConnOpts)}.
-spec closed(quicer:conneciton_hanlder(), quicer:conn_closed_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}.
closed(_Conn, #{is_peer_acked := true}, S) ->
{stop, normal, S};
closed(_Conn, #{is_peer_acked := false}, S) ->
{stop, abnorml, S}.
{stop, normal, cb_state()}.
closed(_Conn, #{is_peer_acked := _} = Prop, S) ->
?SLOG(debug, Prop),
{stop, normal, S}.
-spec new_conn(quicer:connection_handler(), quicer:new_conn_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}.
new_conn(Conn, #{version := _Vsn}, #{zone := Zone} = S) ->
new_conn(
Conn,
#{version := _Vsn} = ConnInfo,
#{zone := Zone, conn := undefined, ctrl_pid := undefined} = S
) ->
process_flag(trap_exit, true),
?SLOG(debug, ConnInfo),
case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of
false ->
{ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S),
{ok, Pid} = emqx_connection:start_link(
emqx_quic_stream,
{self(), Conn, maps:without([crypto_buffer], ConnInfo)},
S
),
receive
{Pid, stream_acceptor_ready} ->
ok = quicer:async_handshake(Conn),
{ok, S#{conn => Conn}};
{'EXIT', Pid, _Reason} ->
{ok, S#{conn := Conn, ctrl_pid := Pid}};
{'EXIT', _Pid, _Reason} ->
{error, stream_accept_error}
end;
true ->
@ -80,10 +94,12 @@ new_conn(Conn, #{version := _Vsn}, #{zone := Zone} = S) ->
-spec connected(quicer:connection_handler(), quicer:connected_props(), cb_state()) ->
{ok, cb_state()} | {error, any()}.
connected(Conn, _Props, #{slow_start := false} = S) ->
connected(Conn, Props, #{slow_start := false} = S) ->
?SLOG(debug, Props),
{ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S),
{ok, S};
connected(_Conn, _Props, S) ->
connected(_Conn, Props, S) ->
?SLOG(debug, Props),
{ok, S}.
-spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret().
@ -92,10 +108,11 @@ resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when
->
ResumeFun(Conn, Data, S);
resumed(_Conn, _Data, S) ->
{ok, S}.
{ok, S#{is_resumed := true}}.
-spec nst_received(quicer:connection_handle(), TicketBin :: binary(), cb_state()) -> cb_ret().
nst_received(_Conn, _Data, S) ->
%% As server we should not recv NST!
{stop, no_nst_for_server, S}.
-spec new_stream(quicer:stream_handle(), quicer:new_stream_props(), cb_state()) -> cb_ret().
@ -116,14 +133,17 @@ new_stream(
Other ->
Other
end.
-spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret().
shutdown(Conn, _ErrorCode, S) ->
%% @TODO check spec what to do with the ErrorCode?
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
{ok, S}.
-spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) ->
cb_ret().
transport_shutdown(_C, _DownInfo, S) ->
%% @TODO some counter
{ok, S}.
-spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret().
@ -140,14 +160,21 @@ local_address_changed(_C, _NewAddr, S) ->
{BidirStreams :: non_neg_integer(), UnidirStreams :: non_neg_integer()},
cb_state()
) -> cb_ret().
streams_available(_C, {_BidirCnt, _UnidirCnt}, S) ->
{ok, S}.
streams_available(_C, {BidirCnt, UnidirCnt}, S) ->
{ok, S#{
peer_bidi_stream_count => BidirCnt,
peer_unidi_stream_count => UnidirCnt
}}.
-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret().
%% @TODO this is not going to get triggered.
%% for https://github.com/microsoft/msquic/issues/3120
peer_needs_streams(_C, undefined, S) ->
{ok, S}.
%%%
%%% Internals
%%%
-spec is_zone_olp_enabled(emqx_types:zone()) -> boolean().
is_zone_olp_enabled(Zone) ->
case emqx_config:get_zone_conf(Zone, [overload_protection]) of
@ -156,3 +183,10 @@ is_zone_olp_enabled(Zone) ->
_ ->
false
end.
-spec init_cb_state(map()) -> cb_state().
init_cb_state(Map) ->
Map#{
ctrl_pid => undefined,
conn => undefined
}.

View File

@ -17,6 +17,8 @@
%% MQTT/QUIC Stream
-module(emqx_quic_stream).
-behaviour(quicer_stream).
%% emqx transport Callbacks
-export([
type/1,
@ -32,13 +34,71 @@
peercert/1
]).
wait({ConnOwner, Conn}) ->
-include("logger.hrl").
-ifndef(BUILD_WITHOUT_QUIC).
-include_lib("quicer/include/quicer.hrl").
-else.
%% STREAM SHUTDOWN FLAGS
-define(QUIC_STREAM_SHUTDOWN_FLAG_NONE, 0).
% Cleanly closes the send path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 1).
% Abruptly closes the send path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_SEND, 2).
% Abruptly closes the receive path.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, 4).
% Abruptly closes both send and receive paths.
-define(QUIC_STREAM_SHUTDOWN_FLAG_ABORT, 6).
-define(QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, 8).
-endif.
-type cb_ret() :: gen_statem:event_handler_result().
-type cb_data() :: emqtt_quic:cb_data().
-type connection_handle() :: quicer:connection_handle().
-type stream_handle() :: quicer:stream_handle().
-export([
init_handoff/4,
new_stream/3,
start_completed/3,
send_complete/3,
peer_send_shutdown/3,
peer_send_aborted/3,
peer_receive_aborted/3,
send_shutdown_complete/3,
stream_closed/3,
peer_accepted/3,
passive/3,
handle_call/4
]).
-export_type([socket/0]).
-opaque socket() :: {quic, connection_handle(), stream_handle(), socket_info()}.
-type socket_info() :: #{
is_orphan => boolean(),
ctrl_stream_start_flags => quicer:stream_open_flags(),
%% quicer:new_conn_props
_ => _
}.
-spec wait({pid(), quicer:connection_handle(), socket_info()}) ->
{ok, socket()} | {error, enotconn}.
wait({ConnOwner, Conn, ConnInfo}) ->
{ok, Conn} = quicer:async_accept_stream(Conn, []),
ConnOwner ! {self(), stream_acceptor_ready},
receive
%% from msquic
{quic, new_stream, Stream, _Props} ->
{ok, {quic, Conn, Stream}};
%% New incoming stream, this is a *ctrl* stream
{quic, new_stream, Stream, #{is_orphan := IsOrphan, flags := StartFlags}} ->
SocketInfo = ConnInfo#{
is_orphan => IsOrphan,
ctrl_stream_start_flags => StartFlags
},
{ok, socket(Conn, Stream, SocketInfo)};
%% connection closed event for stream acceptor
{quic, closed, undefined, undefined} ->
{error, enotconn};
%% Connection owner process down
{'EXIT', ConnOwner, _Reason} ->
{error, enotconn}
end.
@ -46,17 +106,17 @@ wait({ConnOwner, Conn}) ->
type(_) ->
quic.
peername({quic, Conn, _Stream}) ->
peername({quic, Conn, _Stream, _Info}) ->
quicer:peername(Conn).
sockname({quic, Conn, _Stream}) ->
sockname({quic, Conn, _Stream, _Info}) ->
quicer:sockname(Conn).
peercert(_S) ->
%% @todo but unsupported by msquic
nossl.
getstat({quic, Conn, _Stream}, Stats) ->
getstat({quic, Conn, _Stream, _Info}, Stats) ->
case quicer:getstat(Conn, Stats) of
{error, _} -> {error, closed};
Res -> Res
@ -84,7 +144,7 @@ getopts(_Socket, _Opts) ->
{buffer, 80000}
]}.
fast_close({quic, _Conn, Stream}) ->
fast_close({quic, _Conn, Stream, _Info}) ->
%% Flush send buffer, gracefully shutdown
quicer:async_shutdown_stream(Stream),
ok.
@ -102,8 +162,131 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
Result
end.
async_send({quic, _Conn, Stream}, Data, _Options) ->
async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
case quicer:send(Stream, Data) of
{ok, _Len} -> ok;
Other -> Other
end.
%%%
%%% quicer stream callbacks
%%%
-spec init_handoff(stream_handle(), #{}, quicer:connection_handle(), #{}) -> cb_ret().
init_handoff(_Stream, _StreamOpts, _Conn, _Flags) ->
%% stream owner already set while starts.
{stop, unimpl}.
-spec new_stream(stream_handle(), quicer:new_stream_props(), cb_data()) -> cb_ret().
new_stream(_Stream, #{flags := _Flags, is_orphan := _IsOrphan}, _Conn) ->
{stop, unimpl}.
-spec peer_accepted(stream_handle(), undefined, cb_data()) -> cb_ret().
peer_accepted(_Stream, undefined, S) ->
%% We just ignore it
{ok, S}.
-spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
%% we abort send with same reason
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S};
peer_receive_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := true} = S) ->
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}.
-spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_send_aborted(Stream, ErrorCode, #{is_unidir := false} = S) ->
%% we abort receive with same reason
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
{ok, S};
peer_send_aborted(Stream, ErrorCode, #{is_unidir := true, is_local := false} = S) ->
quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
{ok, S}.
-spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret().
peer_send_shutdown(Stream, undefined, S) ->
ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
{ok, S}.
-spec send_complete(stream_handle(), boolean(), cb_data()) -> cb_ret().
send_complete(_Stream, false, S) ->
{ok, S};
send_complete(_Stream, true = _IsCancelled, S) ->
?SLOG(error, #{message => "send cancelled"}),
{ok, S}.
-spec send_shutdown_complete(stream_handle(), boolean(), cb_data()) -> cb_ret().
send_shutdown_complete(_Stream, _IsGraceful, S) ->
{ok, S}.
-spec start_completed(stream_handle(), quicer:stream_start_completed_props(), cb_data()) ->
cb_ret().
start_completed(_Stream, #{status := success, stream_id := StreamId} = Prop, S) ->
?SLOG(debug, Prop),
{ok, S#{stream_id => StreamId}};
start_completed(_Stream, #{status := stream_limit_reached, stream_id := _StreamId} = Prop, _S) ->
?SLOG(error, #{message => start_completed}, Prop),
{stop, stream_limit_reached};
start_completed(_Stream, #{status := Other} = Prop, S) ->
?SLOG(error, Prop),
%% or we could retry?
{stop, {start_fail, Other}, S}.
%% Local stream, Unidir
%% -spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_data())
%% -> cb_ret().
%% handle_stream_data(Stream, Bin, Flags, #{ is_local := true
%% , parse_state := PS} = S) ->
%% ?SLOG(debug, #{data => Bin}, Flags),
%% case parse(Bin, PS, []) of
%% {keep_state, NewPS, Packets} ->
%% quicer:setopt(Stream, active, once),
%% {keep_state, S#{parse_state := NewPS},
%% [{next_event, cast, P } || P <- lists:reverse(Packets)]};
%% {stop, _} = Stop ->
%% Stop
%% end;
%% %% Remote stream
%% handle_stream_data(_Stream, _Bin, _Flags,
%% #{is_local := false, is_unidir := true, conn := _Conn} = _S) ->
%% {stop, unimpl}.
-spec passive(stream_handle(), undefined, cb_data()) -> cb_ret().
passive(_Stream, undefined, _S) ->
{stop, unimpl}.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) -> cb_ret().
stream_closed(
_Stream,
#{
is_conn_shutdown := IsConnShutdown,
is_app_closing := IsAppClosing,
is_shutdown_by_app := IsAppShutdown,
is_closed_remotely := IsRemote,
status := Status,
error := Code
},
S
) when
is_boolean(IsConnShutdown) andalso
is_boolean(IsAppClosing) andalso
is_boolean(IsAppShutdown) andalso
is_boolean(IsRemote) andalso
is_atom(Status) andalso
is_integer(Code)
->
%% @TODO for now we fake a sock_closed for
%% emqx_connection:process_msg to append
%% a msg to be processed
{ok, {sock_closed, Status}, S}.
handle_call(_Stream, _Request, _Opts, S) ->
{error, unimpl, S}.
%%%
%%% Internals
%%%
-spec socket(connection_handle(), stream_handle(), socket_info()) -> socket().
socket(Conn, CtrlStream, Info) when is_map(Info) ->
{quic, Conn, CtrlStream, Info}.