Refactor websocket conn using cowboy

This commit is contained in:
turtled 2018-08-08 18:36:14 +08:00
parent 86348b5c94
commit 4d9e03a803
17 changed files with 194 additions and 491 deletions

View File

@ -4,7 +4,7 @@ PROJECT = emqx
PROJECT_DESCRIPTION = EMQ X Broker
PROJECT_VERSION = 3.0
DEPS = jsx gproc gen_rpc lager ekka esockd mochiweb clique
DEPS = jsx gproc gen_rpc lager ekka esockd minirest clique
dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git https://github.com/uwiger/gproc 0.8.0
@ -12,7 +12,7 @@ dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.1.1
dep_lager = git https://github.com/erlang-lager/lager 3.6.4
dep_esockd = git https://github.com/emqx/esockd emqx30
dep_ekka = git https://github.com/emqx/ekka emqx30
dep_mochiweb = git https://github.com/emqtt/mochiweb emqx30
dep_minirest = git https://github.com/emqx/minirest emqx30
dep_clique = git https://github.com/emqx/clique
NO_AUTOPATCH = gen_rpc cuttlefish

View File

@ -1176,57 +1176,11 @@ listener.ws.external.acceptors = 4
## Value: Number
listener.ws.external.max_clients = 102400
## Maximum MQTT/WebSocket connections per second.
##
## Value: Number
listener.ws.external.max_conn_rate = 1000
## Zone of the external MQTT/WebSocket listener belonged to.
##
## Value: String
listener.ws.external.zone = external
## Mountpoint of the MQTT/WebSocket Listener.
##
## See: listener.tcp.<name>.mountpoint
##
## Value: String
## listener.ws.external.mountpoint = external/
## The access control for the MQTT/WebSocket listener.
##
## See: listener.tcp.<name>.access
##
## Value: ACL Rule
listener.ws.external.access.1 = allow all
## Use X-Forwarded-For header for real source IP if the EMQ cluster is
## deployed behind NGINX or HAProxy.
##
## Value: String
## listener.ws.external.proxy_address_header = X-Forwarded-For
## Use X-Forwarded-Port header for real source port if the EMQ cluster is
## deployed behind NGINX or HAProxy.
##
## Value: String
## listener.ws.external.proxy_port_header = X-Forwarded-Port
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
##
## See: listener.tcp.<name>.proxy_protocol
##
## Value: on | off
## listener.ws.external.proxy_protocol = on
## Sets the timeout for proxy protocol.
##
## See: listener.tcp.<name>.proxy_protocol_timeout
##
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.tcp.<name>.backlog
@ -1283,11 +1237,6 @@ listener.ws.external.send_timeout_close = on
## Value: true | false
listener.ws.external.nodelay = true
## The SO_REUSEADDR flag for MQTT/WebSocket Listener.
##
## Value: true | false
listener.ws.external.reuseaddr = true
##--------------------------------------------------------------------
## External WebSocket/SSL listener for MQTT Protocol
@ -1309,11 +1258,6 @@ listener.wss.external.acceptors = 4
## Value: Number
listener.wss.external.max_clients = 64
## Maximum MQTT/WebSocket/SSL connections per second.
##
## Value: Number
listener.wss.external.max_conn_rate = 1000
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
##
## Value: String
@ -1326,37 +1270,6 @@ listener.wss.external.zone = external
## Value: String
## listener.wss.external.mountpoint = inbound/
## The access control rules for the MQTT/WebSocket/SSL listener.
##
## See: listener.tcp.<name>.access.<no>
##
## Value: ACL Rule
listener.wss.external.access.1 = allow all
## See: listener.ws.external.proxy_address_header
##
## Value: String
## listener.wss.external.proxy_address_header = X-Forwarded-For
## See: listener.ws.external.proxy_port_header
##
## Value: String
## listener.wss.external.proxy_port_header = X-Forwarded-Port
## Enable the Proxy Protocol V1/2 support.
##
## See: listener.tcp.<name>.proxy_protocol
##
## Value: on | off
## listener.wss.external.proxy_protocol = on
## Sets the timeout for proxy protocol.
##
## See: listener.tcp.<name>.proxy_protocol_timeout
##
## Value: Duration
## listener.wss.external.proxy_protocol_timeout = 3s
## TLS versions only to protect from POODLE attack.
##
## See: listener.ssl.<name>.tls_versions
@ -1364,13 +1277,6 @@ listener.wss.external.access.1 = allow all
## Value: String, seperated by ','
## listener.wss.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## TLS Handshake timeout.
##
## See: listener.ssl.<name>.handshake_timeout
##
## Value: Duration
listener.wss.external.handshake_timeout = 15s
## Path to the file containing the user's private PEM-encoded key.
##
## See: listener.ssl.<name>.keyfile
@ -1481,10 +1387,7 @@ listener.wss.external.send_timeout_close = on
## Value: true | false
## listener.wss.external.nodelay = true
## The SO_REUSEADDR flag for WebSocket/SSL listener.
##
## Value: true | false
listener.wss.external.reuseaddr = true
listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
##-------------------------------------------------------------------
## System Monitor

