Fix conflict
This commit is contained in:
commit
6e3c4b0855
2
Makefile
2
Makefile
|
@ -10,7 +10,7 @@ dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0
|
|||
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1
|
||||
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
|
||||
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3
|
||||
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.4.0
|
||||
dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1
|
||||
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
|
||||
|
||||
NO_AUTOPATCH = cuttlefish
|
||||
|
|
177
etc/emqx.conf
177
etc/emqx.conf
|
@ -114,6 +114,22 @@ cluster.autoclean = 5m
|
|||
## Default: 1m, 1 minute
|
||||
## cluster.etcd.node_ttl = 1m
|
||||
|
||||
## Path to a file containing the client's private PEM-encoded key.
|
||||
##
|
||||
## Value: File
|
||||
## cluster.etcd.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
|
||||
|
||||
## The path to a file containing the client's certificate.
|
||||
##
|
||||
## Value: File
|
||||
## cluster.etcd.ssl.certfile = {{ platform_etc_dir }}/certs/client.pem
|
||||
|
||||
## Path to the file containing PEM-encoded CA certificates. The CA certificates
|
||||
## are used during server authentication and when building the client certificate chain.
|
||||
##
|
||||
## Value: File
|
||||
## cluster.etcd.ssl.cacertfile = {{ platform_etc_dir }}/certs/ca.pem
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Cluster using Kubernates
|
||||
|
||||
|
@ -1261,7 +1277,7 @@ listener.ws.external.zone = external
|
|||
|
||||
## The access control for the MQTT/WebSocket listener.
|
||||
##
|
||||
## See: listener.tcp.$name.access
|
||||
## See: listener.ws.$name.access
|
||||
##
|
||||
## Value: ACL Rule
|
||||
listener.ws.external.access.1 = allow all
|
||||
|
@ -1286,74 +1302,143 @@ listener.ws.external.verify_protocol_header = on
|
|||
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
|
||||
## HAProxy or Nginx.
|
||||
##
|
||||
## See: listener.tcp.$name.proxy_protocol
|
||||
## See: listener.ws.$name.proxy_protocol
|
||||
##
|
||||
## Value: on | off
|
||||
## listener.ws.external.proxy_protocol = on
|
||||
|
||||
## Sets the timeout for proxy protocol.
|
||||
##
|
||||
## See: listener.tcp.$name.proxy_protocol_timeout
|
||||
## See: listener.ws.$name.proxy_protocol_timeout
|
||||
##
|
||||
## Value: Duration
|
||||
## listener.ws.external.proxy_protocol_timeout = 3s
|
||||
|
||||
## The TCP backlog of external MQTT/WebSocket Listener.
|
||||
##
|
||||
## See: listener.tcp.$name.backlog
|
||||
## See: listener.ws.$name.backlog
|
||||
##
|
||||
## Value: Number >= 0
|
||||
listener.ws.external.backlog = 1024
|
||||
|
||||
## The TCP send timeout for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.tcp.$name.send_timeout
|
||||
## See: listener.ws.$name.send_timeout
|
||||
##
|
||||
## Value: Duration
|
||||
listener.ws.external.send_timeout = 15s
|
||||
|
||||
## Close the MQTT/WebSocket connection if send timeout.
|
||||
##
|
||||
## See: listener.tcp.$name.send_timeout_close
|
||||
## See: listener.ws.$name.send_timeout_close
|
||||
##
|
||||
## Value: on | off
|
||||
listener.ws.external.send_timeout_close = on
|
||||
|
||||
## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.tcp.$name.recbuf
|
||||
## See: listener.ws.$name.recbuf
|
||||
##
|
||||
## Value: Bytes
|
||||
## listener.ws.external.recbuf = 2KB
|
||||
|
||||
## The TCP send buffer(os kernel) for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.tcp.$name.sndbuf
|
||||
## See: listener.ws.$name.sndbuf
|
||||
##
|
||||
## Value: Bytes
|
||||
## listener.ws.external.sndbuf = 2KB
|
||||
|
||||
## The size of the user-level software buffer used by the driver.
|
||||
##
|
||||
## See: listener.tcp.$name.buffer
|
||||
## See: listener.ws.$name.buffer
|
||||
##
|
||||
## Value: Bytes
|
||||
## listener.ws.external.buffer = 2KB
|
||||
|
||||
## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled.
|
||||
##
|
||||
## See: listener.tcp.$name.tune_buffer
|
||||
## See: listener.ws.$name.tune_buffer
|
||||
##
|
||||
## Value: on | off
|
||||
## listener.ws.external.tune_buffer = off
|
||||
|
||||
## The TCP_NODELAY flag for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.tcp.$name.nodelay
|
||||
## See: listener.ws.$name.nodelay
|
||||
##
|
||||
## Value: true | false
|
||||
listener.ws.external.nodelay = true
|
||||
|
||||
## The compress flag for external MQTT/WebSocket connections.
|
||||
##
|
||||
## If this Value is set true,the websocket message would be compressed
|
||||
##
|
||||
## Value: true | false
|
||||
## listener.ws.external.compress = true
|
||||
|
||||
## The level of deflate options for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.deflate_opts.level
|
||||
##
|
||||
## Value: none | default | best_compression | best_speed
|
||||
## listener.ws.external.deflate_opts.level = default
|
||||
|
||||
## The mem_level of deflate options for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.deflate_opts.mem_level
|
||||
##
|
||||
## Valid range is 1-9
|
||||
## listener.ws.external.deflate_opts.mem_level = 8
|
||||
|
||||
## The strategy of deflate options for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.deflate_opts.strategy
|
||||
##
|
||||
## Value: default | filtered | huffman_only | rle
|
||||
## listener.ws.external.deflate_opts.strategy = default
|
||||
|
||||
## The deflate option for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.deflate_opts.server_context_takeover
|
||||
##
|
||||
## Value: takeover | no_takeover
|
||||
## listener.ws.external.deflate_opts.server_context_takeover = takeover
|
||||
|
||||
## The deflate option for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.deflate_opts.client_context_takeover
|
||||
##
|
||||
## Value: takeover | no_takeover
|
||||
## listener.ws.external.deflate_opts.client_context_takeover = takeover
|
||||
|
||||
## The deflate options for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.deflate_opts.server_max_window_bits
|
||||
##
|
||||
## Valid range is 8-15
|
||||
## listener.ws.external.deflate_opts.server_max_window_bits = 15
|
||||
|
||||
## The deflate options for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.deflate_opts.client_max_window_bits
|
||||
##
|
||||
## Valid range is 8-15
|
||||
## listener.ws.external.deflate_opts.client_max_window_bits = 15
|
||||
|
||||
## The idle timeout for external MQTT/WebSocket connections.
|
||||
##
|
||||
## See: listener.ws.$name.idle_timeout
|
||||
##
|
||||
## Value: Duration
|
||||
## listener.ws.external.idle_timeout = 60s
|
||||
|
||||
## The max frame size for external MQTT/WebSocket connections.
|
||||
##
|
||||
##
|
||||
## Value: Number
|
||||
## listener.ws.external.max_frame_size = 0
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## External WebSocket/SSL listener for MQTT Protocol
|
||||
|
||||
|
@ -1486,7 +1571,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
|
|||
## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot
|
||||
## be configured at the same time.
|
||||
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
|
||||
#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
|
||||
## listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
|
||||
|
||||
## See: listener.ssl.$name.secure_renegotiate
|
||||
##
|
||||
|
@ -1557,6 +1642,74 @@ listener.wss.external.send_timeout_close = on
|
|||
## Value: true | false
|
||||
## listener.wss.external.nodelay = true
|
||||
|
||||
## The compress flag for external WebSocket/SSL connections.
|
||||
##
|
||||
## If this Value is set true,the websocket message would be compressed
|
||||
##
|
||||
## Value: true | false
|
||||
## listener.wss.external.compress = true
|
||||
|
||||
## The level of deflate options for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.deflate_opts.level
|
||||
##
|
||||
## Value: none | default | best_compression | best_speed
|
||||
## listener.wss.external.deflate_opts.level = default
|
||||
|
||||
## The mem_level of deflate options for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.deflate_opts.mem_level
|
||||
##
|
||||
## Valid range is 1-9
|
||||
## listener.wss.external.deflate_opts.mem_level = 8
|
||||
|
||||
## The strategy of deflate options for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.deflate_opts.strategy
|
||||
##
|
||||
## Value: default | filtered | huffman_only | rle
|
||||
## listener.wss.external.deflate_opts.strategy = default
|
||||
|
||||
## The deflate option for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.deflate_opts.server_context_takeover
|
||||
##
|
||||
## Value: takeover | no_takeover
|
||||
## listener.wss.external.deflate_opts.server_context_takeover = takeover
|
||||
|
||||
## The deflate option for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.deflate_opts.client_context_takeover
|
||||
##
|
||||
## Value: takeover | no_takeover
|
||||
## listener.wss.external.deflate_opts.client_context_takeover = takeover
|
||||
|
||||
## The deflate options for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.deflate_opts.server_max_window_bits
|
||||
##
|
||||
## Valid range is 8-15
|
||||
## listener.wss.external.deflate_opts.server_max_window_bits = 15
|
||||
|
||||
## The deflate options for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.deflate_opts.client_max_window_bits
|
||||
##
|
||||
## Valid range is 8-15
|
||||
## listener.wss.external.deflate_opts.client_max_window_bits = 15
|
||||
|
||||
## The idle timeout for external WebSocket/SSL connections.
|
||||
##
|
||||
## See: listener.wss.$name.idle_timeout
|
||||
##
|
||||
## Value: Duration
|
||||
## listener.wss.external.idle_timeout = 60s
|
||||
|
||||
## The max frame size for external WebSocket/SSL connections.
|
||||
##
|
||||
## Value: Number
|
||||
## listener.wss.external.max_frame_size = 0
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Bridges
|
||||
##--------------------------------------------------------------------
|
||||
|
|
164
priv/emqx.schema
164
priv/emqx.schema
|
@ -105,6 +105,18 @@
|
|||
{default, "1m"}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.etcd.ssl.keyfile", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.etcd.ssl.certfile", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "cluster.etcd.ssl.cacertfile", "ekka.cluster_discovery", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Cluster on K8s
|
||||
|
||||
|
@ -149,9 +161,16 @@
|
|||
[{name, cuttlefish:conf_get("cluster.dns.name", Conf)},
|
||||
{app, cuttlefish:conf_get("cluster.dns.app", Conf)}];
|
||||
(etcd) ->
|
||||
SslOpts = fun(Conf) ->
|
||||
Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf),
|
||||
lists:map(fun({["cluster", "etcd", "ssl", Name], Value}) ->
|
||||
{list_to_atom(Name), Value}
|
||||
end, Options)
|
||||
end,
|
||||
[{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")},
|
||||
{prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")},
|
||||
{node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}];
|
||||
{node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)},
|
||||
{ssl_options, SslOpts(Conf)}];
|
||||
(k8s) ->
|
||||
[{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)},
|
||||
{service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)},
|
||||
|
@ -1238,6 +1257,57 @@ end}.
|
|||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.compress", "emqx.listeners", [
|
||||
{datatype, {enum, [true, false]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [
|
||||
{datatype, {enum, [none, default, best_compression, best_speed]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
{validators, ["range:1-9"]},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [
|
||||
{datatype, {enum, [default, filtered, huffman_only, rle]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
|
||||
{datatype, {enum, [takeover, no_takeover]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
|
||||
{datatype, {enum, [takeover, no_takeover]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [
|
||||
{datatype, {duration, ms}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT/WebSocket/SSL Listeners
|
||||
|
||||
|
@ -1393,6 +1463,61 @@ end}.
|
|||
{datatype, {enum, [cn, dn, crt]}}
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.compress", "emqx.listeners", [
|
||||
{datatype, {enum, [true, false]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners", [
|
||||
{datatype, {enum, [none, default, best_compression, best_speed]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
{validators, ["range:1-9"]},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners", [
|
||||
{datatype, {enum, [default, filtered, huffman_only, rle]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
|
||||
{datatype, {enum, [takeover, no_takeover]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
|
||||
{datatype, {enum, [takeover, no_takeover]}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
{validators, ["range:8-15"]},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
{validators, ["range:8-15"]},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [
|
||||
{datatype, {duration, ms}},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [
|
||||
{datatype, integer},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
|
||||
|
||||
{translation, "emqx.listeners", fun(Conf) ->
|
||||
|
||||
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
|
||||
|
@ -1431,19 +1556,30 @@ end}.
|
|||
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
|
||||
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
|
||||
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
|
||||
{compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
|
||||
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
|
||||
{max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
|
||||
{proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)])
|
||||
end,
|
||||
DeflateOpts = fun(Prefix) ->
|
||||
Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},
|
||||
{mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)},
|
||||
{strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)},
|
||||
{server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)},
|
||||
{client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)},
|
||||
{server_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)},
|
||||
{client_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}])
|
||||
end,
|
||||
TcpOpts = fun(Prefix) ->
|
||||
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
|
||||
{send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
|
||||
{send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
|
||||
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
|
||||
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
|
||||
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
|
||||
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
|
||||
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
|
||||
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
|
||||
{send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
|
||||
{send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
|
||||
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
|
||||
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
|
||||
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
|
||||
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
|
||||
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
|
||||
end,
|
||||
|
||||
SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
|
||||
MapPSKCiphers = fun(PSKCiphers) ->
|
||||
lists:map(
|
||||
|
@ -1496,7 +1632,8 @@ end}.
|
|||
case cuttlefish:conf_get(Prefix, Conf, undefined) of
|
||||
undefined -> [];
|
||||
ListenOn ->
|
||||
[{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
|
||||
[{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
|
||||
{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
|
||||
end
|
||||
end,
|
||||
|
||||
|
@ -1506,7 +1643,8 @@ end}.
|
|||
undefined ->
|
||||
[];
|
||||
ListenOn ->
|
||||
[{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)},
|
||||
[{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
|
||||
{tcp_options, TcpOpts(Prefix)},
|
||||
{ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}]
|
||||
end
|
||||
end,
|
||||
|
@ -1987,4 +2125,4 @@ end}.
|
|||
[{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)},
|
||||
{process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)},
|
||||
{process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}]
|
||||
end}.
|
||||
end}.
|
|
@ -1,6 +1,6 @@
|
|||
{deps, [{jsx, "2.9.0"},
|
||||
{gproc, "0.8.0"},
|
||||
{cowboy, "2.4.0"},
|
||||
{cowboy, "2.6.1"},
|
||||
{meck, "0.8.13"} %% temp workaround for version check
|
||||
]}.
|
||||
|
||||
|
@ -28,4 +28,3 @@
|
|||
{cover_export_enabled, true}.
|
||||
|
||||
{plugins, [coveralls]}.
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-boot_mnesia({mnesia, [boot]}).
|
||||
-copy_mnesia({mnesia, [copy]}).
|
||||
|
||||
%% gen_server callbacks
|
||||
%% gen_event callbacks
|
||||
-export([ init/1
|
||||
, handle_event/2
|
||||
, handle_call/2
|
||||
|
|
|
@ -174,7 +174,7 @@ to_list(Msg) ->
|
|||
to_bin_key_list(Msg) ->
|
||||
lists:zipwith(
|
||||
fun(Key, Val) ->
|
||||
{bin(Key), Val}
|
||||
{bin(Key), bin_key_map(Val)}
|
||||
end, record_info(fields, message), tl(tuple_to_list(Msg))).
|
||||
|
||||
%% MilliSeconds
|
||||
|
@ -192,6 +192,13 @@ format(flags, Flags) ->
|
|||
format(headers, Headers) ->
|
||||
io_lib:format("~p", [Headers]).
|
||||
|
||||
bin_key_map(Map) when is_map(Map) ->
|
||||
maps:fold(fun(Key, Val, Acc) ->
|
||||
Acc#{bin(Key) => bin_key_map(Val)}
|
||||
end, #{}, Map);
|
||||
bin_key_map(Data) ->
|
||||
Data.
|
||||
|
||||
bin(Bin) when is_binary(Bin) -> Bin;
|
||||
bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom));
|
||||
bin(Str) when is_list(Str) -> list_to_binary(Str).
|
||||
bin(Str) when is_list(Str) -> list_to_binary(Str).
|
||||
|
|
|
@ -142,7 +142,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
|||
true -> alarm_handler:clear_alarm(cpu_high_watermark);
|
||||
false -> ok
|
||||
end,
|
||||
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})}
|
||||
{noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})};
|
||||
_Busy ->
|
||||
{noreply, ensure_check_timer(State)}
|
||||
end.
|
||||
|
||||
terminate(_Reason, #{timer := Timer}) ->
|
||||
|
|
|
@ -57,7 +57,6 @@
|
|||
will_topic,
|
||||
will_msg,
|
||||
keepalive,
|
||||
mountpoint,
|
||||
is_bridge,
|
||||
enable_ban,
|
||||
enable_acl,
|
||||
|
@ -101,7 +100,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
|
|||
clean_start = false,
|
||||
topic_aliases = #{},
|
||||
packet_size = emqx_zone:get_env(Zone, max_packet_size),
|
||||
mountpoint = emqx_zone:get_env(Zone, mountpoint),
|
||||
is_bridge = false,
|
||||
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
|
||||
enable_acl = emqx_zone:get_env(Zone, enable_acl),
|
||||
|
@ -153,7 +151,6 @@ attrs(#pstate{zone = Zone,
|
|||
proto_ver = ProtoVer,
|
||||
proto_name = ProtoName,
|
||||
keepalive = Keepalive,
|
||||
mountpoint = Mountpoint,
|
||||
is_bridge = IsBridge,
|
||||
connected_at = ConnectedAt,
|
||||
conn_mod = ConnMod,
|
||||
|
@ -167,7 +164,6 @@ attrs(#pstate{zone = Zone,
|
|||
{proto_name, ProtoName},
|
||||
{clean_start, CleanStart},
|
||||
{keepalive, Keepalive},
|
||||
{mountpoint, Mountpoint},
|
||||
{is_bridge, IsBridge},
|
||||
{connected_at, ConnectedAt},
|
||||
{conn_mod, ConnMod},
|
||||
|
@ -202,16 +198,27 @@ caps(#pstate{zone = Zone}) ->
|
|||
client_id(#pstate{client_id = ClientId}) ->
|
||||
ClientId.
|
||||
|
||||
credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 ->
|
||||
Credentials;
|
||||
credentials(#pstate{zone = Zone,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
peername = Peername}) ->
|
||||
#{zone => Zone,
|
||||
client_id => ClientId,
|
||||
username => Username,
|
||||
peername => Peername}.
|
||||
credentials(#pstate{zone = Zone,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
peername = Peername,
|
||||
peercert = Peercert}) ->
|
||||
with_cert(#{zone => Zone,
|
||||
client_id => ClientId,
|
||||
username => Username,
|
||||
peername => Peername,
|
||||
mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert).
|
||||
|
||||
with_cert(Credentials, undefined) -> Credentials;
|
||||
with_cert(Credentials, Peercert) ->
|
||||
Credentials#{dn => esockd_peercert:subject(Peercert),
|
||||
cn => esockd_peercert:common_name(Peercert)}.
|
||||
|
||||
keepsafety(Credentials) ->
|
||||
maps:filter(fun(password, _) -> false;
|
||||
(dn, _) -> false;
|
||||
(cn, _) -> false;
|
||||
(_, _) -> true end, Credentials).
|
||||
|
||||
stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg},
|
||||
send_stats = #{pkt := SendPkt, msg := SendMsg}}) ->
|
||||
|
@ -389,7 +396,7 @@ process(?CONNECT_PACKET(
|
|||
case try_open_session(SessAttrs, PState3) of
|
||||
{ok, SPid, SP} ->
|
||||
PState4 = PState3#pstate{session = SPid, connected = true,
|
||||
credentials = maps:remove(password, Credentials0)},
|
||||
credentials = keepsafety(Credentials0)},
|
||||
ok = emqx_cm:register_connection(client_id(PState4)),
|
||||
true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
|
||||
%% Start keepalive
|
||||
|
@ -469,24 +476,12 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid})
|
|||
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
|
||||
|
||||
process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
||||
PState = #pstate{session = SPid, mountpoint = Mountpoint,
|
||||
proto_ver = ProtoVer, is_bridge = IsBridge,
|
||||
ignore_loop = IgnoreLoop}) ->
|
||||
RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
|
||||
IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
|
||||
case IsBridge of
|
||||
true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
|
||||
false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
|
||||
end;
|
||||
true ->
|
||||
RawTopicFilters
|
||||
end,
|
||||
case check_subscribe(
|
||||
parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
|
||||
PState = #pstate{session = SPid, credentials = Credentials}) ->
|
||||
case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
|
||||
{ok, TopicFilters} ->
|
||||
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters),
|
||||
ok = emqx_session:subscribe(SPid, PacketId, Properties,
|
||||
emqx_mountpoint:mount(Mountpoint, TopicFilters0)),
|
||||
TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
|
||||
TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0),
|
||||
ok = emqx_session:subscribe(SPid, PacketId, Properties, TopicFilters1),
|
||||
{ok, PState};
|
||||
{error, TopicFilters} ->
|
||||
{SubTopics, ReasonCodes} =
|
||||
|
@ -506,11 +501,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
|||
end;
|
||||
|
||||
process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
|
||||
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
|
||||
TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)],
|
||||
PState = #pstate{session = SPid, credentials = Credentials}) ->
|
||||
TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials],
|
||||
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
|
||||
ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
|
||||
emqx_mountpoint:mount(MountPoint, TopicFilters)),
|
||||
emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters)),
|
||||
{ok, PState};
|
||||
|
||||
process(?PACKET(?PINGREQ), PState) ->
|
||||
|
@ -538,12 +533,12 @@ process(?DISCONNECT_PACKET(_), PState) ->
|
|||
%% ConnAck --> Client
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
connack({?RC_SUCCESS, SP, PState}) ->
|
||||
ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
|
||||
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
|
||||
connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) ->
|
||||
ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]),
|
||||
deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
|
||||
|
||||
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
|
||||
ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
|
||||
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) ->
|
||||
ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]),
|
||||
[ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer),
|
||||
_ = deliver({connack, ReasonCode1}, PState),
|
||||
{error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
|
||||
|
@ -553,9 +548,9 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
|
||||
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
|
||||
Msg = emqx_mountpoint:mount(MountPoint,
|
||||
emqx_packet:to_message(credentials(PState), Packet)),
|
||||
PState = #pstate{session = SPid, credentials = Credentials}) ->
|
||||
Msg = emqx_mountpoint:mount(mountpoint(Credentials),
|
||||
emqx_packet:to_message(Credentials, Packet)),
|
||||
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -651,10 +646,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
|
|||
deliver({connack, ReasonCode, SP}, PState) ->
|
||||
send(?CONNACK_PACKET(ReasonCode, SP), PState);
|
||||
|
||||
deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
|
||||
Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg),
|
||||
deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) ->
|
||||
Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg),
|
||||
Msg1 = emqx_message:update_expiry(Msg0),
|
||||
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
|
||||
Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1),
|
||||
send(emqx_packet:from_message(PacketId, Msg2), PState);
|
||||
|
||||
deliver({puback, PacketId, ReasonCode}, PState) ->
|
||||
|
@ -818,8 +813,8 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState)
|
|||
|
||||
check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl ->
|
||||
ok;
|
||||
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) ->
|
||||
case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of
|
||||
check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) ->
|
||||
case emqx_access_control:check_acl(Credentials, publish, WillTopic) of
|
||||
allow -> ok;
|
||||
deny ->
|
||||
?LOG(warning, "[Protocol] Cannot publish will message to ~p for acl denied", [WillTopic]),
|
||||
|
@ -838,8 +833,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
|
|||
check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
|
||||
when IsSuper orelse (not EnableAcl) ->
|
||||
ok;
|
||||
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) ->
|
||||
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
|
||||
check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) ->
|
||||
case emqx_access_control:check_acl(Credentials, publish, Topic) of
|
||||
allow -> ok;
|
||||
deny -> {error, ?RC_NOT_AUTHORIZED}
|
||||
end.
|
||||
|
@ -865,10 +860,10 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
|
|||
check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
|
||||
when IsSuper orelse (not EnableAcl) ->
|
||||
{ok, TopicFilters};
|
||||
check_sub_acl(TopicFilters, PState) ->
|
||||
check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) ->
|
||||
lists:foldr(
|
||||
fun({Topic, SubOpts}, {Ok, Acc}) ->
|
||||
case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
|
||||
case emqx_access_control:check_acl(Credentials, publish, Topic) of
|
||||
allow -> {Ok, [{Topic, SubOpts}|Acc]};
|
||||
deny ->
|
||||
{error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
|
||||
|
@ -900,9 +895,10 @@ terminate(conflict, _PState) ->
|
|||
ok;
|
||||
terminate(discard, _PState) ->
|
||||
ok;
|
||||
terminate(Reason, PState) ->
|
||||
|
||||
terminate(Reason, #pstate{credentials = Credentials}) ->
|
||||
?LOG(info, "[Protocol] Shutdown for ~p", [Reason]),
|
||||
ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
|
||||
ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]).
|
||||
|
||||
start_keepalive(0, _PState) ->
|
||||
ignore;
|
||||
|
@ -920,14 +916,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
|
|||
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
|
||||
lists:map(fun emqx_topic:parse/1, RawTopicFilters).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Update mountpoint
|
||||
|
||||
update_mountpoint(PState = #pstate{mountpoint = undefined}) ->
|
||||
PState;
|
||||
update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
|
||||
PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}.
|
||||
|
||||
sp(true) -> 1;
|
||||
sp(false) -> 0.
|
||||
|
||||
|
@ -974,3 +962,20 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
|
|||
undefined;
|
||||
reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
|
||||
[emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
|
||||
|
||||
raw_topic_filters(#pstate{proto_ver = ProtoVer,
|
||||
is_bridge = IsBridge,
|
||||
ignore_loop = IgnoreLoop}, RawTopicFilters) ->
|
||||
case ProtoVer < ?MQTT_PROTO_V5 of
|
||||
true ->
|
||||
IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
|
||||
case IsBridge of
|
||||
true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
|
||||
false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
|
||||
end;
|
||||
false ->
|
||||
RawTopicFilters
|
||||
end.
|
||||
|
||||
mountpoint(Credentials) ->
|
||||
maps:get(mountpoint, Credentials, undefined).
|
||||
|
|
|
@ -109,7 +109,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer,
|
|||
true -> alarm_handler:clear_alarm(too_many_processes);
|
||||
false -> ok
|
||||
end,
|
||||
{noreply, ensure_check_timer(State#{is_process_alarm_set := false})}
|
||||
{noreply, ensure_check_timer(State#{is_process_alarm_set := false})};
|
||||
_Precent ->
|
||||
{noreply, ensure_check_timer(State)}
|
||||
end.
|
||||
|
||||
terminate(_Reason, #{timer := Timer}) ->
|
||||
|
|
|
@ -113,12 +113,23 @@ call(WSPid, Req) when is_pid(WSPid) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init(Req, Opts) ->
|
||||
IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000),
|
||||
DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
|
||||
MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
|
||||
0 -> infinity;
|
||||
MFS -> MFS
|
||||
end,
|
||||
Compress = proplists:get_value(compress, Opts, false),
|
||||
Options = #{compress => Compress,
|
||||
deflate_opts => DeflateOptions,
|
||||
max_frame_size => MaxFrameSize,
|
||||
idle_timeout => IdleTimeout},
|
||||
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
||||
undefined ->
|
||||
{cowboy_websocket, Req, #state{}};
|
||||
{cowboy_websocket, Req, #state{}, Options};
|
||||
[<<"mqtt", Vsn/binary>>] ->
|
||||
Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req),
|
||||
{cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}};
|
||||
{cowboy_websocket, Resp, #state{request = Req, options = Opts}, Options};
|
||||
_ ->
|
||||
{ok, cowboy_req:reply(400, Req), #state{}}
|
||||
end.
|
||||
|
@ -308,4 +319,3 @@ shutdown(Reason, State) ->
|
|||
|
||||
wsock_stats() ->
|
||||
[{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS].
|
||||
|
||||
|
|
Loading…
Reference in New Issue