Merge pull request #9949 from qzhuyan/dev/william/e5.0.1/multi-stream

feat(quic): multi streams
This commit is contained in:
William Yang 2023-02-20 17:45:56 +01:00 committed by GitHub
commit a2762e5a37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 3092 additions and 119 deletions

View File

@ -56,7 +56,7 @@ jobs:
echo "runs-on=${RUNS_ON}" | tee -a $GITHUB_OUTPUT echo "runs-on=${RUNS_ON}" | tee -a $GITHUB_OUTPUT
prepare: prepare:
runs-on: aws-amd64 runs-on: ${{ needs.build-matrix.outputs.runs-on }}
needs: [build-matrix] needs: [build-matrix]
strategy: strategy:
fail-fast: false fail-fast: false

View File

@ -34,6 +34,10 @@ listeners.wss.default {
# enabled = true # enabled = true
# bind = "0.0.0.0:14567" # bind = "0.0.0.0:14567"
# max_connections = 1024000 # max_connections = 1024000
# ssl_options {
# verify = verify_none
# keyfile = "{{ platform_etc_dir }}/certs/key.pem" # keyfile = "{{ platform_etc_dir }}/certs/key.pem"
# certfile = "{{ platform_etc_dir }}/certs/cert.pem" # certfile = "{{ platform_etc_dir }}/certs/cert.pem"
#} # cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
# }
# }

View File

@ -1815,8 +1815,8 @@ fields_listener_enabled {
fields_mqtt_quic_listener_certfile { fields_mqtt_quic_listener_certfile {
desc { desc {
en: """Path to the certificate file.""" en: """Path to the certificate file. Will be deprecated in 5.1, use .ssl_options.certfile instead."""
zh: """证书文件。""" zh: """证书文件。在 5.1 中会被废弃,使用 .ssl_options.certfile 代替。"""
} }
label: { label: {
en: "Certificate file" en: "Certificate file"
@ -1826,8 +1826,8 @@ fields_mqtt_quic_listener_certfile {
fields_mqtt_quic_listener_keyfile { fields_mqtt_quic_listener_keyfile {
desc { desc {
en: """Path to the secret key file.""" en: """Path to the secret key file. Will be deprecated in 5.1, use .ssl_options.keyfile instead."""
zh: """私钥文件。""" zh: """私钥文件。在 5.1 中会被废弃,使用 .ssl_options.keyfile 代替。"""
} }
label: { label: {
en: "Key file" en: "Key file"
@ -1868,6 +1868,17 @@ fields_mqtt_quic_listener_keep_alive_interval {
} }
} }
fields_mqtt_quic_listener_ssl_options {
desc {
en: """TLS options for QUIC transport"""
zh: """QUIC 传输层的 TLS 选项"""
}
label: {
en: "TLS Options"
zh: "TLS 选项"
}
}
base_listener_bind { base_listener_bind {
desc { desc {
en: """IP address and port for the listening socket.""" en: """IP address and port for the listening socket."""

View File

@ -0,0 +1,25 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQX_QUIC_HRL).
-define(EMQX_QUIC_HRL, true).
%% MQTT Over QUIC Shutdown Error code.
-define(MQTT_QUIC_CONN_NOERROR, 0).
-define(MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN, 1).
-define(MQTT_QUIC_CONN_ERROR_OVERLOADED, 2).
-endif.

View File

@ -43,7 +43,7 @@
{meck, "0.9.2"}, {meck, "0.9.2"},
{proper, "1.4.0"}, {proper, "1.4.0"},
{bbmustache, "1.10.0"}, {bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0"}}} {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.1"}}}
]}, ]},
{extra_src_dirs, [{"test", [recursive]}]} {extra_src_dirs, [{"test", [recursive]}]}
]} ]}

View File

@ -24,7 +24,20 @@ IsQuicSupp = fun() ->
end, end,
Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.16"}}}. Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.109"}}}.
Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),
{plt_extra_apps, OldExtra} = lists:keyfind(plt_extra_apps, 1, OldDialyzerConfig),
Extra = OldExtra ++ [quicer || IsQuicSupp()],
NewDialyzerConfig = [{plt_extra_apps, Extra} | OldDialyzerConfig],
lists:keystore(
dialyzer,
1,
Config,
{dialyzer, NewDialyzerConfig}
)
end.
ExtraDeps = fun(C) -> ExtraDeps = fun(C) ->
{deps, Deps0} = lists:keyfind(deps, 1, C), {deps, Deps0} = lists:keyfind(deps, 1, C),
@ -43,4 +56,4 @@ ExtraDeps = fun(C) ->
) )
end, end,
ExtraDeps(CONFIG). Dialyzer(ExtraDeps(CONFIG)).

View File