View File

@ -192,13 +192,13 @@
%%--------------------------------------------------------------------
-record(plugin, {
name :: atom(),
version :: string(),
dir :: string(),
descr :: string(),
vendor :: string(),
active :: boolean(),
info :: map()
name :: atom(),
version :: string(),
dir :: string(),
descr :: string(),
vendor :: string(),
active = false :: boolean(),
info :: map()
}).
-type(plugin() :: #plugin{}).

View File

@ -497,7 +497,7 @@ end}.
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
ConsoleHandler = {lager_console_backend, [ConsoleLogLevel]},
ConsoleHandler = {lager_console_backend, [{level, ConsoleLogLevel}]},
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
{level, ConsoleLogLevel},
{size, cuttlefish:conf_get("log.console.size", Conf)},
@ -1173,10 +1173,6 @@ end}.
{datatype, integer}
]}.
{mapping, "listener.ws.$name.max_conn_rate", "emqx.listeners", [
{datatype, integer}
]}.
{mapping, "listener.ws.$name.zone", "emqx.listeners", [
{datatype, string}
]}.
@ -1185,28 +1181,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.ws.$name.access.$id", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [
{datatype, flag}
]}.
{mapping, "listener.ws.$name.proxy_protocol_timeout", "emqx.listeners", [
{datatype, {duration, ms}}
]}.
{mapping, "listener.ws.$name.backlog", "emqx.listeners", [
{default, 1024},
{datatype, integer}
@ -1247,11 +1221,6 @@ end}.
hidden
]}.
{mapping, "listener.ws.$name.reuseaddr", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
%%--------------------------------------------------------------------
%% MQTT/WebSocket/SSL Listeners
@ -1269,10 +1238,6 @@ end}.
{datatype, integer}
]}.
{mapping, "listener.wss.$name.max_conn_rate", "emqx.listeners", [
{datatype, integer}
]}.
{mapping, "listener.wss.$name.zone", "emqx.listeners", [
{datatype, string}
]}.
@ -1281,32 +1246,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.rate_limit", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.access.$id", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [
{datatype, string},
hidden
]}.
{mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [
{datatype, flag}
]}.
{mapping, "listener.wss.$name.proxy_protocol_timeout", "emqx.listeners", [
{datatype, {duration, ms}}
]}.
{mapping, "listener.wss.$name.backlog", "emqx.listeners", [
{default, 1024},
{datatype, integer}
@ -1347,11 +1286,6 @@ end}.
hidden
]}.
{mapping, "listener.wss.$name.reuseaddr", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "listener.wss.$name.tls_versions", "emqx.listeners", [
{datatype, string}
]}.
@ -1360,11 +1294,6 @@ end}.
{datatype, string}
]}.
{mapping, "listener.wss.$name.handshake_timeout", "emqx.listeners", [
{default, "15s"},
{datatype, {duration, ms}}
]}.
{mapping, "listener.wss.$name.keyfile", "emqx.listeners", [
{datatype, string}
]}.
@ -1444,7 +1373,7 @@ end}.
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, true)}])
{reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
end,
SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
@ -1488,17 +1417,6 @@ end}.
end
end,
ApiListeners = fun(Type, Name) ->
Prefix = string:join(["listener", Type, Name], "."),
case cuttlefish:conf_get(Prefix, Conf, undefined) of
undefined ->
[];
ListenOn ->
SslOpts1 = case SslOpts(Prefix) of [] -> []; SslOpts0 -> [{ssl_options, SslOpts0}] end,
[{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}|LisOpts(Prefix)] ++ SslOpts1}]
end
end,
lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn}
<- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf)
@ -1506,106 +1424,9 @@ end}.
++
[SslListeners(Type, Name) || {["listener", Type, Name], ListenOn}
<- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]
++
[ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn}
<- cuttlefish_variable:filter_by_prefix("listener.api", Conf)])
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
end}.
%%--------------------------------------------------------------------
%% MQTT REST API Listeners
{mapping, "listener.api.$name", "emqx.listeners", [
{datatype, [integer, ip]}
]}.
{mapping, "listener.api.$name.acceptors", "emqx.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "listener.api.$name.max_clients", "emqx.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "listener.api.$name.rate_limit", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.api.$name.access.$id", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.api.$name.backlog", "emqx.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "listener.api.$name.send_timeout", "emqx.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.api.$name.send_timeout_close", "emqx.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.api.$name.recbuf", "emqx.listeners", [
{datatype, bytesize},
hidden
]}.
{mapping, "listener.api.$name.sndbuf", "emqx.listeners", [
{datatype, bytesize},
hidden
]}.
{mapping, "listener.api.$name.buffer", "emqx.listeners", [
{datatype, bytesize},
hidden
]}.
{mapping, "listener.api.$name.tune_buffer", "emqx.listeners", [
{datatype, flag},
hidden
]}.
{mapping, "listener.api.$name.nodelay", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "listener.api.$name.reuseaddr", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "listener.api.$name.handshake_timeout", "emqx.listeners", [
{datatype, {duration, ms}}
]}.
{mapping, "listener.api.$name.keyfile", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.api.$name.certfile", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.api.$name.cacertfile", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.api.$name.verify", "emqx.listeners", [
{datatype, atom}
]}.
{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqx.listeners", [
{datatype, {enum, [true, false]}}
]}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

