diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index bd3ae4af0..43dcfd411 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -31,7 +31,7 @@ listeners.wss.default { } # listeners.quic.default { -# enabled = false +# enabled = true # bind = "0.0.0.0:14567" # max_connections = 1024000 # keyfile = "{{ platform_etc_dir }}/certs/key.pem" diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 79fda9cc5..f067aed61 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -43,7 +43,7 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.10.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.0"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}} ]}, {extra_src_dirs, [{"test", [recursive]}]} ]} diff --git a/apps/emqx/rebar.config.script b/apps/emqx/rebar.config.script index afcaff941..136e79190 100644 --- a/apps/emqx/rebar.config.script +++ b/apps/emqx/rebar.config.script @@ -1,37 +1,46 @@ %% -*- mode: erlang -*- IsCentos6 = fun() -> - case file:read_file("/etc/centos-release") of - {ok, <<"CentOS release 6", _/binary >>} -> - true; - _ -> - false - end - end, + case file:read_file("/etc/centos-release") of + {ok, <<"CentOS release 6", _/binary>>} -> + true; + _ -> + false + end +end, IsWin32 = fun() -> - win32 =:= element(1, os:type()) - end, + win32 =:= element(1, os:type()) +end, IsMacOS = fun() -> - {unix, darwin} =:= os:type() - end, + {unix, darwin} =:= os:type() +end, IsQuicSupp = fun() -> - not (IsCentos6() orelse IsWin32() - orelse IsMacOS() orelse - false =/= os:getenv("BUILD_WITHOUT_QUIC") - ) - orelse "1" == os:getenv("BUILD_WITH_QUIC") - end, + not (IsCentos6() orelse IsWin32() orelse + IsMacOS() orelse + false =/= os:getenv("BUILD_WITHOUT_QUIC")) orelse + "1" == os:getenv("BUILD_WITH_QUIC") +end, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, -Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}, +Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.11"}}}. ExtraDeps = fun(C) -> - {deps, Deps0} = lists:keyfind(deps, 1, C), - Deps = Deps0 ++ [Bcrypt || not IsWin32()] ++ - [ Quicer || IsQuicSupp()], - lists:keystore(deps, 1, C, {deps, Deps}) - end, + {deps, Deps0} = lists:keyfind(deps, 1, C), + {erl_opts, ErlOpts0} = lists:keyfind(erl_opts, 1, C), + IsQuic = IsQuicSupp(), + New = [ + {deps, Deps0 ++ [Bcrypt || not IsWin32()] ++ [Quicer || IsQuic]}, + {erl_opts, ErlOpts0 ++ [{d, 'BUILD_WITHOUT_QUIC'} || not IsQuic]} + ], + lists:foldl( + fun({Key, _Val} = KV, Acc) -> + lists:keystore(Key, 1, Acc, KV) + end, + C, + New + ) +end, ExtraDeps(CONFIG). diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index ce09cf460..b6194eb31 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -113,7 +113,7 @@ is_quicer_app_present() -> end. is_quic_listener_configured() -> - emqx_listeners:has_enabled_listener_conf_by_type(quic). + maps:is_key(quic, emqx:get_config([listeners])). get_description() -> emqx_release:description(). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 062da5a72..61ed6c47d 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -143,9 +143,13 @@ is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss -> _:_ -> false end; -is_running(quic, _ListenerId, _Conf) -> - %% TODO: quic support - false. +is_running(quic, ListenerId, _Conf) -> + case quicer:listener(ListenerId) of + {ok, Pid} when is_pid(Pid) -> + true; + _ -> + false + end. current_conns(ID, ListenOn) -> {ok, #{type := Type, name := Name}} = parse_listener_id(ID), @@ -325,15 +329,18 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> case [A || {quicer, _, _} = A <- application:which_applications()] of [_] -> DefAcceptors = erlang:system_info(schedulers_online) * 8, + IdleTimeout = timer:seconds(maps:get(idle_timeout, Opts)), ListenOpts = [ {cert, maps:get(certfile, Opts)}, {key, maps:get(keyfile, Opts)}, {alpn, ["mqtt"]}, {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])}, + {keep_alive_interval_ms, ceil(IdleTimeout / 3)}, + {server_resumption_level, 2}, {idle_timeout_ms, lists:max([ emqx_config:get_zone_conf(zone(Opts), [mqtt, idle_timeout]) * 3, - timer:seconds(maps:get(idle_timeout, Opts)) + IdleTimeout ])} ], ConnectionOpts = #{ diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index f5c281242..e9a392187 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -16,6 +16,12 @@ -module(emqx_quic_connection). +-ifndef(BUILD_WITHOUT_QUIC). +-include_lib("quicer/include/quicer.hrl"). +-else. +-define(QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0). +-endif. + %% Callbacks -export([ init/1, @@ -59,5 +65,5 @@ connected(_Conn, S) -> -spec shutdown(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}. shutdown(Conn, S) -> - quicer:async_close_connection(Conn), + quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0), {ok, S}. diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index e98ea0137..4f40902ab 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -38,7 +38,7 @@ wait({ConnOwner, Conn}) -> receive %% from msquic {quic, new_stream, Stream} -> - {ok, Stream}; + {ok, {quic, Conn, Stream}}; {'EXIT', ConnOwner, _Reason} -> {error, enotconn} end. @@ -46,18 +46,18 @@ wait({ConnOwner, Conn}) -> type(_) -> quic. -peername(S) -> - quicer:peername(S). +peername({quic, Conn, _Stream}) -> + quicer:peername(Conn). -sockname(S) -> - quicer:sockname(S). +sockname({quic, Conn, _Stream}) -> + quicer:sockname(Conn). peercert(_S) -> %% @todo but unsupported by msquic nossl. -getstat(Socket, Stats) -> - case quicer:getstat(Socket, Stats) of +getstat({quic, Conn, _Stream}, Stats) -> + case quicer:getstat(Conn, Stats) of {error, _} -> {error, closed}; Res -> Res end. @@ -84,9 +84,9 @@ getopts(_Socket, _Opts) -> {buffer, 80000} ]}. -fast_close(Stream) -> - %% Stream might be closed already. - _ = quicer:async_close_stream(Stream), +fast_close({quic, _Conn, Stream}) -> + %% Flush send buffer, gracefully shutdown + quicer:async_shutdown_stream(Stream), ok. -spec ensure_ok_or_exit(atom(), list(term())) -> term(). @@ -102,9 +102,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> Result end. -async_send(Stream, Data, Options) when is_list(Data) -> - async_send(Stream, iolist_to_binary(Data), Options); -async_send(Stream, Data, _Options) when is_binary(Data) -> +async_send({quic, _Conn, Stream}, Data, _Options) -> case quicer:send(Stream, Data) of {ok, _Len} -> ok; Other -> Other diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 82455b3e0..f591d75bd 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -472,7 +472,6 @@ ensure_dashboard_listeners_started(_App) -> -spec ensure_quic_listener(Name :: atom(), UdpPort :: inet:port_number()) -> ok. ensure_quic_listener(Name, UdpPort) -> application:ensure_all_started(quicer), - emqx_config:put([listeners, quic, Name, mountpoint], <<>>), Conf = #{ acceptors => 16, bind => {{0, 0, 0, 0}, UdpPort}, @@ -491,6 +490,7 @@ ensure_quic_listener(Name, UdpPort) -> mountpoint => <<>>, zone => default }, + emqx_config:put([listeners, quic, Name], Conf), case emqx_listeners:start_listener(quic, Name, Conf) of ok -> ok; {error, {already_started, _Pid}} -> ok diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 5ef43e738..88ccda452 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -252,7 +252,8 @@ t_connect_will_message(Config) -> {ok, _, [2]} = emqtt:subscribe(Client4, Topic, qos2), ok = emqtt:disconnect(Client3), %% [MQTT-3.1.2-10] - ?assertEqual(0, length(receive_messages(1))), + MsgRecv = receive_messages(1), + ?assertEqual([], MsgRecv), ok = emqtt:disconnect(Client4). t_batch_subscribe(init, Config) -> diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index eb1a62773..0ac2d0da8 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -21,7 +21,7 @@ %% eredis_cluster's dependency getting resolved earlier. %% Here we pin 1.5.2 to avoid surprises in the future. {poolboy, {git, "https://github.com/emqx/poolboy.git", {tag, "1.5.2"}}}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.1"}}} + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}} ]}. {shell, [ diff --git a/mix.exs b/mix.exs index 29286973c..f29ad1a5b 100644 --- a/mix.exs +++ b/mix.exs @@ -58,7 +58,7 @@ defmodule EMQXUmbrella.MixProject do {:ecpool, github: "emqx/ecpool", tag: "0.5.2"}, {:replayq, "0.3.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, - {:emqtt, github: "emqx/emqtt", tag: "1.5.1", override: true}, + {:emqtt, github: "emqx/emqtt", tag: "1.6.0", override: true}, {:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, {:observer_cli, "1.7.1"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, @@ -591,7 +591,7 @@ defmodule EMQXUmbrella.MixProject do defp quicer_dep() do if enable_quicer?(), # in conflict with emqx and emqtt - do: [{:quicer, github: "emqx/quic", tag: "0.0.9", override: true}], + do: [{:quicer, github: "emqx/quic", tag: "0.0.11", override: true}], else: [] end diff --git a/rebar.config b/rebar.config index daa45abe7..6f1ec259b 100644 --- a/rebar.config +++ b/rebar.config @@ -60,7 +60,7 @@ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} , {replayq, "0.3.4"} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} - , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.5.1"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.6.0"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} diff --git a/rebar.config.erl b/rebar.config.erl index 0349924d7..a53533559 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -38,7 +38,7 @@ bcrypt() -> {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}. quicer() -> - {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.9"}}}. + {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.11"}}}. jq() -> {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.4"}}}. @@ -138,7 +138,8 @@ common_compile_opts(Edition, Vsn) -> {compile_info, [{emqx_vsn, Vsn}]}, {d, 'EMQX_RELEASE_EDITION', Edition} ] ++ - [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"]. + [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"] ++ + [{d, 'BUILD_WITHOUT_QUIC'} || not is_quicer_supported()]. prod_compile_opts(Edition, Vsn) -> [