From f5c426a2f2a42e7c29cba8a4a1f619cf9c3ab5cc Mon Sep 17 00:00:00 2001 From: tigercl Date: Wed, 27 Mar 2019 10:18:51 +0800 Subject: [PATCH 1/6] Add missed case for monitors (#2353) --- src/emqx_os_mon.erl | 4 +++- src/emqx_vm_mon.erl | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index f8d8f41e1..a4fe7a9a7 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -142,7 +142,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, true -> alarm_handler:clear_alarm(cpu_high_watermark); false -> ok end, - {noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})} + {noreply, ensure_check_timer(State#{is_cpu_alarm_set := false})}; + _Busy -> + {noreply, ensure_check_timer(State)} end. terminate(_Reason, #{timer := Timer}) -> diff --git a/src/emqx_vm_mon.erl b/src/emqx_vm_mon.erl index f91fc519c..10be9d71b 100644 --- a/src/emqx_vm_mon.erl +++ b/src/emqx_vm_mon.erl @@ -109,7 +109,9 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, true -> alarm_handler:clear_alarm(too_many_processes); false -> ok end, - {noreply, ensure_check_timer(State#{is_process_alarm_set := false})} + {noreply, ensure_check_timer(State#{is_process_alarm_set := false})}; + _Precent -> + {noreply, ensure_check_timer(State)} end. terminate(_Reason, #{timer := Timer}) -> From 2534b8dc64c851453616b1a956ebbf247a3b23dd Mon Sep 17 00:00:00 2001 From: Gilbert Date: Wed, 27 Mar 2019 10:19:35 +0800 Subject: [PATCH 2/6] Support to pass ws compressing options (#2356) Add new config entries about websocket --- Makefile | 2 +- etc/emqx.conf | 161 ++++++++++++++++++++++++++++++++++--- priv/emqx.schema | 141 +++++++++++++++++++++++++++++--- rebar.config | 3 +- src/emqx_ws_connection.erl | 16 +++- 5 files changed, 294 insertions(+), 29 deletions(-) diff --git a/Makefile b/Makefile index e7bab43ba..5e5d456c9 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ dep_gproc = git-emqx https://github.com/uwiger/gproc 0.8.0 dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.1 dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4 dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3 -dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.4.0 +dep_cowboy = git-emqx https://github.com/ninenines/cowboy 2.6.1 dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1 NO_AUTOPATCH = cuttlefish diff --git a/etc/emqx.conf b/etc/emqx.conf index cb2d0bb00..35861d77e 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1261,7 +1261,7 @@ listener.ws.external.zone = external ## The access control for the MQTT/WebSocket listener. ## -## See: listener.tcp.$name.access +## See: listener.ws.$name.access ## ## Value: ACL Rule listener.ws.external.access.1 = allow all @@ -1286,74 +1286,143 @@ listener.ws.external.verify_protocol_header = on ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## -## See: listener.tcp.$name.proxy_protocol +## See: listener.ws.$name.proxy_protocol ## ## Value: on | off ## listener.ws.external.proxy_protocol = on ## Sets the timeout for proxy protocol. ## -## See: listener.tcp.$name.proxy_protocol_timeout +## See: listener.ws.$name.proxy_protocol_timeout ## ## Value: Duration ## listener.ws.external.proxy_protocol_timeout = 3s ## The TCP backlog of external MQTT/WebSocket Listener. ## -## See: listener.tcp.$name.backlog +## See: listener.ws.$name.backlog ## ## Value: Number >= 0 listener.ws.external.backlog = 1024 ## The TCP send timeout for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.send_timeout +## See: listener.ws.$name.send_timeout ## ## Value: Duration listener.ws.external.send_timeout = 15s ## Close the MQTT/WebSocket connection if send timeout. ## -## See: listener.tcp.$name.send_timeout_close +## See: listener.ws.$name.send_timeout_close ## ## Value: on | off listener.ws.external.send_timeout_close = on ## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.recbuf +## See: listener.ws.$name.recbuf ## ## Value: Bytes ## listener.ws.external.recbuf = 2KB ## The TCP send buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.sndbuf +## See: listener.ws.$name.sndbuf ## ## Value: Bytes ## listener.ws.external.sndbuf = 2KB ## The size of the user-level software buffer used by the driver. ## -## See: listener.tcp.$name.buffer +## See: listener.ws.$name.buffer ## ## Value: Bytes ## listener.ws.external.buffer = 2KB ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. ## -## See: listener.tcp.$name.tune_buffer +## See: listener.ws.$name.tune_buffer ## ## Value: on | off ## listener.ws.external.tune_buffer = off ## The TCP_NODELAY flag for external MQTT/WebSocket connections. ## -## See: listener.tcp.$name.nodelay +## See: listener.ws.$name.nodelay ## ## Value: true | false listener.ws.external.nodelay = true +## The compress flag for external MQTT/WebSocket connections. +## +## If this Value is set true,the websocket message would be compressed +## +## Value: true | false +## listener.ws.external.compress = true + +## The level of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.level +## +## Value: none | default | best_compression | best_speed +## listener.ws.external.deflate_opts.level = default + +## The mem_level of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.mem_level +## +## Valid range is 1-9 +## listener.ws.external.deflate_opts.mem_level = 8 + +## The strategy of deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.strategy +## +## Value: default | filtered | huffman_only | rle +## listener.ws.external.deflate_opts.strategy = default + +## The deflate option for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.server_context_takeover +## +## Value: takeover | no_takeover +## listener.ws.external.deflate_opts.server_context_takeover = takeover + +## The deflate option for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.client_context_takeover +## +## Value: takeover | no_takeover +## listener.ws.external.deflate_opts.client_context_takeover = takeover + +## The deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.server_max_window_bits +## +## Valid range is 8-15 +## listener.ws.external.deflate_opts.server_max_window_bits = 15 + +## The deflate options for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.deflate_opts.client_max_window_bits +## +## Valid range is 8-15 +## listener.ws.external.deflate_opts.client_max_window_bits = 15 + +## The idle timeout for external MQTT/WebSocket connections. +## +## See: listener.ws.$name.idle_timeout +## +## Value: Duration +## listener.ws.external.idle_timeout = 60s + +## The max frame size for external MQTT/WebSocket connections. +## +## +## Value: Number +## listener.ws.external.max_frame_size = 0 + ##-------------------------------------------------------------------- ## External WebSocket/SSL listener for MQTT Protocol @@ -1486,7 +1555,7 @@ listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G ## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot ## be configured at the same time. ## See 'https://tools.ietf.org/html/rfc4279#section-2'. -#listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA +## listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA ## See: listener.ssl.$name.secure_renegotiate ## @@ -1557,6 +1626,74 @@ listener.wss.external.send_timeout_close = on ## Value: true | false ## listener.wss.external.nodelay = true +## The compress flag for external WebSocket/SSL connections. +## +## If this Value is set true,the websocket message would be compressed +## +## Value: true | false +## listener.wss.external.compress = true + +## The level of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.level +## +## Value: none | default | best_compression | best_speed +## listener.wss.external.deflate_opts.level = default + +## The mem_level of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.mem_level +## +## Valid range is 1-9 +## listener.wss.external.deflate_opts.mem_level = 8 + +## The strategy of deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.strategy +## +## Value: default | filtered | huffman_only | rle +## listener.wss.external.deflate_opts.strategy = default + +## The deflate option for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.server_context_takeover +## +## Value: takeover | no_takeover +## listener.wss.external.deflate_opts.server_context_takeover = takeover + +## The deflate option for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.client_context_takeover +## +## Value: takeover | no_takeover +## listener.wss.external.deflate_opts.client_context_takeover = takeover + +## The deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.server_max_window_bits +## +## Valid range is 8-15 +## listener.wss.external.deflate_opts.server_max_window_bits = 15 + +## The deflate options for external WebSocket/SSL connections. +## +## See: listener.wss.$name.deflate_opts.client_max_window_bits +## +## Valid range is 8-15 +## listener.wss.external.deflate_opts.client_max_window_bits = 15 + +## The idle timeout for external WebSocket/SSL connections. +## +## See: listener.wss.$name.idle_timeout +## +## Value: Duration +## listener.wss.external.idle_timeout = 60s + +## The max frame size for external WebSocket/SSL connections. +## +## Value: Number +## listener.wss.external.max_frame_size = 0 + ##-------------------------------------------------------------------- ## Bridges ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 25589f138..8a97984f6 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1238,6 +1238,57 @@ end}. hidden ]}. +{mapping, "listener.ws.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + %%-------------------------------------------------------------------- %% MQTT/WebSocket/SSL Listeners @@ -1393,6 +1444,61 @@ end}. {datatype, {enum, [cn, dn, crt]}} ]}. +{mapping, "listener.wss.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + + + {translation, "emqx.listeners", fun(Conf) -> Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, @@ -1431,19 +1537,30 @@ end}. {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)}, {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, + {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)}, + {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}, + {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)}, {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)]) end, + DeflateOpts = fun(Prefix) -> + Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)}, + {mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)}, + {strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)}, + {server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)}, + {client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)}, + {server_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)}, + {client_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}]) + end, TcpOpts = fun(Prefix) -> - Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, - {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, - {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, - {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, - {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, - {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, - {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, - {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, + {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) end, - SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, MapPSKCiphers = fun(PSKCiphers) -> lists:map( @@ -1496,7 +1613,8 @@ end}. case cuttlefish:conf_get(Prefix, Conf, undefined) of undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, + {tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] end end, @@ -1506,7 +1624,8 @@ end}. undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}, + [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)}, + {tcp_options, TcpOpts(Prefix)}, {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}] end end, diff --git a/rebar.config b/rebar.config index 2e16b86b9..005331120 100644 --- a/rebar.config +++ b/rebar.config @@ -1,6 +1,6 @@ {deps, [{jsx, "2.9.0"}, {gproc, "0.8.0"}, - {cowboy, "2.4.0"}, + {cowboy, "2.6.1"}, {meck, "0.8.13"} %% temp workaround for version check ]}. @@ -28,4 +28,3 @@ {cover_export_enabled, true}. {plugins, [coveralls]}. - diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 6d47e16ad..74a853c46 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -113,12 +113,23 @@ call(WSPid, Req) when is_pid(WSPid) -> %%------------------------------------------------------------------------------ init(Req, Opts) -> + IdleTimeout = proplists:get_value(idle_timeout, Opts, 60000), + DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])), + MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of + 0 -> infinity; + MFS -> MFS + end, + Compress = proplists:get_value(compress, Opts, false), + Options = #{compress => Compress, + deflate_opts => DeflateOptions, + max_frame_size => MaxFrameSize, + idle_timeout => IdleTimeout}, case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> - {cowboy_websocket, Req, #state{}}; + {cowboy_websocket, Req, #state{}, Options}; [<<"mqtt", Vsn/binary>>] -> Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req), - {cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}}; + {cowboy_websocket, Resp, #state{request = Req, options = Opts}, Options}; _ -> {ok, cowboy_req:reply(400, Req), #state{}} end. @@ -308,4 +319,3 @@ shutdown(Reason, State) -> wsock_stats() -> [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. - From c223f62c5afa1695324ed9b547d5e4aa30247be2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 27 Mar 2019 10:20:12 +0800 Subject: [PATCH 3/6] Put cn/dn of client cert into credentials (#2357) Put cn/dn of client cert into credentials --- src/emqx_alarm_handler.erl | 2 +- src/emqx_protocol.erl | 30 +++++++++++++++++++++--------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 6c7d1a470..b0f442867 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -25,7 +25,7 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -%% gen_server callbacks +%% gen_event callbacks -export([ init/1 , handle_event/2 , handle_call/2 diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index cd5756671..6dcd9aba1 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -204,14 +204,26 @@ client_id(#pstate{client_id = ClientId}) -> credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 -> Credentials; -credentials(#pstate{zone = Zone, - client_id = ClientId, - username = Username, - peername = Peername}) -> - #{zone => Zone, - client_id => ClientId, - username => Username, - peername => Peername}. +credentials(#pstate{zone = Zone, + client_id = ClientId, + username = Username, + peername = Peername, + peercert = Peercert}) -> + with_cert(#{zone => Zone, + client_id => ClientId, + username => Username, + peername => Peername}, Peercert). + +with_cert(Credentials, undefined) -> Credentials; +with_cert(Credentials, Peercert) -> + Credentials#{dn => esockd_peercert:subject(Peercert), + cn => esockd_peercert:common_name(Peercert)}. + +keepsafety(Credentials) -> + maps:filter(fun(password, _) -> false; + (dn, _) -> false; + (cn, _) -> false; + (_, _) -> true end, Credentials). stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg}, send_stats = #{pkt := SendPkt, msg := SendMsg}}) -> @@ -389,7 +401,7 @@ process(?CONNECT_PACKET( case try_open_session(SessAttrs, PState3) of {ok, SPid, SP} -> PState4 = PState3#pstate{session = SPid, connected = true, - credentials = maps:remove(password, Credentials0)}, + credentials = keepsafety(Credentials0)}, ok = emqx_cm:register_connection(client_id(PState4)), true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)), %% Start keepalive From e6d90d5758d1cab7d45917439e42782c05e3dc1b Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Wed, 27 Mar 2019 11:10:51 +0800 Subject: [PATCH 4/6] Improve mountpoint (#2363) --- src/emqx_protocol.erl | 100 +++++++++++++++++++----------------------- 1 file changed, 46 insertions(+), 54 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 6dcd9aba1..4ddce1067 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -57,7 +57,6 @@ will_topic, will_msg, keepalive, - mountpoint, is_bridge, enable_ban, enable_acl, @@ -101,7 +100,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF clean_start = false, topic_aliases = #{}, packet_size = emqx_zone:get_env(Zone, max_packet_size), - mountpoint = emqx_zone:get_env(Zone, mountpoint), is_bridge = false, enable_ban = emqx_zone:get_env(Zone, enable_ban, false), enable_acl = emqx_zone:get_env(Zone, enable_acl), @@ -153,7 +151,6 @@ attrs(#pstate{zone = Zone, proto_ver = ProtoVer, proto_name = ProtoName, keepalive = Keepalive, - mountpoint = Mountpoint, is_bridge = IsBridge, connected_at = ConnectedAt, conn_mod = ConnMod, @@ -167,7 +164,6 @@ attrs(#pstate{zone = Zone, {proto_name, ProtoName}, {clean_start, CleanStart}, {keepalive, Keepalive}, - {mountpoint, Mountpoint}, {is_bridge, IsBridge}, {connected_at, ConnectedAt}, {conn_mod, ConnMod}, @@ -202,8 +198,6 @@ caps(#pstate{zone = Zone}) -> client_id(#pstate{client_id = ClientId}) -> ClientId. -credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 -> - Credentials; credentials(#pstate{zone = Zone, client_id = ClientId, username = Username, @@ -212,7 +206,8 @@ credentials(#pstate{zone = Zone, with_cert(#{zone => Zone, client_id => ClientId, username => Username, - peername => Peername}, Peercert). + peername => Peername, + mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert). with_cert(Credentials, undefined) -> Credentials; with_cert(Credentials, Peercert) -> @@ -481,24 +476,12 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState}; process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = Mountpoint, - proto_ver = ProtoVer, is_bridge = IsBridge, - ignore_loop = IgnoreLoop}) -> - RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 -> - IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, - case IsBridge of - true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; - false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] - end; - true -> - RawTopicFilters - end, - case check_subscribe( - parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of + PState = #pstate{session = SPid, credentials = Credentials}) -> + case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of {ok, TopicFilters} -> - TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters), - ok = emqx_session:subscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(Mountpoint, TopicFilters0)), + TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters), + TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0), + ok = emqx_session:subscribe(SPid, PacketId, Properties, TopicFilters1), {ok, PState}; {error, TopicFilters} -> {SubTopics, ReasonCodes} = @@ -518,11 +501,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), end; process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), - PState = #pstate{session = SPid, mountpoint = MountPoint}) -> - TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)], + PState = #pstate{session = SPid, credentials = Credentials}) -> + TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials], parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)), ok = emqx_session:unsubscribe(SPid, PacketId, Properties, - emqx_mountpoint:mount(MountPoint, TopicFilters)), + emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters)), {ok, PState}; process(?PACKET(?PINGREQ), PState) -> @@ -550,12 +533,12 @@ process(?DISCONNECT_PACKET(_), PState) -> %% ConnAck --> Client %%------------------------------------------------------------------------------ -connack({?RC_SUCCESS, SP, PState}) -> - ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]), - deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); +connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) -> + ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]), + deliver({connack, ?RC_SUCCESS, sp(SP)}, PState); -connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> - ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]), +connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) -> + ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]), [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer), _ = deliver({connack, ReasonCode1}, PState), {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}. @@ -565,9 +548,9 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> %%------------------------------------------------------------------------------ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId), - PState = #pstate{session = SPid, mountpoint = MountPoint}) -> - Msg = emqx_mountpoint:mount(MountPoint, - emqx_packet:to_message(credentials(PState), Packet)), + PState = #pstate{session = SPid, credentials = Credentials}) -> + Msg = emqx_mountpoint:mount(mountpoint(Credentials), + emqx_packet:to_message(Credentials, Packet)), puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState). %%------------------------------------------------------------------------------ @@ -663,10 +646,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, deliver({connack, ReasonCode, SP}, PState) -> send(?CONNACK_PACKET(ReasonCode, SP), PState); -deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) -> - Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg), +deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) -> + Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg), Msg1 = emqx_message:update_expiry(Msg0), - Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1), + Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1), send(emqx_packet:from_message(PacketId, Msg2), PState); deliver({puback, PacketId, ReasonCode}, PState) -> @@ -830,8 +813,8 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState) check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl -> ok; -check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) -> - case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of +check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) -> + case emqx_access_control:check_acl(Credentials, publish, WillTopic) of allow -> ok; deny -> ?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]), @@ -850,8 +833,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> ok; -check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) -> - case emqx_access_control:check_acl(credentials(PState), publish, Topic) of +check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) -> + case emqx_access_control:check_acl(Credentials, publish, Topic) of allow -> ok; deny -> {error, ?RC_NOT_AUTHORIZED} end. @@ -877,10 +860,10 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) -> check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl}) when IsSuper orelse (not EnableAcl) -> {ok, TopicFilters}; -check_sub_acl(TopicFilters, PState) -> +check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) -> lists:foldr( fun({Topic, SubOpts}, {Ok, Acc}) -> - case emqx_access_control:check_acl(credentials(PState), publish, Topic) of + case emqx_access_control:check_acl(Credentials, publish, Topic) of allow -> {Ok, [{Topic, SubOpts}|Acc]}; deny -> {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]} @@ -912,9 +895,9 @@ terminate(conflict, _PState) -> ok; terminate(discard, _PState) -> ok; -terminate(Reason, PState) -> +terminate(Reason, #pstate{credentials = Credentials}) -> ?LOG(info, "Shutdown for ~p", [Reason]), - ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]). + ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]). start_keepalive(0, _PState) -> ignore; @@ -932,14 +915,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) -> parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) -> lists:map(fun emqx_topic:parse/1, RawTopicFilters). -%%------------------------------------------------------------------------------ -%% Update mountpoint - -update_mountpoint(PState = #pstate{mountpoint = undefined}) -> - PState; -update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> - PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}. - sp(true) -> 1; sp(false) -> 0. @@ -986,3 +961,20 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) -> undefined; reason_codes_compat(PktType, ReasonCodes, _ProtoVer) -> [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes]. + +raw_topic_filters(#pstate{proto_ver = ProtoVer, + is_bridge = IsBridge, + ignore_loop = IgnoreLoop}, RawTopicFilters) -> + case ProtoVer < ?MQTT_PROTO_V5 of + true -> + IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end, + case IsBridge of + true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]; + false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters] + end; + false -> + RawTopicFilters + end. + +mountpoint(Credentials) -> + maps:get(mountpoint, Credentials, undefined). From a3fd8846a58d791ddb3547c3b7374241a48b7aeb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 27 Mar 2019 11:12:28 +0800 Subject: [PATCH 5/6] Bin key map (#2362) Suppport nested bin-key map --- src/emqx_message.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/emqx_message.erl b/src/emqx_message.erl index f7cab7f6c..d4f490ed7 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -174,7 +174,7 @@ to_list(Msg) -> to_bin_key_list(Msg) -> lists:zipwith( fun(Key, Val) -> - {bin(Key), Val} + {bin(Key), bin_key_map(Val)} end, record_info(fields, message), tl(tuple_to_list(Msg))). %% MilliSeconds @@ -192,6 +192,13 @@ format(flags, Flags) -> format(headers, Headers) -> io_lib:format("~p", [Headers]). +bin_key_map(Map) when is_map(Map) -> + maps:fold(fun(Key, Val, Acc) -> + Acc#{bin(Key) => bin_key_map(Val)} + end, #{}, Map); +bin_key_map(Data) -> + Data. + bin(Bin) when is_binary(Bin) -> Bin; bin(Atom) when is_atom(Atom) -> list_to_binary(atom_to_list(Atom)); -bin(Str) when is_list(Str) -> list_to_binary(Str). \ No newline at end of file +bin(Str) when is_list(Str) -> list_to_binary(Str). From fee94525db5eae0904bff063f261d950ecca6cb7 Mon Sep 17 00:00:00 2001 From: YoukiLin <1045735402@qq.com> Date: Wed, 27 Mar 2019 13:53:40 +0800 Subject: [PATCH 6/6] Add etcd client certificate configuration (#2367) * Add etcd client certificate --- etc/emqx.conf | 16 ++++++++++++++++ priv/emqx.schema | 23 +++++++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 35861d77e..55d889deb 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -114,6 +114,22 @@ cluster.autoclean = 5m ## Default: 1m, 1 minute ## cluster.etcd.node_ttl = 1m +## Path to a file containing the client's private PEM-encoded key. +## +## Value: File +## cluster.etcd.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem + +## The path to a file containing the client's certificate. +## +## Value: File +## cluster.etcd.ssl.certfile = {{ platform_etc_dir }}/certs/client.pem + +## Path to the file containing PEM-encoded CA certificates. The CA certificates +## are used during server authentication and when building the client certificate chain. +## +## Value: File +## cluster.etcd.ssl.cacertfile = {{ platform_etc_dir }}/certs/ca.pem + ##-------------------------------------------------------------------- ## Cluster using Kubernates diff --git a/priv/emqx.schema b/priv/emqx.schema index 8a97984f6..e6944ba79 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -105,6 +105,18 @@ {default, "1m"} ]}. +{mapping, "cluster.etcd.ssl.keyfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.ssl.certfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.ssl.cacertfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + %%-------------------------------------------------------------------- %% Cluster on K8s @@ -149,9 +161,16 @@ [{name, cuttlefish:conf_get("cluster.dns.name", Conf)}, {app, cuttlefish:conf_get("cluster.dns.app", Conf)}]; (etcd) -> + SslOpts = fun(Conf) -> + Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf), + lists:map(fun({["cluster", "etcd", "ssl", Name], Value}) -> + {list_to_atom(Name), Value} + end, Options) + end, [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")}, {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")}, - {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}]; + {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}, + {ssl_options, SslOpts(Conf)}]; (k8s) -> [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)}, {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, @@ -2106,4 +2125,4 @@ end}. [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf)}, {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf)}] -end}. +end}. \ No newline at end of file