View File

@ -3,7 +3,7 @@
{vsn,"3.0"},
{modules,[]},
{registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,mochiweb]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,minirest]},
{env,[]},
{mod,{emqx_app,[]}},
{maintainers,["Feng Lee <feng@emqx.io>"]},

View File

@ -14,7 +14,7 @@
-module(emqx_cli).
-export([print/1, print/2, usage/1]).
-export([print/1, print/2, usage/1, usage/2]).
print(Msg) ->
io:format(Msg).
@ -28,3 +28,5 @@ usage(CmdList) ->
io:format("~-48s# ~s~n", [Cmd, Descr])
end, CmdList).
usage(Format, Args) ->
usage([{Format, Args}]).

View File

@ -84,7 +84,7 @@ unregister_client(CObj = {ClientId, ClientPid}) when is_binary(ClientId), is_pid
%% @doc Lookup client pid
-spec(lookup_client_pid(client_id()) -> pid() | undefined).
lookup_client_pid(ClientId) when is_binary(ClientId) ->
case lookup_client_pid(ClientId) of
case ets:lookup(?CLIENT, ClientId) of
[] -> undefined;
[{_, Pid}] -> Pid
end.

View File

@ -18,7 +18,7 @@
-export([start_link/0]).
-export([register_command/2, register_command/3, unregister_command/1]).
-export([run_command/2, lookup_command/1]).
-export([run_command/1, run_command/2, lookup_command/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@ -48,7 +48,21 @@ unregister_command(Cmd) when is_atom(Cmd) ->
cast(Msg) ->
gen_server:cast(?SERVER, Msg).
run_command([]) ->
run_command(help, []);
run_command([Cmd | Args]) ->
run_command(list_to_atom(Cmd), Args).
-spec(run_command(cmd(), [string()]) -> ok | {error, term()}).
run_command(set, []) ->
emqx_mgmt_cli_cfg:set_usage(), ok;
run_command(set, Args) ->
emqx_mgmt_cli_cfg:run(["config" | Args]), ok;
run_command(show, Args) ->
emqx_mgmt_cli_cfg:run(["config" | Args]), ok;
run_command(help, []) ->
usage();
run_command(Cmd, Args) when is_atom(Cmd) ->

View File

@ -36,21 +36,31 @@ start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls ->
start_mqtt_listener('mqtt:ssl', ListenOn, Options);
%% Start MQTT/WS listener
start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws ->
start_http_listener('mqtt:ws', ListenOn, Options);
Dispatch = [{"/mqtt", emqx_ws, []}],
NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_clients, Options, 1024),
TcpOptions = proplists:get_value(tcp_options, Options, []),
Options1 = [{port, ListenOn},
{num_acceptors, NumAcceptors},
{max_connections, MaxConnections} | TcpOptions],
minirest:start_http(Proto, Options1, Dispatch);
%% Start MQTT/WSS listener
start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss ->
start_http_listener('mqtt:wss', ListenOn, Options).
Dispatch = [{"/mqtt", emqx_ws, []}],
NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_clients, Options, 1024),
TcpOptions = proplists:get_value(tcp_options, Options, []),
SslOptions = proplists:get_value(ssl_options, Options, []),
Options1 = [{port, ListenOn},
{num_acceptors, NumAcceptors},
{max_connections, MaxConnections} | TcpOptions ++ SslOptions],
minirest:start_https(Proto, Options1, Dispatch).
start_mqtt_listener(Name, ListenOn, Options) ->
SockOpts = esockd:parse_opt(Options),
MFA = {emqx_connection, start_link, [Options -- SockOpts]},
{ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA).
start_http_listener(Name, ListenOn, Options) ->
SockOpts = esockd:parse_opt(Options),
MFA = {emqx_ws, handle_request, [Options -- SockOpts]},
{ok, _} = mochiweb:start_http(Name, ListenOn, SockOpts, MFA).
%% @doc Restart all listeners
-spec(restart_all() -> ok).
restart_all() ->

