diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 011204b95..9834563a7 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -43,7 +43,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}} + {emqtt, {git, "https://github.com/qzhuyan/emqtt", {branch, "dev/william/bump-quicer"}}} ]}, {extra_src_dirs, [{"test", [recursive]}]} ]} diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index afcaff941..888e911f3 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -1,16 +1,16 @@ %% -*- mode: erlang -*- IsCentos6 = fun() -> - case file:read_file("/etc/centos-release") of - {ok, <<"CentOS release 6", _/binary >>} -> - true; - _ -> - false - end - end, + case file:read_file("/etc/centos-release") of + {ok, <<"CentOS release 6", _/binary>>} -> + true; + _ -> + false + end +end, IsWin32 = fun() -> - win32 =:= element(1, os:type()) - end, + win32 =:= element(1, os:type()) +end, IsMacOS = fun() -> {unix, darwin} =:= os:type() @@ -25,13 +25,14 @@ IsQuicSupp = fun() -> end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}, +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.11"}}}. ExtraDeps = fun(C) -> - {deps, Deps0} = lists:keyfind(deps, 1, C), - Deps = Deps0 ++ [Bcrypt || not IsWin32()] ++ - [ Quicer || IsQuicSupp()], - lists:keystore(deps, 1, C, {deps, Deps}) - end, + {deps, Deps0} = lists:keyfind(deps, 1, C), + Deps = + Deps0 ++ [Bcrypt || not IsWin32()] ++ + [Quicer || IsQuicSupp()], + lists:keystore(deps, 1, C, {deps, Deps}) +end, ExtraDeps(CONFIG). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 087555b1c..9d46fe975 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -324,15 +324,18 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> case [A || {quicer, _, _} = A <- application:which_applications()] of [_] -> DefAcceptors = erlang:system_info(schedulers_online) * 8, + IdleTimeout = timer:seconds(maps:get(idle_timeout, Opts)), ListenOpts = [ {cert, maps:get(certfile, Opts)}, {key, maps:get(keyfile, Opts)}, {alpn, ["mqtt"]}, {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])}, + {keep_alive_interval_ms, ceil(IdleTimeout / 3)}, + {server_resumption_level, 2}, {idle_timeout_ms, lists:max([ emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3, - timer:seconds(maps:get(idle_timeout, Opts)) + IdleTimeout ])} ], ConnectionOpts = #{ diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index f5c281242..b44a4f1c2 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -15,6 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_quic_connection). +-include_lib("quicer/include/quicer.hrl"). %% Callbacks -export([ @@ -59,5 +60,5 @@ connected(_Conn, S) -> -spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. shutdown(Conn, S) -> - quicer:async_close_connection(Conn), + quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), {ok, S}. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index e98ea0137..bed5805e2 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -16,6 +16,7 @@ %% MQTT/QUIC Stream -module(emqx_quic_stream). +-include_lib("quicer/include/quicer.hrl"). %% emqx transport Callbacks -export([ @@ -38,7 +39,7 @@ wait({ConnOwner, Conn}) -> receive %% from msquic {quic, new_stream, Stream} -> - {ok, Stream}; + {ok, {quic, Conn, Stream}}; {'EXIT', ConnOwner, _Reason} -> {error, enotconn} end. @@ -46,18 +47,18 @@ wait({ConnOwner, Conn}) -> type(_) -> quic. -peername(S) -> - quicer:peername(S). +peername({quic, Conn, _Stream}) -> + quicer:peername(Conn). -sockname(S) -> - quicer:sockname(S). +sockname({quic, Conn, _Stream}) -> + quicer:sockname(Conn). peercert(_S) -> %% @todo but unsupported by msquic nossl. -getstat(Socket, Stats) -> - case quicer:getstat(Socket, Stats) of +getstat({quic, Conn, _Stream}, Stats) -> + case quicer:getstat(Conn, Stats) of {error, _} -> {error, closed}; Res -> Res end. @@ -84,9 +85,9 @@ getopts(_Socket, _Opts) -> {buffer, 80000} ]}. -fast_close(Stream) -> - %% Stream might be closed already. - _ = quicer:async_close_stream(Stream), +fast_close({quic, _Conn, Stream}) -> + %% Flush send buffer, gracefully shutdown + quicer:async_shutdown_stream(Stream), ok. -spec ensure_ok_or_exit(atom(), list(term())) -> term(). @@ -102,9 +103,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> Result end. -async_send(Stream, Data, Options) when is_list(Data) -> - async_send(Stream, iolist_to_binary(Data), Options); -async_send(Stream, Data, _Options) when is_binary(Data) -> +async_send({quic, _Conn, Stream}, Data, _Options) -> case quicer:send(Stream, Data) of {ok, _Len} -> ok; Other -> Other diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 5ef43e738..88ccda452 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -252,7 +252,8 @@ t_connect_will_message(Config) -> {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2), ok = emqtt:disconnect(Client3), %% [MQTT-3.1.2-10] - ?assertEqual(0, length(receive_messages(1))), + MsgRecv = receive_messages(1), + ?assertEqual([], MsgRecv), ok = emqtt:disconnect(Client4). t_batch_subscribe(init, Config) -> diff --git a/mix.exs b/mix.exs index 0c3a4ff37..8125ee9f1 100644 --- a/mix.exs +++ b/mix.exs @@ -600,7 +600,7 @@ defmodule EMQXUmbrella.MixProject do defp quicer_dep() do if enable_quicer?(), # in conflict with emqx and emqtt - do: [{:quicer, github: "emqx/quic", tag: "0.0.9", override: true}], + do: [{:quicer, github: "emqx/quic", tag: "0.0.11", override: true}], else: [] end diff --git a/rebar.config b/rebar.config index 744feb8de..56c427678 100644 --- a/rebar.config +++ b/rebar.config @@ -60,7 +60,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.4"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.1"}}} + , {emqtt, {git, "https://github.com/qzhuyan/emqtt", {branch, "dev/william/bump-quicer"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} diff --git a/rebar.config.erl b/rebar.config.erl index 0700a0bad..1d04d773f 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -38,7 +38,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.11"}}}. jq() -> {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.4"}}}.