From 2534b8dc64c851453616b1a956ebbf247a3b23dd Mon Sep 17 00:00:00 2001 From: Gilbert Date: Wed, 27 Mar 2019 10:19:35 +0800 Subject: [PATCH] Support to pass ws compressing options (#2356) Add new config entries about websocket --- Makefile | 2 +- etc/emqx.conf | 161 ++++++++++++++++++++++++++++++++++--- priv/emqx.schema | 141 +++++++++++++++++++++++++++++--- rebar.config | 3 +- src/emqx_ws_connection.erl | 16 +++- 5 files changed, 294 insertions(+), 29 deletions(-) diff --git a/Makefile b/Makefile index e7bab43ba..5e5d456c9 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0 dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1 dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4 dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3 -dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.4.0 +dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1 dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1 NO_AUTOPATCH = cuttlefish diff --git a/etc/emqx.conf b/etc/emqx.conf index cb2d0bb00..35861d77e 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1261,7 +1261,7 @@ listener.ws.external.zone = external ## The access control for the MQTT/WebSocket listener. ## -## See: listener.tcp.$name.access +## See: listener.ws.$name.access ## ## Value: ACL Rule listener.ws.external.access.1 = allow all @@ -1286,74 +1286,143 @@ listener.ws.external.verify_protocol_header = on ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## -## See: listener.tcp.$name.proxy_protocol +## See: listener.ws.$name.proxy_protocol ## ## Value: on | off ## listener.ws.external.proxy_protocol = on ## Sets the timeout for proxy protocol. ## -## See: listener.tcp.$name.proxy_protocol_timeout +## See: listener.ws.$name.proxy_protocol_timeout ## ## Value: Duration ## listener.ws.external.proxy_protocol_timeout = 3s ## The TCP backlog of external MQTT/WebSocket Listener. ## -## See: listener.tcp.$name.backlog +## See: listener.ws.$name.backlog ## ## Value: Number >= 0 listener.ws.external.backlog = 1024 ## The TCP send timeout for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.send_timeout +## See: listener.ws.$name.send_timeout ## ## Value: Duration listener.ws.external.send_timeout = 15s ## Close the MQTT/WebSocket connection if send timeout. ## -## See: listener.tcp.$name.send_timeout_close +## See: listener.ws.$name.send_timeout_close ## ## Value: on | off listener.ws.external.send_timeout_close = on ## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.recbuf +## See: listener.ws.$name.recbuf ## ## Value: Bytes ## listener.ws.external.recbuf = 2KB ## The TCP send buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.sndbuf +## See: listener.ws.$name.sndbuf ## ## Value: Bytes ## listener.ws.external.sndbuf = 2KB ## The size of the user-level software buffer used by the driver. ## -## See: listener.tcp.$name.buffer +## See: listener.ws.$name.buffer ## ## Value: Bytes ## listener.ws.external.buffer = 2KB ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. ## -## See: listener.tcp.$name.tune_buffer +## See: listener.ws.$name.tune_buffer ## ## Value: on | off ## listener.ws.external.tune_buffer = off ## The TCP_NODELAY flag for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.nodelay +## See: listener.ws.$name.nodelay ## ## Value: true | false listener.ws.external.nodelay = true +## The compress flag for external MQTT/WebSocket connections. +## +## If this Value is set true,the websocket message would be compressed +## +## Value: true | false +## listener.ws.external.compress = true + +## The level of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.level +## +## Value: none | default | best_compression | best_speed +## listener.ws.external.deflate_opts.level = default + +## The mem_level of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.mem_level +## +## Valid range is 1-9 +## listener.ws.external.deflate_opts.mem_level = 8 + +## The strategy of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.strategy +## +## Value: default | filtered | huffman_only | rle +## listener.ws.external.deflate_opts.strategy = default + +## The deflate option for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.server_context_takeover +## +## Value: takeover | no_takeover +## listener.ws.external.deflate_opts.server_context_takeover = takeover + +## The deflate option for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.client_context_takeover +## +## Value: takeover | no_takeover +## listener.ws.external.deflate_opts.client_context_takeover = takeover + +## The deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.server_max_window_bits +## +## Valid range is 8-15 +## listener.ws.external.deflate_opts.server_max_window_bits = 15 + +## The deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.client_max_window_bits +## +## Valid range is 8-15 +## listener.ws.external.deflate_opts.client_max_window_bits = 15 + +## The idle timeout for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.idle_timeout +## +## Value: Duration +## listener.ws.external.idle_timeout = 60s + +## The max frame size for external MQTT/WebSocket connections. +## +## +## Value: Number +## listener.ws.external.max_frame_size = 0 + ##-------------------------------------------------------------------- ## External WebSocket/SSL listener for MQTT Protocol @@ -1486,7 +1555,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot ## be configured at the same time. ## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA +## listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## See: listener.ssl.$name.secure_renegotiate ## @@ -1557,6 +1626,74 @@ listener.wss.external.send_timeout_close = on ## Value: true | false ## listener.wss.external.nodelay = true +## The compress flag for external WebSocket/SSL connections. +## +## If this Value is set true,the websocket message would be compressed +## +## Value: true | false +## listener.wss.external.compress = true + +## The level of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.level +## +## Value: none | default | best_compression | best_speed +## listener.wss.external.deflate_opts.level = default + +## The mem_level of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.mem_level +## +## Valid range is 1-9 +## listener.wss.external.deflate_opts.mem_level = 8 + +## The strategy of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.strategy +## +## Value: default | filtered | huffman_only | rle +## listener.wss.external.deflate_opts.strategy = default + +## The deflate option for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.server_context_takeover +## +## Value: takeover | no_takeover +## listener.wss.external.deflate_opts.server_context_takeover = takeover + +## The deflate option for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.client_context_takeover +## +## Value: takeover | no_takeover +## listener.wss.external.deflate_opts.client_context_takeover = takeover + +## The deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.server_max_window_bits +## +## Valid range is 8-15 +## listener.wss.external.deflate_opts.server_max_window_bits = 15 + +## The deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.client_max_window_bits +## +## Valid range is 8-15 +## listener.wss.external.deflate_opts.client_max_window_bits = 15 + +## The idle timeout for external WebSocket/SSL connections. +## +## See: listener.wss.$name.idle_timeout +## +## Value: Duration +## listener.wss.external.idle_timeout = 60s + +## The max frame size for external WebSocket/SSL connections. +## +## Value: Number +## listener.wss.external.max_frame_size = 0 + ##-------------------------------------------------------------------- ## Bridges ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 25589f138..8a97984f6 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1238,6 +1238,57 @@ end}. hidden ]}. +{mapping, "listener.ws.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + %%-------------------------------------------------------------------- %% MQTT/WebSocket/SSL Listeners @@ -1393,6 +1444,61 @@ end}. {datatype, {enum, [cn, dn, crt]}} ]}. +{mapping, "listener.wss.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + + + {translation, "emqx.listeners", fun(Conf) -> Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, @@ -1431,19 +1537,30 @@ end}. {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)}, {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, + {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)}, + {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}, + {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)}, {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)]) end, + DeflateOpts = fun(Prefix) -> + Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)}, + {mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)}, + {strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)}, + {server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)}, + {client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)}, + {server_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)}, + {client_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}]) + end, TcpOpts = fun(Prefix) -> - Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, - {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, - {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, - {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, - {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, - {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, - {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, - {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, + {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) end, - SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, MapPSKCiphers = fun(PSKCiphers) -> lists:map( @@ -1496,7 +1613,8 @@ end}. case cuttlefish:conf_get(Prefix, Conf, undefined) of undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, + {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] end end, @@ -1506,7 +1624,8 @@ end}. undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}, + [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, + {tcp_options, TcpOpts(Prefix)}, {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}] end end, diff --git a/rebar.config b/rebar.config index 2e16b86b9..005331120 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,6 @@ {deps, [{jsx, "2.9.0"}, {gproc, "0.8.0"}, - {cowboy, "2.4.0"}, + {cowboy, "2.6.1"}, {meck, "0.8.13"} %% temp workaround for version check ]}. @@ -28,4 +28,3 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. - diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 6d47e16ad..74a853c46 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -113,12 +113,23 @@ call(WSPid, Req) when is_pid(WSPid) -> %%------------------------------------------------------------------------------ init(Req, Opts) -> + IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000), + DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])), + MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of + 0 -> infinity; + MFS -> MFS + end, + Compress = proplists:get_value(compress, Opts, false), + Options = #{compress => Compress, + deflate_opts => DeflateOptions, + max_frame_size => MaxFrameSize, + idle_timeout => IdleTimeout}, case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> - {cowboy_websocket, Req, #state{}}; + {cowboy_websocket, Req, #state{}, Options}; [<<"mqtt", Vsn/binary>>] -> Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req), - {cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}}; + {cowboy_websocket, Resp, #state{request = Req, options = Opts}, Options}; _ -> {ok, cowboy_req:reply(400, Req), #state{}} end. @@ -308,4 +319,3 @@ shutdown(Reason, State) -> wsock_stats() -> [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. -