View File

@ -352,11 +352,11 @@ send(Packet = ?PACKET(Type), ProtoState = #proto_state{proto_ver = Ver,
sockprops = #{sendfun := SendFun}}) ->
Data = emqx_frame:serialize(Packet, #{version => Ver}),
case SendFun(Data) of
ok -> emqx_metrics:sent(Packet),
trace(send, Packet, ProtoState),
{ok, inc_stats(send, Type, ProtoState)};
{error, Reason} ->
{error, Reason}
{error, Reason};
_ -> emqx_metrics:sent(Packet),
trace(send, Packet, ProtoState),
{ok, inc_stats(send, Type, ProtoState)}
end.
trace(recv, Packet, ProtoState) ->

View File

@ -38,7 +38,7 @@
-define(TAB, emqx_shared_subscription).
-record(state, {pmon}).
-record(shared_subscription, {group, topic, subpid}).
-record(emqx_shared_subscription, {group, topic, subpid}).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
@ -48,8 +48,8 @@ mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [
{type, bag},
{ram_copies, [node()]},
{record_name, shared_subscription},
{attributes, record_info(fields, shared_subscription)}]);
{record_name, emqx_shared_subscription},
{attributes, record_info(fields, emqx_shared_subscription)}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB).
@ -78,7 +78,7 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)).
record(Group, Topic, SubPid) ->
#shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
%% TODO: dispatch strategy, ensure the delivery...
dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
@ -110,7 +110,7 @@ init([]) ->
init_monitors() ->
mnesia:foldl(
fun(#shared_subscription{subpid = SubPid}, Mon) ->
fun(#emqx_shared_subscription{subpid = SubPid}, Mon) ->
emqx_pmon:monitor(SubPid, Mon)
end, emqx_pmon:new(), ?TAB).
@ -126,11 +126,11 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
#shared_subscription{subpid = SubPid} = NewRecord,
#emqx_shared_subscription{subpid = SubPid} = NewRecord,
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
#shared_subscription{subpid = SubPid} = OldRecord,
#emqx_shared_subscription{subpid = SubPid} = OldRecord,
{noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
handle_info({mnesia_table_event, _Event}, State) ->
@ -138,7 +138,7 @@ handle_info({mnesia_table_event, _Event}, State) ->
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
emqx_logger:info("[SharedSub] shared subscriber down: ~p", [SubPid]),
mnesia:async_dirty(fun cleanup_down/1, [SubPid]),
cleanup_down(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
handle_info(Info, State) ->
@ -156,8 +156,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
cleanup_down(SubPid) ->
lists:foreach(fun(Record) -> mnesia:delete_object(?TAB, Record) end,
mnesia:match_object(#shared_subscription{_ = '_', subpid = SubPid})).
lists:foreach(
fun(Record) ->
mnesia:dirty_delete_object(?TAB, Record)
end,mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
update_stats(State) ->
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State.

View File

@ -20,7 +20,7 @@
-export([start_link/0]).
-export([open_session/1, lookup_session/1, close_session/1]).
-export([open_session/1, lookup_session/1, close_session/1, lookup_session_pid/1]).
-export([resume_session/1, resume_session/2, discard_session/1, discard_session/2]).
-export([register_session/2, get_session_attrs/1, unregister_session/1]).
-export([get_session_stats/1, set_session_stats/2]).

View File

@ -72,7 +72,7 @@ init([]) ->
%% Connection Manager
CMSup = supervisor_spec(emqx_cm_sup),
%% WebSocket Connection Sup
%% WSConnSup = supervisor_spec(emqx_ws_connection_sup),
WSConnSup = supervisor_spec(emqx_ws_connection_sup),
%% Sys Sup
SysSup = supervisor_spec(emqx_sys_sup),
{ok, {{one_for_all, 0, 1},
@ -84,7 +84,7 @@ init([]) ->
SMSup,
SessionSup,
CMSup,
%%WSConnSup,
WSConnSup,
SysSup]}}.
%%--------------------------------------------------------------------

View File

@ -199,7 +199,7 @@ mem_info() ->
{used_memory, proplists:get_value(total_memory, Dataset) - proplists:get_value(free_memory, Dataset)}].
ftos(F) ->
[S] = io_lib:format("~.2f", [F]), S.
S = io_lib:format("~.2f", [F]), S.
%%%% erlang vm scheduler_usage fun copied from recon
scheduler_usage(Interval) when is_integer(Interval) ->

View File

@ -20,100 +20,83 @@
-import(proplists, [get_value/3]).
-export([handle_request/1, ws_loop/3]).
%% WebSocket Loop State
-record(wsocket_state, {peername, client_pid, max_packet_size, parser}).
-record(wsocket_state, {req, peername, client_pid, max_packet_size, parser}).
-define(WSLOG(Level, Format, Args, State),
emqx_logger:Level("WsClient(~s): " ++ Format,
[esockd_net:format(State#wsocket_state.peername) | Args])).
lager:Level("WsClient(~s): " ++ Format,
[esockd_net:format(State#wsocket_state.peername) | Args])).
-export([init/2]).
-export([websocket_init/1]).
-export([websocket_handle/2]).
-export([websocket_info/2]).
handle_request(Req) ->
handle_request(Req:get(method), Req:get(path), Req).
%%--------------------------------------------------------------------
%% MQTT Over WebSocket
%%--------------------------------------------------------------------
handle_request('GET', "/mqtt", Req) ->
emqx_logger:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"),
Proto = check_protocol_header(Req),
case {is_websocket(Upgrade), Proto} of
{true, "mqtt" ++ _Vsn} ->
case Req:get(peername) of
{ok, Peername} ->
{ok, ProtoEnv} = emqx_config:get_env(protocol),
PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
Parser = emqx_parser:initial_state(PacketSize),
%% Upgrade WebSocket.
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
{ok, ClientPid} = emqx_ws_conn_sup:start_connection(self(), Req, ReplyChannel),
ReentryWs(#wsocket_state{peername = Peername,
parser = Parser,
max_packet_size = PacketSize,
client_pid = ClientPid});
{error, Reason} ->
emqx_logger:error("Get peername with error ~s", [Reason]),
Req:respond({400, [], <<"Bad Request">>})
end;
{false, _} ->
emqx_logger:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
Req:respond({400, [], <<"Bad Request">>});
{_, Proto} ->
emqx_logger:error("WebSocket with error Protocol: ~s", [Proto]),
Req:respond({400, [], <<"Bad WebSocket Protocol">>})
end;
handle_request(Method, Path, Req) ->
emqx_logger:error("Unexpected WS Request: ~s ~s", [Method, Path]),
Req:not_found().
is_websocket(Upgrade) ->
(not emqx_config:get_env(websocket_check_upgrade_header, true)) orelse
(Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket").
check_protocol_header(Req) ->
case emqx_config:get_env(websocket_protocol_header, false) of
true -> get_protocol_header(Req);
false -> "mqtt-v3.1.1"
init(Req0, State) ->
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req0) of
undefined ->
{cowboy_websocket, Req0, #wsocket_state{}};
Subprotocols ->
case lists:member(<<"mqtt">>, Subprotocols) of
true ->
Peername = cowboy_req:peer(Req0),
Req = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req0),
{cowboy_websocket, Req, #wsocket_state{req = Req, peername = Peername}, #{idle_timeout => 86400000}};
false ->
Req = cowboy_req:reply(400, Req0),
{ok, Req, #wsocket_state{}}
end
end.
get_protocol_header(Req) ->
case Req:get_header_value("EMQ-WebSocket-Protocol") of
undefined -> Req:get_header_value("Sec-WebSocket-Protocol");
Proto -> Proto
websocket_init(State = #wsocket_state{req = Req}) ->
case emqx_ws_connection_sup:start_connection(self(), Req) of
{ok, ClientPid} ->
{ok, ProtoEnv} = emqx_config:get_env(protocol),
PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
Parser = emqx_frame:initial_state(#{max_packet_size => PacketSize}),
NewState = State#wsocket_state{parser = Parser,
max_packet_size = PacketSize,
client_pid = ClientPid},
{ok, NewState};
Error ->
?WSLOG(error, "Start client fail: ~p", [Error], State),
{stop, State}
end.
%%--------------------------------------------------------------------
%% Receive Loop
%%--------------------------------------------------------------------
websocket_handle({binary, <<>>}, State) ->
{ok, State};
websocket_handle({binary, [<<>>]}, State) ->
{ok, State};
%% @doc WebSocket frame receive loop.
ws_loop(<<>>, State, _ReplyChannel) ->
State;
ws_loop([<<>>], State, _ReplyChannel) ->
State;
ws_loop(Data, State = #wsocket_state{client_pid = ClientPid, parser = Parser}, ReplyChannel) ->
websocket_handle({binary, Data}, State = #wsocket_state{client_pid = ClientPid, parser = Parser}) ->
?WSLOG(debug, "RECV ~p", [Data], State),
emqx_metrics:inc('bytes/received', iolist_size(Data)),
case catch emqx_parser:parse(iolist_to_binary(Data), Parser) of
BinSize = iolist_size(Data),
emqx_metrics:inc('bytes/received', BinSize),
case catch emqx_frame:parse(iolist_to_binary(Data), Parser) of
{more, NewParser} ->
State#wsocket_state{parser = NewParser};
{ok, State#wsocket_state{parser = NewParser}};
{ok, Packet, Rest} ->
gen_server:cast(ClientPid, {received, Packet}),
ws_loop(Rest, reset_parser(State), ReplyChannel);
gen_server:cast(ClientPid, {received, Packet, BinSize}),
websocket_handle({binary, Rest}, reset_parser(State));
{error, Error} ->
?WSLOG(error, "Frame error: ~p", [Error], State),
exit({shutdown, Error});
{stop, State};
{'EXIT', Reason} ->
?WSLOG(error, "Frame error: ~p", [Reason], State),
?WSLOG(error, "Error data: ~p", [Data], State),
exit({shutdown, parser_error})
{stop, State}
end.
reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) ->
State#wsocket_state{parser = emqx_parser:initial_state(PacketSize)}.
websocket_info({binary, Data}, State) ->
{reply, {binary, Data}, State};
websocket_info({'EXIT', _Pid, {shutdown, kick}}, State) ->
{stop, State};
websocket_info(_Info, State) ->
{ok, State}.
reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) ->
State#wsocket_state{parser = emqx_frame:initial_state(#{max_packet_size => PacketSize})}.

View File

@ -23,7 +23,7 @@
-import(proplists, [get_value/2, get_value/3]).
%% API Exports
-export([start_link/4]).
-export([start_link/3]).
%% Management and Monitor API
-export([info/1, stats/1, kick/1, clean_acl_cache/2]).
@ -38,13 +38,15 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% TODO: remove ...
-export([handle_pre_hibernate/1]).
%% WebSocket Client State
-record(wsclient_state, {ws_pid, transport, socket, peername,
proto_state, keepalive, enable_stats,
force_gc_count}).
-record(wsclient_state, {ws_pid, peername, proto_state, keepalive,
enable_stats, force_gc_count}).
%% recv_oct
%% Number of bytes received by the socket.
%% recv_cnt
%% Number of packets received by the socket.
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -53,8 +55,8 @@
[esockd_net:format(State#wsclient_state.peername) | Args])).
%% @doc Start WebSocket Client.
start_link(Env, WsPid, Req, ReplyChannel) ->
gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel],
start_link(Env, WsPid, Req) ->
gen_server:start_link(?MODULE, [Env, WsPid, Req],
[[{hibernate_after, 10000}]]).
info(CPid) ->
@ -82,38 +84,29 @@ clean_acl_cache(CPid, Topic) ->
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([Env, WsPid, Req, ReplyChannel]) ->
init([Options, WsPid, Req]) ->
init_stas(),
process_flag(trap_exit, true),
true = link(WsPid),
Transport = mochiweb_request:get(transport, Req),
Sock = mochiweb_request:get(socket, Req),
case mochiweb_request:get(peername, Req) of
{ok, Peername} ->
Headers = mochiweb_headers:to_list(mochiweb_request:get(headers, Req)),
ProtoState = emqx_protocol:init(Transport, Sock, Peername, send_fun(ReplyChannel),
[{ws_initial_headers, Headers} | Env]),
IdleTimeout = get_value(client_idle_timeout, Env, 30000),
EnableStats = get_value(client_enable_stats, Env, false),
ForceGcCount = emqx_gc:conn_max_gc_count(),
{ok, #wsclient_state{transport = Transport,
socket = Sock,
ws_pid = WsPid,
peername = Peername,
proto_state = ProtoState,
enable_stats = EnableStats,
force_gc_count = ForceGcCount},
IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE};
{error, enotconn} -> Transport:fast_close(Sock),
exit(WsPid, normal),
exit(normal);
{error, Reason} -> Transport:fast_close(Sock),
exit(WsPid, normal),
exit({shutdown, Reason})
end.
handle_pre_hibernate(State = #wsclient_state{ws_pid = WsPid}) ->
erlang:garbage_collect(WsPid),
{hibernate, emqx_gc:reset_conn_gc_count(#wsclient_state.force_gc_count, emit_stats(State))}.
Peername = cowboy_req:peer(Req),
Headers = cowboy_req:headers(Req),
Sockname = cowboy_req:sock(Req),
Peercert = cowboy_req:cert(Req),
Zone = proplists:get_value(zone, Options),
ProtoState = emqx_protocol:init(#{zone => Zone,
peername => Peername,
sockname => Sockname,
peercert => Peercert,
sendfun => send_fun(WsPid)},
[{ws_initial_headers, Headers} | Options]),
IdleTimeout = get_value(client_idle_timeout, Options, 30000),
EnableStats = get_value(client_enable_stats, Options, false),
ForceGcCount = emqx_gc:conn_max_gc_count(),
{ok, #wsclient_state{ws_pid = WsPid,
peername = Peername,
proto_state = ProtoState,
enable_stats = EnableStats,
force_gc_count = ForceGcCount}, IdleTimeout}.
handle_call(info, From, State = #wsclient_state{peername = Peername,
proto_state = ProtoState}) ->
@ -123,7 +116,7 @@ handle_call(info, From, State = #wsclient_state{peername = Peername,
handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) ->
reply(lists:append([emqx_misc:proc_stats(),
wsock_stats(State),
wsock_stats(),
emqx_protocol:stats(ProtoState)]), State);
handle_call(kick, _From, State) ->
@ -140,7 +133,9 @@ handle_call(Req, _From, State) ->
?WSLOG(error, "Unexpected request: ~p", [Req], State),
reply({error, unexpected_request}, State).
handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState}) ->
handle_cast({received, Packet, BinSize}, State = #wsclient_state{proto_state = ProtoState}) ->
put(recv_oct, get(recv_oct) + BinSize),
put(recv_cnt, get(recv_cnt) + 1),
emqx_metrics:received(Packet),
case emqx_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
@ -158,48 +153,24 @@ handle_cast(Msg, State) ->
?WSLOG(error, "unexpected msg: ~p", [Msg], State),
{noreply, State}.
handle_info({subscribe, TopicTable}, State) ->
handle_info(SubReq ={subscribe, _TopicTable}, State) ->
with_proto(
fun(ProtoState) ->
emqx_protocol:subscribe(TopicTable, ProtoState)
emqx_protocol:process(SubReq, ProtoState)
end, State);
handle_info({unsubscribe, Topics}, State) ->
handle_info(UnsubReq = {unsubscribe, _Topics}, State) ->
with_proto(
fun(ProtoState) ->
emqx_protocol:unsubscribe(Topics, ProtoState)
emqx_protocol:process(UnsubReq, ProtoState)
end, State);
handle_info({suback, PacketId, ReasonCodes}, State) ->
handle_info({deliver, PubOrAck}, State) ->
with_proto(
fun(ProtoState) ->
Packet = ?SUBACK_PACKET(PacketId, ReasonCodes),
emqx_protocol:send(Packet, ProtoState)
end, State);
handle_info({unsuback, PacketId, ReasonCodes}, State) ->
with_proto(
fun(ProtoState) ->
Packet = ?UNSUBACK_PACKET(PacketId, ReasonCodes),
emqx_protocol:send(Packet, ProtoState)
end, State);
%% Fastlane
handle_info({dispatch, _Topic, Message}, State) ->
handle_info({deliver, Message#message{qos = ?QOS_0}}, State);
handle_info({deliver, Message}, State) ->
with_proto(
fun(ProtoState) ->
emqx_protocol:send(Message, ProtoState)
emqx_protocol:deliver(PubOrAck, ProtoState)
end, gc(State));
handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
with_proto(
fun(ProtoState) ->
emqx_protocol:pubrel(PacketId, ProtoState)
end, State);
handle_info(emit_stats, State) ->
{noreply, emit_stats(State), hibernate};
@ -213,10 +184,9 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
handle_info({shutdown, Reason}, State) ->
shutdown(Reason, State);
handle_info({keepalive, start, Interval},
State = #wsclient_state{transport = Transport, socket =Sock}) ->
handle_info({keepalive, start, Interval}, State) ->
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
case emqx_keepalive:start(stat_fun(Transport, Sock), Interval, {keepalive, check}) of
case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
{ok, KeepAlive} ->
{noreply, State#wsclient_state{keepalive = KeepAlive}, hibernate};
{error, Error} ->
@ -271,23 +241,18 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
send_fun(ReplyChannel) ->
Self = self(),
fun(Packet) ->
Data = emqx_frame:serialize(Packet),
emqx_metrics:inc('bytes/sent', iolist_size(Data)),
case ReplyChannel({binary, Data}) of
ok -> ok;
{error, Reason} -> Self ! {shutdown, Reason}
end
send_fun(WsPid) ->
fun(Data) ->
BinSize = iolist_size(Data),
emqx_metrics:inc('bytes/sent', BinSize),
put(send_oct, get(send_oct) + BinSize),
put(send_cnt, get(send_cnt) + 1),
WsPid ! {binary, iolist_to_binary(Data)}
end.
stat_fun(Transport, Sock) ->
stat_fun() ->
fun() ->
case Transport:getstat(Sock, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error}
end
{ok, get(recv_oct)}
end.
emit_stats(State = #wsclient_state{proto_state = ProtoState}) ->
@ -302,11 +267,8 @@ emit_stats(ClientId, State) ->
emqx_cm:set_client_stats(ClientId, Stats),
State.
wsock_stats(#wsclient_state{transport = Transport, socket = Sock}) ->
case Transport:getstat(Sock, ?SOCK_STATS) of
{ok, Ss} -> Ss;
{error, _} -> []
end.
wsock_stats() ->
[{Key, get(Key)}|| Key <- ?SOCK_STATS].
with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState),
@ -325,3 +287,9 @@ gc(State) ->
Cb = fun() -> emit_stats(State) end,
emqx_gc:maybe_force_gc(#wsclient_state.force_gc_count, State, Cb).
init_stas() ->
put(recv_oct, 0),
put(recv_cnt, 0),
put(send_oct, 0),
put(send_cnt, 0).

View File

@ -18,7 +18,7 @@
-behavior(supervisor).
-export([start_link/0, start_connection/3]).
-export([start_link/0, start_connection/2]).
-export([init/1]).
@ -27,9 +27,9 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc Start a MQTT/WebSocket Connection.
-spec(start_connection(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
start_connection(WsPid, Req, ReplyChannel) ->
supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]).
-spec(start_connection(pid(), mochiweb_request:request()) -> {ok, pid()}).
start_connection(WsPid, Req) ->
supervisor:start_child(?MODULE, [WsPid, Req]).
%%--------------------------------------------------------------------
%% Supervisor callbacks