@ -14,7 +14,13 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT/TCP|TLS Connection %% This module interacts with the transport layer of MQTT
%% Transport:
%% - TCP connection
%% - TCP/TLS connection
%% - QUIC Stream
%%
%% for WebSocket @see emqx_ws_connection.erl
-module(emqx_connection). -module(emqx_connection).
-include("emqx.hrl"). -include("emqx.hrl").
@ -111,7 +117,10 @@
limiter_buffer :: queue:queue(pending_req()), limiter_buffer :: queue:queue(pending_req()),
%% limiter timers %% limiter timers
limiter_timer :: undefined | reference() limiter_timer :: undefined | reference(),
%% QUIC conn owner pid if in use.
quic_conn_pid :: maybe(pid())
}). }).
-record(retry, { -record(retry, {
@ -189,12 +198,16 @@
]} ]}
). ).
-spec start_link( -spec start_link
esockd:transport(), (esockd:transport(), esockd:socket(), emqx_channel:opts()) ->
esockd:socket() | {pid(), quicer:connection_handler()}, {ok, pid()};
emqx_channel:opts() (
) -> emqx_quic_stream,
{ConnOwner :: pid(), quicer:connection_handle(), quicer:new_conn_props()},
emqx_quic_connection:cb_state()
) ->
{ok, pid()}. {ok, pid()}.
start_link(Transport, Socket, Options) -> start_link(Transport, Socket, Options) ->
Args = [self(), Transport, Socket, Options], Args = [self(), Transport, Socket, Options],
CPid = proc_lib:spawn_link(?MODULE, init, Args), CPid = proc_lib:spawn_link(?MODULE, init, Args),
@ -329,6 +342,7 @@ init_state(
}, },
ParseState = emqx_frame:initial_parse_state(FrameOpts), ParseState = emqx_frame:initial_parse_state(FrameOpts),
Serialize = emqx_frame:serialize_opts(), Serialize = emqx_frame:serialize_opts(),
%% Init Channel
Channel = emqx_channel:init(ConnInfo, Opts), Channel = emqx_channel:init(ConnInfo, Opts),
GcState = GcState =
case emqx_config:get_zone_conf(Zone, [force_gc]) of case emqx_config:get_zone_conf(Zone, [force_gc]) of
@ -359,7 +373,9 @@ init_state(
zone = Zone, zone = Zone,
listener = Listener, listener = Listener,
limiter_buffer = queue:new(), limiter_buffer = queue:new(),
limiter_timer = undefined limiter_timer = undefined,
%% for quic streams to inherit
quic_conn_pid = maps:get(conn_pid, Opts, undefined)
}. }.
run_loop( run_loop(
@ -476,7 +492,9 @@ process_msg([Msg | More], State) ->
{ok, Msgs, NState} -> {ok, Msgs, NState} ->
process_msg(append_msg(More, Msgs), NState); process_msg(append_msg(More, Msgs), NState);
{stop, Reason, NState} -> {stop, Reason, NState} ->
{stop, Reason, NState} {stop, Reason, NState};
{stop, Reason} ->
{stop, Reason, State}
end end
catch catch
exit:normal -> exit:normal ->
@ -507,7 +525,6 @@ append_msg(Q, Msg) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle a Msg %% Handle a Msg
handle_msg({'$gen_call', From, Req}, State) -> handle_msg({'$gen_call', From, Req}, State) ->
case handle_call(From, Req, State) of case handle_call(From, Req, State) of
{reply, Reply, NState} -> {reply, Reply, NState} ->
@ -525,11 +542,10 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct), ok = emqx_metrics:inc('bytes.received', Oct),
when_bytes_in(Oct, Data, State); when_bytes_in(Oct, Data, State);
handle_msg({quic, Data, _Sock, _, _, _}, State) -> handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) ->
Oct = iolist_size(Data), inc_counter(incoming_bytes, Len),
inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Len),
ok = emqx_metrics:inc('bytes.received', Oct), when_bytes_in(Len, Data, State);
when_bytes_in(Oct, Data, State);
handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
case queue:peek(Cache) of case queue:peek(Cache) of
empty -> empty ->
@ -595,9 +611,20 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_msg({connack, ConnAck}, State) -> handle_msg({connack, ConnAck}, State) ->
handle_outgoing(ConnAck, State); handle_outgoing(ConnAck, State);
handle_msg({close, Reason}, State) -> handle_msg({close, Reason}, State) ->
%% @FIXME here it could be close due to appl error.
?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}), ?TRACE("SOCKET", "socket_force_closed", #{reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) -> handle_msg(
{event, connected},
State = #state{
channel = Channel,
serialize = Serialize,
parse_state = PS,
quic_conn_pid = QuicConnPid
}
) ->
QuicConnPid =/= undefined andalso
emqx_quic_connection:activate_data_streams(QuicConnPid, {PS, Serialize, Channel}),
ClientId = emqx_channel:info(clientid, Channel), ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)); emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
handle_msg({event, disconnected}, State = #state{channel = Channel}) -> handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
@ -654,6 +681,12 @@ maybe_raise_exception(#{
stacktrace := Stacktrace stacktrace := Stacktrace
}) -> }) ->
erlang:raise(Exception, Context, Stacktrace); erlang:raise(Exception, Context, Stacktrace);
maybe_raise_exception({shutdown, normal}) ->
ok;
maybe_raise_exception(normal) ->
ok;
maybe_raise_exception(shutdown) ->
ok;
maybe_raise_exception(Reason) -> maybe_raise_exception(Reason) ->
exit(Reason). exit(Reason).
@ -748,6 +781,7 @@ when_bytes_in(Oct, Data, State) ->
NState NState
). ).
%% @doc: return a reversed Msg list
-compile({inline, [next_incoming_msgs/3]}). -compile({inline, [next_incoming_msgs/3]}).
next_incoming_msgs([Packet], Msgs, State) -> next_incoming_msgs([Packet], Msgs, State) ->
{ok, [{incoming, Packet} | Msgs], State}; {ok, [{incoming, Packet} | Msgs], State};
@ -870,6 +904,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel})
ok; ok;
Error = {error, _Reason} -> Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error %% Send an inet_reply to postpone handling the error
%% @FIXME: why not just return error?
self() ! {inet_reply, Socket, Error}, self() ! {inet_reply, Socket, Error},
ok ok
end. end.
@ -893,12 +928,14 @@ handle_info({sock_error, Reason}, State) ->
false -> ok false -> ok
end, end,
handle_info({sock_closed, Reason}, close_socket(State)); handle_info({sock_closed, Reason}, close_socket(State));
handle_info({quic, peer_send_shutdown, _Stream}, State) -> %% handle QUIC control stream events
handle_info({sock_closed, force}, close_socket(State)); handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) ->
handle_info({quic, closed, _Channel, ReasonFlag}, State) -> case emqx_quic_stream:Event(Handle, Prop, State) of
handle_info({sock_closed, ReasonFlag}, State); {{continue, Msgs}, NewState} ->
handle_info({quic, closed, _Stream}, State) -> {ok, Msgs, NewState};
handle_info({sock_closed, force}, State); Other ->
Other
end;
handle_info(Info, State) -> handle_info(Info, State) ->
with_channel(handle_info, [Info], State). with_channel(handle_info, [Info], State).

View File

@ -72,9 +72,7 @@ id_example() -> 'tcp:default'.
list_raw() -> list_raw() ->
[ [
{listener_id(Type, LName), Type, LConf} {listener_id(Type, LName), Type, LConf}
|| %% FIXME: quic is not supported update vi dashboard yet || {Type, LName, LConf} <- do_list_raw()
{Type, LName, LConf} <- do_list_raw(),
Type =/= <<"quic">>
]. ].
list() -> list() ->
@ -170,6 +168,11 @@ current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
esockd:get_current_connections({listener_id(Type, Name), ListenOn}); esockd:get_current_connections({listener_id(Type, Name), ListenOn});
current_conns(Type, Name, _ListenOn) when Type =:= ws; Type =:= wss -> current_conns(Type, Name, _ListenOn) when Type =:= ws; Type =:= wss ->
proplists:get_value(all_connections, ranch:info(listener_id(Type, Name))); proplists:get_value(all_connections, ranch:info(listener_id(Type, Name)));
current_conns(quic, _Name, _ListenOn) ->
case quicer:perf_counters() of
{ok, PerfCnts} -> proplists:get_value(conn_active, PerfCnts);
_ -> 0
end;
current_conns(_, _, _) -> current_conns(_, _, _) ->
{error, not_support}. {error, not_support}.
@ -367,16 +370,26 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
case [A || {quicer, _, _} = A <- application:which_applications()] of case [A || {quicer, _, _} = A <- application:which_applications()] of
[_] -> [_] ->
DefAcceptors = erlang:system_info(schedulers_online) * 8, DefAcceptors = erlang:system_info(schedulers_online) * 8,
ListenOpts = [ SSLOpts = maps:merge(
{cert, maps:get(certfile, Opts)}, maps:with([certfile, keyfile], Opts),
{key, maps:get(keyfile, Opts)}, maps:get(ssl_options, Opts, #{})
),
ListenOpts =
[
{certfile, str(maps:get(certfile, SSLOpts))},
{keyfile, str(maps:get(keyfile, SSLOpts))},
{alpn, ["mqtt"]}, {alpn, ["mqtt"]},
{conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])}, {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
{keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)}, {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
{idle_timeout_ms, maps:get(idle_timeout, Opts, 0)}, {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
{handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)}, {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
{server_resumption_level, 2} {server_resumption_level, 2},
], {verify, maps:get(verify, SSLOpts, verify_none)}
] ++
case maps:get(cacertfile, SSLOpts, undefined) of
undefined -> [];
CaCertFile -> [{cacertfile, binary_to_list(CaCertFile)}]
end,
ConnectionOpts = #{ ConnectionOpts = #{
conn_callback => emqx_quic_connection, conn_callback => emqx_quic_connection,
peer_unidi_stream_count => 1, peer_unidi_stream_count => 1,
@ -385,13 +398,16 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
listener => {quic, ListenerName}, listener => {quic, ListenerName},
limiter => limiter(Opts) limiter => limiter(Opts)
}, },
StreamOpts = [{stream_callback, emqx_quic_stream}], StreamOpts = #{
stream_callback => emqx_quic_stream,
active => 1
},
Id = listener_id(quic, ListenerName), Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Opts), add_limiter_bucket(Id, Opts),
quicer:start_listener( quicer:start_listener(
Id, Id,
ListenOn, ListenOn,
{ListenOpts, ConnectionOpts, StreamOpts} {maps:from_list(ListenOpts), ConnectionOpts, StreamOpts}
); );
[] -> [] ->
{ok, {skipped, quic_app_missing}} {ok, {skipped, quic_app_missing}}

