diff --git a/Makefile b/Makefile index 38051db1c..509794bbd 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ PROJECT = emqx PROJECT_DESCRIPTION = EMQ X Broker PROJECT_VERSION = 3.0 -DEPS = jsx gproc gen_rpc lager ekka esockd mochiweb clique +DEPS = jsx gproc gen_rpc lager ekka esockd minirest clique dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_gproc = git https://github.com/uwiger/gproc 0.8.0 @@ -12,7 +12,7 @@ dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.1.1 dep_lager = git https://github.com/erlang-lager/lager 3.6.4 dep_esockd = git https://github.com/emqx/esockd emqx30 dep_ekka = git https://github.com/emqx/ekka emqx30 -dep_mochiweb = git https://github.com/emqtt/mochiweb emqx30 +dep_minirest = git https://github.com/emqx/minirest emqx30 dep_clique = git https://github.com/emqx/clique NO_AUTOPATCH = gen_rpc cuttlefish diff --git a/etc/emqx.conf b/etc/emqx.conf index 555a6d63b..9db1761bc 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1176,57 +1176,11 @@ listener.ws.external.acceptors = 4 ## Value: Number listener.ws.external.max_clients = 102400 -## Maximum MQTT/WebSocket connections per second. -## -## Value: Number -listener.ws.external.max_conn_rate = 1000 - ## Zone of the external MQTT/WebSocket listener belonged to. ## ## Value: String listener.ws.external.zone = external -## Mountpoint of the MQTT/WebSocket Listener. -## -## See: listener.tcp..mountpoint -## -## Value: String -## listener.ws.external.mountpoint = external/ - -## The access control for the MQTT/WebSocket listener. -## -## See: listener.tcp..access -## -## Value: ACL Rule -listener.ws.external.access.1 = allow all - -## Use X-Forwarded-For header for real source IP if the EMQ cluster is -## deployed behind NGINX or HAProxy. -## -## Value: String -## listener.ws.external.proxy_address_header = X-Forwarded-For - -## Use X-Forwarded-Port header for real source port if the EMQ cluster is -## deployed behind NGINX or HAProxy. -## -## Value: String -## listener.ws.external.proxy_port_header = X-Forwarded-Port - -## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind -## HAProxy or Nginx. -## -## See: listener.tcp..proxy_protocol -## -## Value: on | off -## listener.ws.external.proxy_protocol = on - -## Sets the timeout for proxy protocol. -## -## See: listener.tcp..proxy_protocol_timeout -## -## Value: Duration -## listener.ws.external.proxy_protocol_timeout = 3s - ## The TCP backlog of external MQTT/WebSocket Listener. ## ## See: listener.tcp..backlog @@ -1283,11 +1237,6 @@ listener.ws.external.send_timeout_close = on ## Value: true | false listener.ws.external.nodelay = true -## The SO_REUSEADDR flag for MQTT/WebSocket Listener. -## -## Value: true | false -listener.ws.external.reuseaddr = true - ##-------------------------------------------------------------------- ## External WebSocket/SSL listener for MQTT Protocol @@ -1309,11 +1258,6 @@ listener.wss.external.acceptors = 4 ## Value: Number listener.wss.external.max_clients = 64 -## Maximum MQTT/WebSocket/SSL connections per second. -## -## Value: Number -listener.wss.external.max_conn_rate = 1000 - ## Zone of the external MQTT/WebSocket/SSL listener belonged to. ## ## Value: String @@ -1326,37 +1270,6 @@ listener.wss.external.zone = external ## Value: String ## listener.wss.external.mountpoint = inbound/ -## The access control rules for the MQTT/WebSocket/SSL listener. -## -## See: listener.tcp..access. -## -## Value: ACL Rule -listener.wss.external.access.1 = allow all - -## See: listener.ws.external.proxy_address_header -## -## Value: String -## listener.wss.external.proxy_address_header = X-Forwarded-For - -## See: listener.ws.external.proxy_port_header -## -## Value: String -## listener.wss.external.proxy_port_header = X-Forwarded-Port - -## Enable the Proxy Protocol V1/2 support. -## -## See: listener.tcp..proxy_protocol -## -## Value: on | off -## listener.wss.external.proxy_protocol = on - -## Sets the timeout for proxy protocol. -## -## See: listener.tcp..proxy_protocol_timeout -## -## Value: Duration -## listener.wss.external.proxy_protocol_timeout = 3s - ## TLS versions only to protect from POODLE attack. ## ## See: listener.ssl..tls_versions @@ -1364,13 +1277,6 @@ listener.wss.external.access.1 = allow all ## Value: String, seperated by ',' ## listener.wss.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -## TLS Handshake timeout. -## -## See: listener.ssl..handshake_timeout -## -## Value: Duration -listener.wss.external.handshake_timeout = 15s - ## Path to the file containing the user's private PEM-encoded key. ## ## See: listener.ssl..keyfile @@ -1481,10 +1387,7 @@ listener.wss.external.send_timeout_close = on ## Value: true | false ## listener.wss.external.nodelay = true -## The SO_REUSEADDR flag for WebSocket/SSL listener. -## -## Value: true | false -listener.wss.external.reuseaddr = true +listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA ##------------------------------------------------------------------- ## System Monitor diff --git a/include/emqx.hrl b/include/emqx.hrl index 7340cf835..3b42713ff 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -192,13 +192,13 @@ %%-------------------------------------------------------------------- -record(plugin, { - name :: atom(), - version :: string(), - dir :: string(), - descr :: string(), - vendor :: string(), - active :: boolean(), - info :: map() + name :: atom(), + version :: string(), + dir :: string(), + descr :: string(), + vendor :: string(), + active = false :: boolean(), + info :: map() }). -type(plugin() :: #plugin{}). diff --git a/priv/emqx.schema b/priv/emqx.schema index 7ff5fd0a3..7db77f1fb 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -497,7 +497,7 @@ end}. ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), - ConsoleHandler = {lager_console_backend, [ConsoleLogLevel]}, + ConsoleHandler = {lager_console_backend, [{level, ConsoleLogLevel}]}, ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, {level, ConsoleLogLevel}, {size, cuttlefish:conf_get("log.console.size", Conf)}, @@ -1173,10 +1173,6 @@ end}. {datatype, integer} ]}. -{mapping, "listener.ws.$name.max_conn_rate", "emqx.listeners", [ - {datatype, integer} -]}. - {mapping, "listener.ws.$name.zone", "emqx.listeners", [ {datatype, string} ]}. @@ -1185,28 +1181,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.ws.$name.access.$id", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - -{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - -{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [ - {datatype, flag} -]}. - -{mapping, "listener.ws.$name.proxy_protocol_timeout", "emqx.listeners", [ - {datatype, {duration, ms}} -]}. - {mapping, "listener.ws.$name.backlog", "emqx.listeners", [ {default, 1024}, {datatype, integer} @@ -1247,11 +1221,6 @@ end}. hidden ]}. -{mapping, "listener.ws.$name.reuseaddr", "emqx.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - %%-------------------------------------------------------------------- %% MQTT/WebSocket/SSL Listeners @@ -1269,10 +1238,6 @@ end}. {datatype, integer} ]}. -{mapping, "listener.wss.$name.max_conn_rate", "emqx.listeners", [ - {datatype, integer} -]}. - {mapping, "listener.wss.$name.zone", "emqx.listeners", [ {datatype, string} ]}. @@ -1281,32 +1246,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.wss.$name.rate_limit", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.access.$id", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - -{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [ - {datatype, string}, - hidden -]}. - -{mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [ - {datatype, flag} -]}. - -{mapping, "listener.wss.$name.proxy_protocol_timeout", "emqx.listeners", [ - {datatype, {duration, ms}} -]}. - {mapping, "listener.wss.$name.backlog", "emqx.listeners", [ {default, 1024}, {datatype, integer} @@ -1347,11 +1286,6 @@ end}. hidden ]}. -{mapping, "listener.wss.$name.reuseaddr", "emqx.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - {mapping, "listener.wss.$name.tls_versions", "emqx.listeners", [ {datatype, string} ]}. @@ -1360,11 +1294,6 @@ end}. {datatype, string} ]}. -{mapping, "listener.wss.$name.handshake_timeout", "emqx.listeners", [ - {default, "15s"}, - {datatype, {duration, ms}} -]}. - {mapping, "listener.wss.$name.keyfile", "emqx.listeners", [ {datatype, string} ]}. @@ -1444,7 +1373,7 @@ end}. {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, - {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, true)}]) + {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) end, SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, @@ -1488,17 +1417,6 @@ end}. end end, - ApiListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - SslOpts1 = case SslOpts(Prefix) of [] -> []; SslOpts0 -> [{ssl_options, SslOpts0}] end, - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}|LisOpts(Prefix)] ++ SslOpts1}] - end - end, - lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) @@ -1506,106 +1424,9 @@ end}. ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] - ++ - [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) end}. -%%-------------------------------------------------------------------- -%% MQTT REST API Listeners - -{mapping, "listener.api.$name", "emqx.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.api.$name.acceptors", "emqx.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.max_clients", "emqx.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.rate_limit", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.access.$id", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.backlog", "emqx.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.send_timeout", "emqx.listeners", [ - {datatype, {duration, ms}}, - {default, "15s"} -]}. - -{mapping, "listener.api.$name.send_timeout_close", "emqx.listeners", [ - {datatype, flag}, - {default, on} -]}. - -{mapping, "listener.api.$name.recbuf", "emqx.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.sndbuf", "emqx.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.buffer", "emqx.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.tune_buffer", "emqx.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.api.$name.nodelay", "emqx.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.api.$name.reuseaddr", "emqx.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.api.$name.handshake_timeout", "emqx.listeners", [ - {datatype, {duration, ms}} -]}. - -{mapping, "listener.api.$name.keyfile", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.certfile", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.cacertfile", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.verify", "emqx.listeners", [ - {datatype, atom} -]}. - -{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqx.listeners", [ - {datatype, {enum, [true, false]}} -]}. - %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqx.app.src b/src/emqx.app.src index 7d0dd0c4d..b7a195c8b 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,7 +3,7 @@ {vsn,"3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,mochiweb]}, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,minirest]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_cli.erl b/src/emqx_cli.erl index 4463f2b27..6be9093b5 100644 --- a/src/emqx_cli.erl +++ b/src/emqx_cli.erl @@ -14,7 +14,7 @@ -module(emqx_cli). --export([print/1, print/2, usage/1]). +-export([print/1, print/2, usage/1, usage/2]). print(Msg) -> io:format(Msg). @@ -28,3 +28,5 @@ usage(CmdList) -> io:format("~-48s# ~s~n", [Cmd, Descr]) end, CmdList). +usage(Format, Args) -> + usage([{Format, Args}]). \ No newline at end of file diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 0adb07603..f05d63a29 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -84,7 +84,7 @@ unregister_client(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid %% @doc Lookup client pid -spec(lookup_client_pid(client_id()) -> pid() | undefined). lookup_client_pid(ClientId) when is_binary(ClientId) -> - case lookup_client_pid(ClientId) of + case ets:lookup(?CLIENT, ClientId) of [] -> undefined; [{_, Pid}] -> Pid end. diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 46d97e757..c16bd23cb 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -18,7 +18,7 @@ -export([start_link/0]). -export([register_command/2, register_command/3, unregister_command/1]). --export([run_command/2, lookup_command/1]). +-export([run_command/1, run_command/2, lookup_command/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -48,7 +48,21 @@ unregister_command(Cmd) when is_atom(Cmd) -> cast(Msg) -> gen_server:cast(?SERVER, Msg). +run_command([]) -> + run_command(help, []); +run_command([Cmd | Args]) -> + run_command(list_to_atom(Cmd), Args). + -spec(run_command(cmd(), [string()]) -> ok | {error, term()}). +run_command(set, []) -> + emqx_mgmt_cli_cfg:set_usage(), ok; + +run_command(set, Args) -> + emqx_mgmt_cli_cfg:run(["config" | Args]), ok; + +run_command(show, Args) -> + emqx_mgmt_cli_cfg:run(["config" | Args]), ok; + run_command(help, []) -> usage(); run_command(Cmd, Args) when is_atom(Cmd) -> diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 9e8445414..257820fb1 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -36,21 +36,31 @@ start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls -> start_mqtt_listener('mqtt:ssl', ListenOn, Options); %% Start MQTT/WS listener start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws -> - start_http_listener('mqtt:ws', ListenOn, Options); + Dispatch = [{"/mqtt", emqx_ws, []}], + NumAcceptors = proplists:get_value(acceptors, Options, 4), + MaxConnections = proplists:get_value(max_clients, Options, 1024), + TcpOptions = proplists:get_value(tcp_options, Options, []), + Options1 = [{port, ListenOn}, + {num_acceptors, NumAcceptors}, + {max_connections, MaxConnections} | TcpOptions], + minirest:start_http(Proto, Options1, Dispatch); %% Start MQTT/WSS listener start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss -> - start_http_listener('mqtt:wss', ListenOn, Options). + Dispatch = [{"/mqtt", emqx_ws, []}], + NumAcceptors = proplists:get_value(acceptors, Options, 4), + MaxConnections = proplists:get_value(max_clients, Options, 1024), + TcpOptions = proplists:get_value(tcp_options, Options, []), + SslOptions = proplists:get_value(ssl_options, Options, []), + Options1 = [{port, ListenOn}, + {num_acceptors, NumAcceptors}, + {max_connections, MaxConnections} | TcpOptions ++ SslOptions], + minirest:start_https(Proto, Options1, Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> SockOpts = esockd:parse_opt(Options), MFA = {emqx_connection, start_link, [Options -- SockOpts]}, {ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA). -start_http_listener(Name, ListenOn, Options) -> - SockOpts = esockd:parse_opt(Options), - MFA = {emqx_ws, handle_request, [Options -- SockOpts]}, - {ok, _} = mochiweb:start_http(Name, ListenOn, SockOpts, MFA). - %% @doc Restart all listeners -spec(restart_all() -> ok). restart_all() -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 30b2c0294..58c2279c9 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -352,11 +352,11 @@ send(Packet = ?PACKET(Type), ProtoState = #proto_state{proto_ver = Ver, sockprops = #{sendfun := SendFun}}) -> Data = emqx_frame:serialize(Packet, #{version => Ver}), case SendFun(Data) of - ok -> emqx_metrics:sent(Packet), - trace(send, Packet, ProtoState), - {ok, inc_stats(send, Type, ProtoState)}; {error, Reason} -> - {error, Reason} + {error, Reason}; + _ -> emqx_metrics:sent(Packet), + trace(send, Packet, ProtoState), + {ok, inc_stats(send, Type, ProtoState)} end. trace(recv, Packet, ProtoState) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 9380f38f7..2e4772f05 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -38,7 +38,7 @@ -define(TAB, emqx_shared_subscription). -record(state, {pmon}). --record(shared_subscription, {group, topic, subpid}). +-record(emqx_shared_subscription, {group, topic, subpid}). %%------------------------------------------------------------------------------ %% Mnesia bootstrap @@ -48,8 +48,8 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ {type, bag}, {ram_copies, [node()]}, - {record_name, shared_subscription}, - {attributes, record_info(fields, shared_subscription)}]); + {record_name, emqx_shared_subscription}, + {attributes, record_info(fields, emqx_shared_subscription)}]); mnesia(copy) -> ok = ekka_mnesia:copy_table(?TAB). @@ -78,7 +78,7 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)). record(Group, Topic, SubPid) -> - #shared_subscription{group = Group, topic = Topic, subpid = SubPid}. + #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. %% TODO: dispatch strategy, ensure the delivery... dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> @@ -110,7 +110,7 @@ init([]) -> init_monitors() -> mnesia:foldl( - fun(#shared_subscription{subpid = SubPid}, Mon) -> + fun(#emqx_shared_subscription{subpid = SubPid}, Mon) -> emqx_pmon:monitor(SubPid, Mon) end, emqx_pmon:new(), ?TAB). @@ -126,11 +126,11 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) -> - #shared_subscription{subpid = SubPid} = NewRecord, + #emqx_shared_subscription{subpid = SubPid} = NewRecord, {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> - #shared_subscription{subpid = SubPid} = OldRecord, + #emqx_shared_subscription{subpid = SubPid} = OldRecord, {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; handle_info({mnesia_table_event, _Event}, State) -> @@ -138,7 +138,7 @@ handle_info({mnesia_table_event, _Event}, State) -> handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> emqx_logger:info("[SharedSub] shared subscriber down: ~p", [SubPid]), - mnesia:async_dirty(fun cleanup_down/1, [SubPid]), + cleanup_down(SubPid), {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; handle_info(Info, State) -> @@ -156,8 +156,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- cleanup_down(SubPid) -> - lists:foreach(fun(Record) -> mnesia:delete_object(?TAB, Record) end, - mnesia:match_object(#shared_subscription{_ = '_', subpid = SubPid})). + lists:foreach( + fun(Record) -> + mnesia:dirty_delete_object(?TAB, Record) + end,mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). update_stats(State) -> emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State. diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 00f4bff3d..afa2d6b06 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -20,7 +20,7 @@ -export([start_link/0]). --export([open_session/1, lookup_session/1, close_session/1]). +-export([open_session/1, lookup_session/1, close_session/1, lookup_session_pid/1]). -export([resume_session/1, resume_session/2, discard_session/1, discard_session/2]). -export([register_session/2, get_session_attrs/1, unregister_session/1]). -export([get_session_stats/1, set_session_stats/2]). diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 3df468f1e..b3378ba91 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -72,7 +72,7 @@ init([]) -> %% Connection Manager CMSup = supervisor_spec(emqx_cm_sup), %% WebSocket Connection Sup - %% WSConnSup = supervisor_spec(emqx_ws_connection_sup), + WSConnSup = supervisor_spec(emqx_ws_connection_sup), %% Sys Sup SysSup = supervisor_spec(emqx_sys_sup), {ok, {{one_for_all, 0, 1}, @@ -84,7 +84,7 @@ init([]) -> SMSup, SessionSup, CMSup, - %%WSConnSup, + WSConnSup, SysSup]}}. %%-------------------------------------------------------------------- diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index 0f3a4eaa4..bf6388232 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -199,7 +199,7 @@ mem_info() -> {used_memory, proplists:get_value(total_memory, Dataset) - proplists:get_value(free_memory, Dataset)}]. ftos(F) -> - [S] = io_lib:format("~.2f", [F]), S. + S = io_lib:format("~.2f", [F]), S. %%%% erlang vm scheduler_usage fun copied from recon scheduler_usage(Interval) when is_integer(Interval) -> diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl index e7ad63d61..b84c603af 100644 --- a/src/emqx_ws.erl +++ b/src/emqx_ws.erl @@ -20,100 +20,83 @@ -import(proplists, [get_value/3]). --export([handle_request/1, ws_loop/3]). - %% WebSocket Loop State --record(wsocket_state, {peername, client_pid, max_packet_size, parser}). +-record(wsocket_state, {req, peername, client_pid, max_packet_size, parser}). -define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("WsClient(~s): " ++ Format, - [esockd_net:format(State#wsocket_state.peername) | Args])). + lager:Level("WsClient(~s): " ++ Format, + [esockd_net:format(State#wsocket_state.peername) | Args])). +-export([init/2]). +-export([websocket_init/1]). +-export([websocket_handle/2]). +-export([websocket_info/2]). -handle_request(Req) -> - handle_request(Req:get(method), Req:get(path), Req). - -%%-------------------------------------------------------------------- -%% MQTT Over WebSocket -%%-------------------------------------------------------------------- - -handle_request('GET', "/mqtt", Req) -> - emqx_logger:debug("WebSocket Connection from: ~s", [Req:get(peer)]), - Upgrade = Req:get_header_value("Upgrade"), - Proto = check_protocol_header(Req), - case {is_websocket(Upgrade), Proto} of - {true, "mqtt" ++ _Vsn} -> - case Req:get(peername) of - {ok, Peername} -> - {ok, ProtoEnv} = emqx_config:get_env(protocol), - PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), - Parser = emqx_parser:initial_state(PacketSize), - %% Upgrade WebSocket. - {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), - {ok, ClientPid} = emqx_ws_conn_sup:start_connection(self(), Req, ReplyChannel), - ReentryWs(#wsocket_state{peername = Peername, - parser = Parser, - max_packet_size = PacketSize, - client_pid = ClientPid}); - {error, Reason} -> - emqx_logger:error("Get peername with error ~s", [Reason]), - Req:respond({400, [], <<"Bad Request">>}) - end; - {false, _} -> - emqx_logger:error("Not WebSocket: Upgrade = ~s", [Upgrade]), - Req:respond({400, [], <<"Bad Request">>}); - {_, Proto} -> - emqx_logger:error("WebSocket with error Protocol: ~s", [Proto]), - Req:respond({400, [], <<"Bad WebSocket Protocol">>}) - end; - -handle_request(Method, Path, Req) -> - emqx_logger:error("Unexpected WS Request: ~s ~s", [Method, Path]), - Req:not_found(). - -is_websocket(Upgrade) -> - (not emqx_config:get_env(websocket_check_upgrade_header, true)) orelse - (Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket"). - -check_protocol_header(Req) -> - case emqx_config:get_env(websocket_protocol_header, false) of - true -> get_protocol_header(Req); - false -> "mqtt-v3.1.1" +init(Req0, State) -> + case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req0) of + undefined -> + {cowboy_websocket, Req0, #wsocket_state{}}; + Subprotocols -> + case lists:member(<<"mqtt">>, Subprotocols) of + true -> + Peername = cowboy_req:peer(Req0), + Req = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req0), + {cowboy_websocket, Req, #wsocket_state{req = Req, peername = Peername}, #{idle_timeout => 86400000}}; + false -> + Req = cowboy_req:reply(400, Req0), + {ok, Req, #wsocket_state{}} + end end. -get_protocol_header(Req) -> - case Req:get_header_value("EMQ-WebSocket-Protocol") of - undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); - Proto -> Proto +websocket_init(State = #wsocket_state{req = Req}) -> + case emqx_ws_connection_sup:start_connection(self(), Req) of + {ok, ClientPid} -> + {ok, ProtoEnv} = emqx_config:get_env(protocol), + PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), + Parser = emqx_frame:initial_state(#{max_packet_size => PacketSize}), + NewState = State#wsocket_state{parser = Parser, + max_packet_size = PacketSize, + client_pid = ClientPid}, + {ok, NewState}; + Error -> + ?WSLOG(error, "Start client fail: ~p", [Error], State), + {stop, State} end. -%%-------------------------------------------------------------------- -%% Receive Loop -%%-------------------------------------------------------------------- +websocket_handle({binary, <<>>}, State) -> + {ok, State}; +websocket_handle({binary, [<<>>]}, State) -> + {ok, State}; -%% @doc WebSocket frame receive loop. -ws_loop(<<>>, State, _ReplyChannel) -> - State; -ws_loop([<<>>], State, _ReplyChannel) -> - State; -ws_loop(Data, State = #wsocket_state{client_pid = ClientPid, parser = Parser}, ReplyChannel) -> +websocket_handle({binary, Data}, State = #wsocket_state{client_pid = ClientPid, parser = Parser}) -> ?WSLOG(debug, "RECV ~p", [Data], State), - emqx_metrics:inc('bytes/received', iolist_size(Data)), - case catch emqx_parser:parse(iolist_to_binary(Data), Parser) of + BinSize = iolist_size(Data), + emqx_metrics:inc('bytes/received', BinSize), + case catch emqx_frame:parse(iolist_to_binary(Data), Parser) of {more, NewParser} -> - State#wsocket_state{parser = NewParser}; + {ok, State#wsocket_state{parser = NewParser}}; {ok, Packet, Rest} -> - gen_server:cast(ClientPid, {received, Packet}), - ws_loop(Rest, reset_parser(State), ReplyChannel); + gen_server:cast(ClientPid, {received, Packet, BinSize}), + websocket_handle({binary, Rest}, reset_parser(State)); {error, Error} -> ?WSLOG(error, "Frame error: ~p", [Error], State), - exit({shutdown, Error}); + {stop, State}; {'EXIT', Reason} -> ?WSLOG(error, "Frame error: ~p", [Reason], State), ?WSLOG(error, "Error data: ~p", [Data], State), - exit({shutdown, parser_error}) + {stop, State} end. -reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) -> - State#wsocket_state{parser = emqx_parser:initial_state(PacketSize)}. +websocket_info({binary, Data}, State) -> + {reply, {binary, Data}, State}; + +websocket_info({'EXIT', _Pid, {shutdown, kick}}, State) -> + {stop, State}; + +websocket_info(_Info, State) -> + {ok, State}. + +reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) -> + State#wsocket_state{parser = emqx_frame:initial_state(#{max_packet_size => PacketSize})}. + diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index dadc2cc57..93a289636 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -23,7 +23,7 @@ -import(proplists, [get_value/2, get_value/3]). %% API Exports --export([start_link/4]). +-export([start_link/3]). %% Management and Monitor API -export([info/1, stats/1, kick/1, clean_acl_cache/2]). @@ -38,13 +38,15 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% TODO: remove ... --export([handle_pre_hibernate/1]). - %% WebSocket Client State --record(wsclient_state, {ws_pid, transport, socket, peername, - proto_state, keepalive, enable_stats, - force_gc_count}). +-record(wsclient_state, {ws_pid, peername, proto_state, keepalive, + enable_stats, force_gc_count}). + +%% recv_oct +%% Number of bytes received by the socket. + +%% recv_cnt +%% Number of packets received by the socket. -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -53,8 +55,8 @@ [esockd_net:format(State#wsclient_state.peername) | Args])). %% @doc Start WebSocket Client. -start_link(Env, WsPid, Req, ReplyChannel) -> - gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], +start_link(Env, WsPid, Req) -> + gen_server:start_link(?MODULE, [Env, WsPid, Req], [[{hibernate_after, 10000}]]). info(CPid) -> @@ -82,38 +84,29 @@ clean_acl_cache(CPid, Topic) -> %% gen_server Callbacks %%-------------------------------------------------------------------- -init([Env, WsPid, Req, ReplyChannel]) -> +init([Options, WsPid, Req]) -> + init_stas(), process_flag(trap_exit, true), true = link(WsPid), - Transport = mochiweb_request:get(transport, Req), - Sock = mochiweb_request:get(socket, Req), - case mochiweb_request:get(peername, Req) of - {ok, Peername} -> - Headers = mochiweb_headers:to_list(mochiweb_request:get(headers, Req)), - ProtoState = emqx_protocol:init(Transport, Sock, Peername, send_fun(ReplyChannel), - [{ws_initial_headers, Headers} | Env]), - IdleTimeout = get_value(client_idle_timeout, Env, 30000), - EnableStats = get_value(client_enable_stats, Env, false), - ForceGcCount = emqx_gc:conn_max_gc_count(), - {ok, #wsclient_state{transport = Transport, - socket = Sock, - ws_pid = WsPid, - peername = Peername, - proto_state = ProtoState, - enable_stats = EnableStats, - force_gc_count = ForceGcCount}, - IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}; - {error, enotconn} -> Transport:fast_close(Sock), - exit(WsPid, normal), - exit(normal); - {error, Reason} -> Transport:fast_close(Sock), - exit(WsPid, normal), - exit({shutdown, Reason}) - end. - -handle_pre_hibernate(State = #wsclient_state{ws_pid = WsPid}) -> - erlang:garbage_collect(WsPid), - {hibernate, emqx_gc:reset_conn_gc_count(#wsclient_state.force_gc_count, emit_stats(State))}. + Peername = cowboy_req:peer(Req), + Headers = cowboy_req:headers(Req), + Sockname = cowboy_req:sock(Req), + Peercert = cowboy_req:cert(Req), + Zone = proplists:get_value(zone, Options), + ProtoState = emqx_protocol:init(#{zone => Zone, + peername => Peername, + sockname => Sockname, + peercert => Peercert, + sendfun => send_fun(WsPid)}, + [{ws_initial_headers, Headers} | Options]), + IdleTimeout = get_value(client_idle_timeout, Options, 30000), + EnableStats = get_value(client_enable_stats, Options, false), + ForceGcCount = emqx_gc:conn_max_gc_count(), + {ok, #wsclient_state{ws_pid = WsPid, + peername = Peername, + proto_state = ProtoState, + enable_stats = EnableStats, + force_gc_count = ForceGcCount}, IdleTimeout}. handle_call(info, From, State = #wsclient_state{peername = Peername, proto_state = ProtoState}) -> @@ -123,7 +116,7 @@ handle_call(info, From, State = #wsclient_state{peername = Peername, handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) -> reply(lists:append([emqx_misc:proc_stats(), - wsock_stats(State), + wsock_stats(), emqx_protocol:stats(ProtoState)]), State); handle_call(kick, _From, State) -> @@ -140,7 +133,9 @@ handle_call(Req, _From, State) -> ?WSLOG(error, "Unexpected request: ~p", [Req], State), reply({error, unexpected_request}, State). -handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState}) -> +handle_cast({received, Packet, BinSize}, State = #wsclient_state{proto_state = ProtoState}) -> + put(recv_oct, get(recv_oct) + BinSize), + put(recv_cnt, get(recv_cnt) + 1), emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> @@ -158,48 +153,24 @@ handle_cast(Msg, State) -> ?WSLOG(error, "unexpected msg: ~p", [Msg], State), {noreply, State}. -handle_info({subscribe, TopicTable}, State) -> +handle_info(SubReq ={subscribe, _TopicTable}, State) -> with_proto( fun(ProtoState) -> - emqx_protocol:subscribe(TopicTable, ProtoState) + emqx_protocol:process(SubReq, ProtoState) end, State); -handle_info({unsubscribe, Topics}, State) -> +handle_info(UnsubReq = {unsubscribe, _Topics}, State) -> with_proto( fun(ProtoState) -> - emqx_protocol:unsubscribe(Topics, ProtoState) + emqx_protocol:process(UnsubReq, ProtoState) end, State); -handle_info({suback, PacketId, ReasonCodes}, State) -> +handle_info({deliver, PubOrAck}, State) -> with_proto( fun(ProtoState) -> - Packet = ?SUBACK_PACKET(PacketId, ReasonCodes), - emqx_protocol:send(Packet, ProtoState) - end, State); - -handle_info({unsuback, PacketId, ReasonCodes}, State) -> - with_proto( - fun(ProtoState) -> - Packet = ?UNSUBACK_PACKET(PacketId, ReasonCodes), - emqx_protocol:send(Packet, ProtoState) - end, State); - -%% Fastlane -handle_info({dispatch, _Topic, Message}, State) -> - handle_info({deliver, Message#message{qos = ?QOS_0}}, State); - -handle_info({deliver, Message}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:send(Message, ProtoState) + emqx_protocol:deliver(PubOrAck, ProtoState) end, gc(State)); -handle_info({redeliver, {?PUBREL, PacketId}}, State) -> - with_proto( - fun(ProtoState) -> - emqx_protocol:pubrel(PacketId, ProtoState) - end, State); - handle_info(emit_stats, State) -> {noreply, emit_stats(State), hibernate}; @@ -213,10 +184,9 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> handle_info({shutdown, Reason}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, - State = #wsclient_state{transport = Transport, socket =Sock}) -> +handle_info({keepalive, start, Interval}, State) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(stat_fun(Transport, Sock), Interval, {keepalive, check}) of + case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of {ok, KeepAlive} -> {noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate}; {error, Error} -> @@ -271,23 +241,18 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -send_fun(ReplyChannel) -> - Self = self(), - fun(Packet) -> - Data = emqx_frame:serialize(Packet), - emqx_metrics:inc('bytes/sent', iolist_size(Data)), - case ReplyChannel({binary, Data}) of - ok -> ok; - {error, Reason} -> Self ! {shutdown, Reason} - end +send_fun(WsPid) -> + fun(Data) -> + BinSize = iolist_size(Data), + emqx_metrics:inc('bytes/sent', BinSize), + put(send_oct, get(send_oct) + BinSize), + put(send_cnt, get(send_cnt) + 1), + WsPid ! {binary, iolist_to_binary(Data)} end. -stat_fun(Transport, Sock) -> +stat_fun() -> fun() -> - case Transport:getstat(Sock, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - {error, Error} -> {error, Error} - end + {ok, get(recv_oct)} end. emit_stats(State = #wsclient_state{proto_state = ProtoState}) -> @@ -302,11 +267,8 @@ emit_stats(ClientId, State) -> emqx_cm:set_client_stats(ClientId, Stats), State. -wsock_stats(#wsclient_state{transport = Transport, socket = Sock}) -> - case Transport:getstat(Sock, ?SOCK_STATS) of - {ok, Ss} -> Ss; - {error, _} -> [] - end. +wsock_stats() -> + [{Key, get(Key)}|| Key <- ?SOCK_STATS]. with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), @@ -325,3 +287,9 @@ gc(State) -> Cb = fun() -> emit_stats(State) end, emqx_gc:maybe_force_gc(#wsclient_state.force_gc_count, State, Cb). +init_stas() -> + put(recv_oct, 0), + put(recv_cnt, 0), + put(send_oct, 0), + put(send_cnt, 0). + diff --git a/src/emqx_ws_connection_sup.erl b/src/emqx_ws_connection_sup.erl index b58e7c956..f627abfbb 100644 --- a/src/emqx_ws_connection_sup.erl +++ b/src/emqx_ws_connection_sup.erl @@ -18,7 +18,7 @@ -behavior(supervisor). --export([start_link/0, start_connection/3]). +-export([start_link/0, start_connection/2]). -export([init/1]). @@ -27,9 +27,9 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc Start a MQTT/WebSocket Connection. --spec(start_connection(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}). -start_connection(WsPid, Req, ReplyChannel) -> - supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]). +-spec(start_connection(pid(), mochiweb_request:request()) -> {ok, pid()}). +start_connection(WsPid, Req) -> + supervisor:start_child(?MODULE, [WsPid, Req]). %%-------------------------------------------------------------------- %% Supervisor callbacks