View File

@ -14,60 +14,282 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc impl. the quic connection owner process.
-module(emqx_quic_connection). -module(emqx_quic_connection).
-ifndef(BUILD_WITHOUT_QUIC). -ifndef(BUILD_WITHOUT_QUIC).
-include_lib("quicer/include/quicer.hrl").
-else.
-define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0).
-endif.
%% Callbacks -include("logger.hrl").
-include_lib("quicer/include/quicer.hrl").
-include_lib("emqx/include/emqx_quic.hrl").
-behaviour(quicer_connection).
-export([ -export([
init/1, init/1,
new_conn/2, new_conn/3,
connected/2, connected/3,
shutdown/2 transport_shutdown/3,
shutdown/3,
closed/3,
local_address_changed/3,
peer_address_changed/3,
streams_available/3,
peer_needs_streams/3,
resumed/3,
new_stream/3
]). ]).
-type cb_state() :: map() | proplists:proplist(). -export([activate_data_streams/2]).
-spec init(cb_state()) -> cb_state(). -export([
init(ConnOpts) when is_list(ConnOpts) -> handle_call/3,
init(maps:from_list(ConnOpts)); handle_info/2
]).
-type cb_state() :: #{
%% connecion owner pid
conn_pid := pid(),
%% Pid of ctrl stream
ctrl_pid := undefined | pid(),
%% quic connecion handle
conn := undefined | quicer:conneciton_handle(),
%% Data streams that handoff from this process
%% these streams could die/close without effecting the connecion/session.
%@TODO type?
streams := [{pid(), quicer:stream_handle()}],
%% New stream opts
stream_opts := map(),
%% If conneciton is resumed from session ticket
is_resumed => boolean(),
%% mqtt message serializer config
serialize => undefined,
_ => _
}.
-type cb_ret() :: quicer_lib:cb_ret().
%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked
%% for the activation from control stream after it is accepted as a legit conneciton.
%% For security, the initial number of allowed data streams from client should be limited by
%% 'peer_bidi_stream_count` & 'peer_unidi_stream_count`
-spec activate_data_streams(pid(), {
emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel()
}) -> ok.
activate_data_streams(ConnOwner, {PS, Serialize, Channel}) ->
gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity).
%% @doc conneciton owner init callback
-spec init(map()) -> {ok, cb_state()}.
init(#{stream_opts := SOpts} = S) when is_list(SOpts) ->
init(S#{stream_opts := maps:from_list(SOpts)});
init(ConnOpts) when is_map(ConnOpts) -> init(ConnOpts) when is_map(ConnOpts) ->
ConnOpts. {ok, init_cb_state(ConnOpts)}.
-spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. -spec closed(quicer:conneciton_handle(), quicer:conn_closed_props(), cb_state()) ->
new_conn(Conn, #{zone := Zone} = S) -> {stop, normal, cb_state()}.
closed(_Conn, #{is_peer_acked := _} = Prop, S) ->
?SLOG(debug, Prop),
{stop, normal, S}.
%% @doc handle the new incoming connecion as the connecion acceptor.
-spec new_conn(quicer:connection_handle(), quicer:new_conn_props(), cb_state()) ->
{ok, cb_state()} | {error, any(), cb_state()}.
new_conn(
Conn,
#{version := _Vsn} = ConnInfo,
#{zone := Zone, conn := undefined, ctrl_pid := undefined} = S
) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
?SLOG(debug, ConnInfo),
case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of case emqx_olp:is_overloaded() andalso is_zone_olp_enabled(Zone) of
false -> false ->
{ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S), %% Start control stream process
StartOption = S,
{ok, CtrlPid} = emqx_connection:start_link(
emqx_quic_stream,
{self(), Conn, maps:without([crypto_buffer], ConnInfo)},
StartOption
),
receive receive
{Pid, stream_acceptor_ready} -> {CtrlPid, stream_acceptor_ready} ->
ok = quicer:async_handshake(Conn), ok = quicer:async_handshake(Conn),
{ok, S}; {ok, S#{conn := Conn, ctrl_pid := CtrlPid}};
{'EXIT', Pid, _Reason} -> {'EXIT', _Pid, _Reason} ->
{error, stream_accept_error} {stop, stream_accept_error, S}
end; end;
true -> true ->
emqx_metrics:inc('olp.new_conn'), emqx_metrics:inc('olp.new_conn'),
{error, overloaded} _ = quicer:async_shutdown_connection(
Conn,
?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
?MQTT_QUIC_CONN_ERROR_OVERLOADED
),
{stop, normal, S}
end. end.
-spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. %% @doc callback when connection is connected.
connected(Conn, #{slow_start := false} = S) -> -spec connected(quicer:connection_handle(), quicer:connected_props(), cb_state()) ->
{ok, _Pid} = emqx_connection:start_link(emqx_quic_stream, Conn, S), {ok, cb_state()} | {error, any(), cb_state()}.
connected(_Conn, Props, S) ->
?SLOG(debug, Props),
{ok, S}.
%% @doc callback when connection is resumed from 0-RTT
-spec resumed(quicer:connection_handle(), SessionData :: binary() | false, cb_state()) -> cb_ret().
%% reserve resume conn with callback.
%% resumed(Conn, Data, #{resumed_callback := ResumeFun} = S) when
%% is_function(ResumeFun)
%% ->
%% ResumeFun(Conn, Data, S);
resumed(_Conn, _Data, S) ->
{ok, S#{is_resumed := true}}.
%% @doc callback for handling orphan data streams
%% depends on the connecion state and control stream state.
-spec new_stream(quicer:stream_handle(), quicer:new_stream_props(), cb_state()) -> cb_ret().
new_stream(
Stream,
#{is_orphan := true, flags := _Flags} = Props,
#{
conn := Conn,
streams := Streams,
stream_opts := SOpts,
zone := Zone,
limiter := Limiter,
parse_state := PS,
channel := Channel,
serialize := Serialize
} = S
) ->
%% Cherry pick options for data streams
SOpts1 = SOpts#{
is_local => false,
zone => Zone,
% unused
limiter => Limiter,
parse_state => PS,
channel => Channel,
serialize => Serialize,
quic_event_mask => ?QUICER_STREAM_EVENT_MASK_START_COMPLETE
},
{ok, NewStreamOwner} = quicer_stream:start_link(
emqx_quic_data_stream,
Stream,
Conn,
SOpts1,
Props
),
case quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}) of
ok ->
ok;
E ->
%% Only log, keep connecion alive.
?SLOG(error, #{message => "new stream handoff failed", stream => Stream, error => E})
end,
%% @TODO maybe keep them in `inactive_streams'
{ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}.
%% @doc callback for handling remote connecion shutdown.
-spec shutdown(quicer:connection_handle(), quicer:error_code(), cb_state()) -> cb_ret().
shutdown(Conn, ErrorCode, S) ->
ErrorCode =/= 0 andalso ?SLOG(debug, #{error_code => ErrorCode, state => S}),
_ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
{ok, S}.
%% @doc callback for handling transport error, such as idle timeout
-spec transport_shutdown(quicer:connection_handle(), quicer:transport_shutdown_props(), cb_state()) ->
cb_ret().
transport_shutdown(_C, DownInfo, S) when is_map(DownInfo) ->
?SLOG(debug, DownInfo),
{ok, S}.
%% @doc callback for handling for peer addr changed.
-spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret().
peer_address_changed(_C, _NewAddr, S) ->
%% @TODO update conn info in emqx_quic_stream
{ok, S}.
%% @doc callback for handling local addr change, currently unused
-spec local_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state()) ->
cb_ret().
local_address_changed(_C, _NewAddr, S) ->
{ok, S}.
%% @doc callback for handling remote stream limit updates
-spec streams_available(
quicer:connection_handle(),
{BidirStreams :: non_neg_integer(), UnidirStreams :: non_neg_integer()},
cb_state()
) -> cb_ret().
streams_available(_C, {BidirCnt, UnidirCnt}, S) ->
{ok, S#{
peer_bidi_stream_count => BidirCnt,
peer_unidi_stream_count => UnidirCnt
}}.
%% @doc callback for handling request when remote wants for more streams
%% should cope with rate limiting
%% @TODO this is not going to get triggered in current version
%% ref: https://github.com/microsoft/msquic/issues/3120
-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret().
peer_needs_streams(_C, undefined, S) ->
?SLOG(info, #{
msg => "ignore: peer need more streames", info => maps:with([conn_pid, ctrl_pid], S)
}),
{ok, S}.
%% @doc handle API calls
-spec handle_call(Req :: term(), gen_server:from(), cb_state()) -> cb_ret().
handle_call(
{activate_data_streams, {PS, Serialize, Channel} = ActivateData},
_From,
#{streams := Streams} = S
) ->
_ = [
%% Try to activate streams individually if failed, stream will shutdown on its own.
%% we dont care about the return val here.
%% note, this is only used after control stream pass the validation. The data streams
%% that are called here are assured to be inactived (data processing hasn't been started).
catch emqx_quic_data_stream:activate_data(OwnerPid, ActivateData)
|| {OwnerPid, _Stream} <- Streams
],
{reply, ok, S#{
channel := Channel,
serialize := Serialize,
parse_state := PS
}};
handle_call(_Req, _From, S) ->
{reply, {error, unimpl}, S}.
%% @doc handle DOWN messages from streams.
handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) ->
Code =
case Reason of
normal ->
?MQTT_QUIC_CONN_NOERROR;
_ ->
?MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN
end,
_ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, Code),
{ok, S}; {ok, S};
connected(_Conn, S) -> handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) ->
{ok, S}. case proplists:is_defined(Pid, Streams) of
true when
-spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. Reason =:= normal orelse
shutdown(Conn, S) -> Reason =:= {shutdown, protocol_error} orelse
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), Reason =:= killed
{ok, S}. ->
{ok, S};
true ->
?SLOG(info, #{message => "Data stream unexpected exit", reason => Reason}),
{ok, S};
false ->
{stop, unknown_pid_down, S}
end.
%%%
%%% Internals
%%%
-spec is_zone_olp_enabled(emqx_types:zone()) -> boolean(). -spec is_zone_olp_enabled(emqx_types:zone()) -> boolean().
is_zone_olp_enabled(Zone) -> is_zone_olp_enabled(Zone) ->
case emqx_config:get_zone_conf(Zone, [overload_protection]) of case emqx_config:get_zone_conf(Zone, [overload_protection]) of
@ -76,3 +298,20 @@ is_zone_olp_enabled(Zone) ->
_ -> _ ->
false false
end. end.
-spec init_cb_state(map()) -> cb_state().
init_cb_state(#{zone := _Zone} = Map) ->
Map#{
conn_pid => self(),
ctrl_pid => undefined,
conn => undefined,
streams => [],
parse_state => undefined,
channel => undefined,
serialize => undefined,
is_resumed => false
}.
%% BUILD_WITHOUT_QUIC
-else.
-endif.

View File

@ -0,0 +1,469 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%%
%% @doc QUIC data stream
%% Following the behaviour of emqx_connection:
%% The MQTT packets and their side effects are handled *atomically*.
%%
-module(emqx_quic_data_stream).
-ifndef(BUILD_WITHOUT_QUIC).
-behaviour(quicer_remote_stream).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("quicer/include/quicer.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
%% Connection Callbacks
-export([
init_handoff/4,
post_handoff/3,
send_complete/3,
peer_send_shutdown/3,
peer_send_aborted/3,
peer_receive_aborted/3,
send_shutdown_complete/3,
stream_closed/3,
passive/3
]).
-export([handle_stream_data/4]).
%% gen_server API
-export([activate_data/2]).
-export([
handle_call/3,
handle_info/2,
handle_continue/2
]).
-type cb_ret() :: quicer_stream:cb_ret().
-type cb_state() :: quicer_stream:cb_state().
-type error_code() :: quicer:error_code().
-type connection_handle() :: quicer:connection_handle().
-type stream_handle() :: quicer:stream_handle().
-type handoff_data() :: {
emqx_frame:parse_state() | undefined,
emqx_frame:serialize_opts() | undefined,
emqx_channel:channel() | undefined
}.
%%
%% @doc Activate the data handling.
%% Note, data handling is disabled before finishing the validation over control stream.
-spec activate_data(pid(), {
emqx_frame:parse_state(), emqx_frame:serialize_opts(), emqx_channel:channel()
}) -> ok.
activate_data(StreamPid, {PS, Serialize, Channel}) ->
gen_server:call(StreamPid, {activate, {PS, Serialize, Channel}}, infinity).
%%
%% @doc Handoff from previous owner, from the connection owner.
%% Note, unlike control stream, there is no acceptor for data streams.
%% The connection owner get new stream, spawn new proc and then handover to it.
%%
-spec init_handoff(stream_handle(), map(), connection_handle(), quicer:new_stream_props()) ->
{ok, cb_state()}.
init_handoff(
Stream,
_StreamOpts,
Connection,
#{is_orphan := true, flags := Flags}
) ->
{ok, init_state(Stream, Connection, Flags)}.
%%
%% @doc Post handoff data stream
%%
-spec post_handoff(stream_handle(), handoff_data(), cb_state()) -> cb_ret().
post_handoff(_Stream, {undefined = _PS, undefined = _Serialize, undefined = _Channel}, S) ->
%% When the channel isn't ready yet.
%% Data stream should wait for activate call with ?MODULE:activate_data/2
{ok, S};
post_handoff(Stream, {PS, Serialize, Channel}, S) ->
?tp(debug, ?FUNCTION_NAME, #{channel => Channel, serialize => Serialize}),
_ = quicer:setopt(Stream, active, 10),
{ok, S#{channel := Channel, serialize := Serialize, parse_state := PS}}.
-spec peer_receive_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, #{is_unidir := _} = S) ->
%% we abort send with same reason
_ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}.
-spec peer_send_aborted(stream_handle(), error_code(), cb_state()) -> cb_ret().
peer_send_aborted(Stream, ErrorCode, #{is_unidir := _} = S) ->
%% we abort receive with same reason
_ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE, ErrorCode),
{ok, S}.
-spec peer_send_shutdown(stream_handle(), undefined, cb_state()) -> cb_ret().
peer_send_shutdown(Stream, undefined, S) ->
ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
{ok, S}.
-spec send_complete(stream_handle(), IsCanceled :: boolean(), cb_state()) -> cb_ret().
send_complete(_Stream, false, S) ->
{ok, S};
send_complete(_Stream, true = _IsCanceled, S) ->
{ok, S}.
-spec send_shutdown_complete(stream_handle(), error_code(), cb_state()) -> cb_ret().
send_shutdown_complete(_Stream, _Flags, S) ->
{ok, S}.
-spec handle_stream_data(stream_handle(), binary(), quicer:recv_data_props(), cb_state()) ->
cb_ret().
handle_stream_data(
_Stream,
Bin,
_Flags,
#{
is_unidir := false,
channel := Channel,
parse_state := PS,
data_queue := QueuedData,
task_queue := TQ
} = State
) when
%% assert get stream data only after channel is created
Channel =/= undefined
->
{MQTTPackets, NewPS} = parse_incoming(list_to_binary(lists:reverse([Bin | QueuedData])), PS),
NewTQ = lists:foldl(
fun(Item, Acc) ->
queue:in(Item, Acc)
end,
TQ,
[{incoming, P} || P <- lists:reverse(MQTTPackets)]
),
{{continue, handle_appl_msg}, State#{parse_state := NewPS, task_queue := NewTQ}}.
-spec passive(stream_handle(), undefined, cb_state()) -> cb_ret().
passive(Stream, undefined, S) ->
_ = quicer:setopt(Stream, active, 10),
{ok, S}.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_state()) -> cb_ret().
stream_closed(
_Stream,
#{
is_conn_shutdown := IsConnShutdown,
is_app_closing := IsAppClosing,
is_shutdown_by_app := IsAppShutdown,
is_closed_remotely := IsRemote,
status := Status,
error := Code
},
S
) when
is_boolean(IsConnShutdown) andalso
is_boolean(IsAppClosing) andalso
is_boolean(IsAppShutdown) andalso
is_boolean(IsRemote) andalso
is_atom(Status) andalso
is_integer(Code)
->
{stop, normal, S}.
-spec handle_call(Request :: term(), From :: {pid(), term()}, cb_state()) -> cb_ret().
handle_call(Call, _From, S) ->
do_handle_call(Call, S).
-spec handle_continue(Continue :: term(), cb_state()) -> cb_ret().
handle_continue(handle_appl_msg, #{task_queue := Q} = S) ->
case queue:out(Q) of
{{value, Item}, Q2} ->
do_handle_appl_msg(Item, S#{task_queue := Q2});
{empty, _Q} ->
{ok, S}
end.
%%% Internals
do_handle_appl_msg(
{outgoing, Packets},
#{
channel := Channel,
stream := _Stream,
serialize := _Serialize
} = S
) when
Channel =/= undefined
->
case handle_outgoing(Packets, S) of
{ok, Size} ->
ok = emqx_metrics:inc('bytes.sent', Size),
{{continue, handle_appl_msg}, S};
{error, E1, E2} ->
{stop, {E1, E2}, S};
{error, E} ->
{stop, E, S}
end;
do_handle_appl_msg({incoming, #mqtt_packet{} = Packet}, #{channel := Channel} = S) when
Channel =/= undefined
->
ok = inc_incoming_stats(Packet),
with_channel(handle_in, [Packet], S);
do_handle_appl_msg({incoming, {frame_error, _} = FE}, #{channel := Channel} = S) when
Channel =/= undefined
->
with_channel(handle_in, [FE], S);
do_handle_appl_msg({close, Reason}, S) ->
%% @TODO shall we abort shutdown or graceful shutdown here?
with_channel(handle_info, [{sock_closed, Reason}], S);
do_handle_appl_msg({event, updated}, S) ->
%% Data stream don't care about connection state changes.
{{continue, handle_appl_msg}, S}.
handle_info(Deliver = {deliver, _, _}, S) ->
Delivers = [Deliver],
with_channel(handle_deliver, [Delivers], S);
handle_info({timeout, Ref, Msg}, S) ->
with_channel(handle_timeout, [Ref, Msg], S);
handle_info(Info, State) ->
with_channel(handle_info, [Info], State).
with_channel(Fun, Args, #{channel := Channel, task_queue := Q} = S) when
Channel =/= undefined
->
case apply(emqx_channel, Fun, Args ++ [Channel]) of
ok ->
{{continue, handle_appl_msg}, S};
{ok, Msgs, NewChannel} when is_list(Msgs) ->
{{continue, handle_appl_msg}, S#{
task_queue := queue:join(Q, queue:from_list(Msgs)),
channel := NewChannel
}};
{ok, Msg, NewChannel} when is_record(Msg, mqtt_packet) ->
{{continue, handle_appl_msg}, S#{
task_queue := queue:in({outgoing, Msg}, Q), channel := NewChannel
}};
%% @FIXME WTH?
{ok, {outgoing, _} = Msg, NewChannel} ->
{{continue, handle_appl_msg}, S#{task_queue := queue:in(Msg, Q), channel := NewChannel}};
{ok, NewChannel} ->
{{continue, handle_appl_msg}, S#{channel := NewChannel}};
%% @TODO optimisation for shutdown wrap
{shutdown, Reason, NewChannel} ->
{stop, {shutdown, Reason}, S#{channel := NewChannel}};
{shutdown, Reason, Msgs, NewChannel} when is_list(Msgs) ->
%% @TODO handle outgoing?
{stop, {shutdown, Reason}, S#{
channel := NewChannel,
task_queue := queue:join(Q, queue:from_list(Msgs))
}};
{shutdown, Reason, Msg, NewChannel} ->
{stop, {shutdown, Reason}, S#{
channel := NewChannel,
task_queue := queue:in(Msg, Q)
}}
end.
handle_outgoing(#mqtt_packet{} = P, S) ->
handle_outgoing([P], S);
handle_outgoing(Packets, #{serialize := Serialize, stream := Stream, is_unidir := false}) when
is_list(Packets)
->
OutBin = [serialize_packet(P, Serialize) || P <- filter_disallowed_out(Packets)],
%% Send data async but still want send feedback via {quic, send_complete, ...}
Res = quicer:async_send(Stream, OutBin, ?QUICER_SEND_FLAG_SYNC),
?TRACE("MQTT", "mqtt_packet_sent", #{packets => Packets}),
[ok = inc_outgoing_stats(P) || P <- Packets],
Res.
serialize_packet(Packet, Serialize) ->
try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> ->
?SLOG(warning, #{
msg => "packet_is_discarded",
reason => "frame_is_too_large",
packet => emqx_packet:format(Packet, hidden)
}),
ok = emqx_metrics:inc('delivery.dropped.too_large'),
ok = emqx_metrics:inc('delivery.dropped'),
ok = inc_outgoing_stats({error, message_too_large}),
<<>>;
Data ->
Data
catch
%% Maybe Never happen.
throw:{?FRAME_SERIALIZE_ERROR, Reason} ->
?SLOG(info, #{
reason => Reason,
input_packet => Packet
}),
erlang:error({?FRAME_SERIALIZE_ERROR, Reason});
error:Reason:Stacktrace ->
?SLOG(error, #{
input_packet => Packet,
exception => Reason,
stacktrace => Stacktrace
}),
erlang:error(?FRAME_SERIALIZE_ERROR)
end.
-spec init_state(
quicer:stream_handle(),
quicer:connection_handle(),
quicer:new_stream_props()
) ->
% @TODO
map().
init_state(Stream, Connection, OpenFlags) ->
init_state(Stream, Connection, OpenFlags, undefined).
init_state(Stream, Connection, OpenFlags, PS) ->
%% quic stream handle
#{
stream => Stream,
%% quic connection handle
conn => Connection,
%% if it is QUIC unidi stream
is_unidir => quicer:is_unidirectional(OpenFlags),
%% Frame Parse State
parse_state => PS,
%% Peer Stream handle in a pair for type unidir only
peer_stream => undefined,
%% if the stream is locally initiated.
is_local => false,
%% queue binary data when is NOT connected, in reversed order.
data_queue => [],
%% Channel from connection
%% `undefined' means the connection is not connected.
channel => undefined,
%% serialize opts for connection
serialize => undefined,
%% Current working queue
task_queue => queue:new()
}.
-spec do_handle_call(term(), cb_state()) -> cb_ret().
do_handle_call(
{activate, {PS, Serialize, Channel}},
#{
channel := undefined,
stream := Stream,
serialize := undefined
} = S
) ->
NewS = S#{channel := Channel, serialize := Serialize, parse_state := PS},
%% We use quic protocol for flow control, and we don't check return val
case quicer:setopt(Stream, active, true) of
ok ->
{reply, ok, NewS};
{error, E} ->
?SLOG(error, #{msg => "set stream active failed", error => E}),
{stop, E, NewS}
end;
do_handle_call(_Call, _S) ->
{error, unimpl}.
%% @doc return reserved order of Packets
parse_incoming(Data, PS) ->
try
do_parse_incoming(Data, [], PS)
catch
throw:{?FRAME_PARSE_ERROR, Reason} ->
?SLOG(info, #{
reason => Reason,
input_bytes => Data
}),
{[{frame_error, Reason}], PS};
error:Reason:Stacktrace ->
?SLOG(error, #{
input_bytes => Data,
reason => Reason,
stacktrace => Stacktrace
}),
{[{frame_error, Reason}], PS}
end.
do_parse_incoming(<<>>, Packets, ParseState) ->
{Packets, ParseState};
do_parse_incoming(Data, Packets, ParseState) ->
case emqx_frame:parse(Data, ParseState) of
{more, NParseState} ->
{Packets, NParseState};
{ok, Packet, Rest, NParseState} ->
do_parse_incoming(Rest, [Packet | Packets], NParseState)
end.
%% followings are copied from emqx_connection
-compile({inline, [inc_incoming_stats/1]}).
inc_incoming_stats(Packet = ?PACKET(Type)) ->
inc_counter(recv_pkt, 1),
case Type =:= ?PUBLISH of
true ->
inc_counter(recv_msg, 1),
inc_qos_stats(recv_msg, Packet),
inc_counter(incoming_pubs, 1);
false ->
ok
end,
emqx_metrics:inc_recv(Packet).
-compile({inline, [inc_outgoing_stats/1]}).
inc_outgoing_stats({error, message_too_large}) ->
inc_counter('send_msg.dropped', 1),
inc_counter('send_msg.dropped.too_large', 1);
inc_outgoing_stats(Packet = ?PACKET(Type)) ->
inc_counter(send_pkt, 1),
case Type of
?PUBLISH ->
inc_counter(send_msg, 1),
inc_counter(outgoing_pubs, 1),
inc_qos_stats(send_msg, Packet);
_ ->
ok
end,
emqx_metrics:inc_sent(Packet).
inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
inc_qos_stats(Type, Packet) ->
case inc_qos_stats_key(Type, emqx_packet:qos(Packet)) of
undefined ->
ignore;
Key ->
inc_counter(Key, 1)
end.
inc_qos_stats_key(send_msg, ?QOS_0) -> 'send_msg.qos0';
inc_qos_stats_key(send_msg, ?QOS_1) -> 'send_msg.qos1';
inc_qos_stats_key(send_msg, ?QOS_2) -> 'send_msg.qos2';
inc_qos_stats_key(recv_msg, ?QOS_0) -> 'recv_msg.qos0';
inc_qos_stats_key(recv_msg, ?QOS_1) -> 'recv_msg.qos1';
inc_qos_stats_key(recv_msg, ?QOS_2) -> 'recv_msg.qos2';
%% for bad qos
inc_qos_stats_key(_, _) -> undefined.
filter_disallowed_out(Packets) ->
lists:filter(fun is_datastream_out_pkt/1, Packets).
is_datastream_out_pkt(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) when
Type > 2 andalso Type < 12
->
true;
is_datastream_out_pkt(_) ->
false.
%% BUILD_WITHOUT_QUIC
-else.
-endif.

View File

@ -14,9 +14,18 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT/QUIC Stream %% MQTT over QUIC
%% multistreams: This is the control stream.
%% single stream: This is the only main stream.
%% callbacks are from emqx_connection process rather than quicer_stream
-module(emqx_quic_stream). -module(emqx_quic_stream).
-ifndef(BUILD_WITHOUT_QUIC).
-behaviour(quicer_remote_stream).
-include("logger.hrl").
%% emqx transport Callbacks %% emqx transport Callbacks
-export([ -export([
type/1, type/1,
@ -31,44 +40,84 @@
sockname/1, sockname/1,
peercert/1 peercert/1
]). ]).
-include_lib("quicer/include/quicer.hrl").
-include_lib("emqx/include/emqx_quic.hrl").
wait({ConnOwner, Conn}) -> -type cb_ret() :: quicer_stream:cb_ret().
-type cb_data() :: quicer_stream:cb_state().
-type connection_handle() :: quicer:connection_handle().
-type stream_handle() :: quicer:stream_handle().
-export([
send_complete/3,
peer_send_shutdown/3,
peer_send_aborted/3,
peer_receive_aborted/3,
send_shutdown_complete/3,
stream_closed/3,
passive/3
]).
-export_type([socket/0]).
-opaque socket() :: {quic, connection_handle(), stream_handle(), socket_info()}.
-type socket_info() :: #{
is_orphan => boolean(),
ctrl_stream_start_flags => quicer:stream_open_flags(),
%% and quicer:new_conn_props()
_ => _
}.
%%% For Accepting New Remote Stream
-spec wait({pid(), connection_handle(), socket_info()}) ->
{ok, socket()} | {error, enotconn}.
wait({ConnOwner, Conn, ConnInfo}) ->
{ok, Conn} = quicer:async_accept_stream(Conn, []), {ok, Conn} = quicer:async_accept_stream(Conn, []),
ConnOwner ! {self(), stream_acceptor_ready}, ConnOwner ! {self(), stream_acceptor_ready},
receive receive
%% from msquic %% New incoming stream, this is a *control* stream
{quic, new_stream, Stream} -> {quic, new_stream, Stream, #{is_orphan := IsOrphan, flags := StartFlags}} ->
{ok, {quic, Conn, Stream}}; SocketInfo = ConnInfo#{
is_orphan => IsOrphan,
ctrl_stream_start_flags => StartFlags
},
{ok, socket(Conn, Stream, SocketInfo)};
%% connection closed event for stream acceptor
{quic, closed, undefined, undefined} ->
{error, enotconn};
%% Connection owner process down
{'EXIT', ConnOwner, _Reason} -> {'EXIT', ConnOwner, _Reason} ->
{error, enotconn} {error, enotconn}
end. end.
-spec type(_) -> quic.
type(_) -> type(_) ->
quic. quic.
peername({quic, Conn, _Stream}) -> peername({quic, Conn, _Stream, _Info}) ->
quicer:peername(Conn). quicer:peername(Conn).
sockname({quic, Conn, _Stream}) -> sockname({quic, Conn, _Stream, _Info}) ->
quicer:sockname(Conn). quicer:sockname(Conn).
peercert(_S) -> peercert(_S) ->
%% @todo but unsupported by msquic %% @todo but unsupported by msquic
nossl. nossl.
getstat({quic, Conn, _Stream}, Stats) -> getstat({quic, Conn, _Stream, _Info}, Stats) ->
case quicer:getstat(Conn, Stats) of case quicer:getstat(Conn, Stats) of
{error, _} -> {error, closed}; {error, _} -> {error, closed};
Res -> Res Res -> Res
end. end.
setopts(Socket, Opts) -> setopts({quic, _Conn, Stream, _Info}, Opts) ->
lists:foreach( lists:foreach(
fun fun
({Opt, V}) when is_atom(Opt) -> ({Opt, V}) when is_atom(Opt) ->
quicer:setopt(Socket, Opt, V); quicer:setopt(Stream, Opt, V);
(Opt) when is_atom(Opt) -> (Opt) when is_atom(Opt) ->
quicer:setopt(Socket, Opt, true) quicer:setopt(Stream, Opt, true)
end, end,
Opts Opts
), ),
@ -84,9 +133,18 @@ getopts(_Socket, _Opts) ->
{buffer, 80000} {buffer, 80000}
]}. ]}.
fast_close({quic, _Conn, Stream}) -> %% @TODO supply some App Error Code from caller
%% Flush send buffer, gracefully shutdown fast_close({ConnOwner, Conn, _ConnInfo}) when is_pid(ConnOwner) ->
quicer:async_shutdown_stream(Stream), %% handshake aborted.
_ = quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
ok;
fast_close({quic, _Conn, Stream, _Info}) ->
%% Force flush
_ = quicer:async_shutdown_stream(Stream),
%% @FIXME Since we shutdown the control stream, we shutdown the connection as well
%% *BUT* Msquic does not flush the send buffer if we shutdown the connection after
%% gracefully shutdown the stream.
% quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0),
ok. ok.
-spec ensure_ok_or_exit(atom(), list(term())) -> term(). -spec ensure_ok_or_exit(atom(), list(term())) -> term().
@ -102,8 +160,92 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
Result Result
end. end.
async_send({quic, _Conn, Stream}, Data, _Options) -> async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
case quicer:send(Stream, Data) of case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of
{ok, _Len} -> ok; {ok, _Len} -> ok;
{error, X, Y} -> {error, {X, Y}};
Other -> Other Other -> Other
end. end.
%%%
%%% quicer stream callbacks
%%%
-spec peer_receive_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_receive_aborted(Stream, ErrorCode, S) ->
_ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}.
-spec peer_send_aborted(stream_handle(), non_neg_integer(), cb_data()) -> cb_ret().
peer_send_aborted(Stream, ErrorCode, S) ->
%% we abort receive with same reason
_ = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT, ErrorCode),
{ok, S}.
-spec peer_send_shutdown(stream_handle(), undefined, cb_data()) -> cb_ret().
peer_send_shutdown(Stream, undefined, S) ->
ok = quicer:async_shutdown_stream(Stream, ?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0),
{ok, S}.
-spec send_complete(stream_handle(), boolean(), cb_data()) -> cb_ret().
send_complete(_Stream, false, S) ->
{ok, S};
send_complete(_Stream, true = _IsCancelled, S) ->
?SLOG(error, #{message => "send cancelled"}),
{ok, S}.
-spec send_shutdown_complete(stream_handle(), boolean(), cb_data()) -> cb_ret().
send_shutdown_complete(_Stream, _IsGraceful, S) ->
{ok, S}.
-spec passive(stream_handle(), undefined, cb_data()) -> cb_ret().
passive(Stream, undefined, S) ->
case quicer:setopt(Stream, active, 10) of
ok -> ok;
Error -> ?SLOG(error, #{message => "set active error", error => Error})
end,
{ok, S}.
-spec stream_closed(stream_handle(), quicer:stream_closed_props(), cb_data()) ->
{{continue, term()}, cb_data()}.
stream_closed(
_Stream,
#{
is_conn_shutdown := IsConnShutdown,
is_app_closing := IsAppClosing,
is_shutdown_by_app := IsAppShutdown,
is_closed_remotely := IsRemote,
status := Status,
error := Code
},
S
) when
is_boolean(IsConnShutdown) andalso
is_boolean(IsAppClosing) andalso
is_boolean(IsAppShutdown) andalso
is_boolean(IsRemote) andalso
is_atom(Status) andalso
is_integer(Code)
->
%% For now we fake a sock_closed for
%% emqx_connection:process_msg to append
%% a msg to be processed
Reason =
case Code of
?MQTT_QUIC_CONN_NOERROR ->
normal;
_ ->
Status
end,
{{continue, {sock_closed, Reason}}, S}.
%%%
%%% Internals
%%%
-spec socket(connection_handle(), stream_handle(), socket_info()) -> socket().
socket(Conn, CtrlStream, Info) when is_map(Info) ->
{quic, Conn, CtrlStream, Info}.
%% BUILD_WITHOUT_QUIC
-else.
-endif.

View File

@ -845,16 +845,21 @@ fields("mqtt_wss_listener") ->
]; ];
fields("mqtt_quic_listener") -> fields("mqtt_quic_listener") ->
[ [
%% TODO: ensure cacertfile is configurable
{"certfile", {"certfile",
sc( sc(
string(), string(),
#{desc => ?DESC(fields_mqtt_quic_listener_certfile)} #{
%% TODO: deprecated => {since, "5.1.0"}
desc => ?DESC(fields_mqtt_quic_listener_certfile)
}
)}, )},
{"keyfile", {"keyfile",
sc( sc(
string(), string(),
#{desc => ?DESC(fields_mqtt_quic_listener_keyfile)} %% TODO: deprecated => {since, "5.1.0"}
#{
desc => ?DESC(fields_mqtt_quic_listener_keyfile)
}
)}, )},
{"ciphers", ciphers_schema(quic)}, {"ciphers", ciphers_schema(quic)},
{"idle_timeout", {"idle_timeout",
@ -880,6 +885,14 @@ fields("mqtt_quic_listener") ->
default => 0, default => 0,
desc => ?DESC(fields_mqtt_quic_listener_keep_alive_interval) desc => ?DESC(fields_mqtt_quic_listener_keep_alive_interval)
} }
)},
{"ssl_options",
sc(
ref("listener_quic_ssl_opts"),
#{
required => false,
desc => ?DESC(fields_mqtt_quic_listener_ssl_options)
}
)} )}
] ++ base_listener(14567); ] ++ base_listener(14567);
fields("ws_opts") -> fields("ws_opts") ->
@ -1090,6 +1103,8 @@ fields("listener_wss_opts") ->
}, },
true true
); );
fields("listener_quic_ssl_opts") ->
server_ssl_opts_schema(#{}, false);
fields("ssl_client_opts") -> fields("ssl_client_opts") ->
client_ssl_opts_schema(#{}); client_ssl_opts_schema(#{});
fields("deflate_opts") -> fields("deflate_opts") ->
@ -1769,6 +1784,12 @@ desc("listener_ssl_opts") ->
"Socket options for SSL connections."; "Socket options for SSL connections.";
desc("listener_wss_opts") -> desc("listener_wss_opts") ->
"Socket options for WebSocket/SSL connections."; "Socket options for WebSocket/SSL connections.";
desc("fields_mqtt_quic_listener_certfile") ->
"Path to the certificate file. Will be deprecated in 5.1, use '.ssl_options.certfile' instead.";
desc("fields_mqtt_quic_listener_keyfile") ->
"Path to the secret key file. Will be deprecated in 5.1, use '.ssl_options.keyfile' instead.";
desc("listener_quic_ssl_opts") ->
"TLS options for QUIC transport.";
desc("ssl_client_opts") -> desc("ssl_client_opts") ->
"Socket options for SSL clients."; "Socket options for SSL clients.";
desc("deflate_opts") -> desc("deflate_opts") ->

View File

@ -499,8 +499,8 @@ ensure_quic_listener(Name, UdpPort) ->
application:ensure_all_started(quicer), application:ensure_all_started(quicer),
Conf = #{ Conf = #{
acceptors => 16, acceptors => 16,
bind => {{0, 0, 0, 0}, UdpPort}, bind => UdpPort,
certfile => filename:join(code:lib_dir(emqx), "etc/certs/cert.pem"),
ciphers => ciphers =>
[ [
"TLS_AES_256_GCM_SHA384", "TLS_AES_256_GCM_SHA384",
@ -509,7 +509,10 @@ ensure_quic_listener(Name, UdpPort) ->
], ],
enabled => true, enabled => true,
idle_timeout => 15000, idle_timeout => 15000,
keyfile => filename:join(code:lib_dir(emqx), "etc/certs/key.pem"), ssl_options => #{
certfile => filename:join(code:lib_dir(emqx), "etc/certs/cert.pem"),
keyfile => filename:join(code:lib_dir(emqx), "etc/certs/key.pem")
},
limiter => #{}, limiter => #{},
max_connections => 1024000, max_connections => 1024000,
mountpoint => <<>>, mountpoint => <<>>,

View File

@ -905,7 +905,7 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(Config) ->
emqtt, emqtt,
connected, connected,
fun fun
(cast, ?PUBLISH_PACKET(?QOS_2, _PacketId), _State) -> (cast, {?PUBLISH_PACKET(?QOS_2, _PacketId), _Via}, _State) ->
ok = counters:add(CRef, 1, 1), ok = counters:add(CRef, 1, 1),
{stop, {shutdown, for_testing}}; {stop, {shutdown, for_testing}};
(Arg1, ARg2, Arg3) -> (Arg1, ARg2, Arg3) ->

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,2 @@
QUIC transport Multistreams support and QUIC TLS cacert support.

View File

@ -0,0 +1 @@
QUIC 传输多流支持和 QUIC TLS cacert 支持。

View File

@ -60,7 +60,7 @@ defmodule EMQXUmbrella.MixProject do
{:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true},
{:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
{:emqtt, github: "emqx/emqtt", tag: "1.7.0", override: true}, {:emqtt, github: "emqx/emqtt", tag: "1.8.1", override: true},
{:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},
{:observer_cli, "1.7.1"}, {:observer_cli, "1.7.1"},
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
@ -645,7 +645,7 @@ defmodule EMQXUmbrella.MixProject do
defp quicer_dep() do defp quicer_dep() do
if enable_quicer?(), if enable_quicer?(),
# in conflict with emqx and emqtt # in conflict with emqx and emqtt
do: [{:quicer, github: "emqx/quic", tag: "0.0.16", override: true}], do: [{:quicer, github: "emqx/quic", tag: "0.0.109", override: true}],
else: [] else: []
end end

View File

@ -62,7 +62,7 @@
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}}
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.1"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}

View File

@ -39,7 +39,7 @@ bcrypt() ->
{bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}. {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}.
quicer() -> quicer() ->
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.16"}}}. {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.109"}}}.
jq() -> jq() ->
{jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.9"}}}. {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.9"}}}.
@ -548,17 +548,20 @@ dialyzer(Config) ->
AppsToExclude = AppNames -- KnownApps, AppsToExclude = AppNames -- KnownApps,
case length(AppsToAnalyse) > 0 of Extra =
true -> [bcrypt || provide_bcrypt_dep()] ++
[jq || is_jq_supported()] ++
[quicer || is_quicer_supported()],
NewDialyzerConfig =
OldDialyzerConfig ++
[{exclude_apps, AppsToExclude} || length(AppsToAnalyse) > 0] ++
[{plt_extra_apps, Extra} || length(Extra) > 0],
lists:keystore( lists:keystore(
dialyzer, dialyzer,
1, 1,
Config, Config,
{dialyzer, OldDialyzerConfig ++ [{exclude_apps, AppsToExclude}]} {dialyzer, NewDialyzerConfig}
); ).
false ->
Config
end.
coveralls() -> coveralls() ->
case {os:getenv("GITHUB_ACTIONS"), os:getenv("GITHUB_TOKEN")} of case {os:getenv("GITHUB_ACTIONS"), os:getenv("GITHUB_TOKEN")} of

View File

@ -1,6 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
set -euo pipefail set -euo pipefail
exit 0
latest_release=$(git describe --abbrev=0 --tags --exclude '*rc*' --exclude '*alpha*' --exclude '*beta*' --exclude '*docker*') latest_release=$(git describe --abbrev=0 --tags --exclude '*rc*' --exclude '*alpha*' --exclude '*beta*' --exclude '*docker*')
echo "Compare base: $latest_release" echo "Compare base: $latest_release"

View File

@ -160,6 +160,7 @@ jenkins
jq jq
kb kb
keepalive keepalive
keyfile
libcoap libcoap
lifecycle lifecycle
localhost localhost