Merge pull request #613 from emqtt/emq10

1.1.2 - Fix websocket client leak and upgrade mysql-otp driver
This commit is contained in:
Feng Lee 2016-06-30 18:38:36 +08:00 committed by GitHub
commit ec3342875d
25 changed files with 240 additions and 243 deletions

View File

@ -5,6 +5,28 @@
Changes Changes
======= =======
.. _release_1.1.2:
-------------
Version 1.1.2
-------------
*Release Date: 2016-06-30*
Upgrade mysql-otp driver to 1.2.0 (#564, #523, #586, #596)
Fix WebSocket Client Leak (PR #612)
java.io.EOFException using paho java client (#551)
Send message from paho java client to javascript client (#552)
Compatible with the Qos0 PUBREL packet (#575)
Empty clientId with non-clean session accepted (#599)
Update docs to fix typos (#601, #607)
.. _release_1.1.1: .. _release_1.1.1:
------------- -------------

View File

@ -320,7 +320,7 @@ Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_mysql/etc/plugin.config:
... ...
%% comment this query, the acl will be disabled %% comment this query, the acl will be disabled
{aclquery, "select * from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"}, {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
%% If no rules matched, return... %% If no rules matched, return...
{acl_nomatch, allow} {acl_nomatch, allow}

View File

@ -53,6 +53,7 @@ Contents:
plugins plugins
tune tune
changes changes
upgrade
------- -------
License License

33
docs/source/upgrade.rst Normal file
View File

@ -0,0 +1,33 @@
.. _upgrade:
=============
Upgrade Guide
=============
.. _upgrade_1.1.2:
----------------
Upgrade to 1.1.2
----------------
.. NOTE:: 1.0+ releases can be upgraded to 1.1.2 smoothly
Steps:
1. Download and install emqttd-1.1.2 to the new directory, for example::
Old installation: /opt/emqttd_1_0_0/
New installation: /opt/emqttd_1_1_2/
2. Copy the 'etc/' and 'data/' from the old installation::
cp -R /opt/emqttd_1_0_0/etc/* /opt/emqttd_1_1_2/etc/
cp -R /opt/emqttd_1_0_0/data/* /opt/emqttd_1_1_2/data/
3. Copy the plugins/{plugin}/etc/* from the old installation if you loaded plugins.
4. Stop the old emqttd, and start the new one.

Binary file not shown.

BIN
docs/specs/mqtt-v3.1.1.pdf Normal file

Binary file not shown.

@ -1 +1 @@
Subproject commit 4711caa703f84775ff0a1256587788430494eecd Subproject commit cd6ae58f1674165f2033bff6b7af819c1f3a169d

@ -1 +1 @@
Subproject commit f4ed05ee52d487dbab899aafa96b72519725978c Subproject commit fff5eb17b02a41ed9e928013bc318914cd6269a6

@ -1 +1 @@
Subproject commit 168a70890b6c204e0629fc915a6da4b0247c326e Subproject commit 95143c1a44f28cc38ac6c6a77c15cc9b5f433791

@ -1 +1 @@
Subproject commit f9a00100db19b864d7bdbe1eeca49978ea6934a9 Subproject commit 82176663240c1bff825a903c254a86ebb9031b5a

@ -1 +1 @@
Subproject commit 6ec5fb063f5070766f6a1f911b3c5771890bb878 Subproject commit e1d67068aabcc297956d2fd8aabe7f2dd1817650

@ -1 +1 @@
Subproject commit fc342cc9164c91f6a64e7de2f9f1b0604d767159 Subproject commit 285abd943850736bff04ac5e877cba2c3093ae25

@ -1 +1 @@
Subproject commit 418bb1d4385a0fd21a57918c2b2b4fdcc9751c1c Subproject commit c2c94e1d49dd26b1d558e7632c925ea319093294

@ -1 +1 @@
Subproject commit f946d5ff188a0668734fdc850149bb3d5cf1e798 Subproject commit 4b0721609ca605b0a052b46eb753185afebc9fda

@ -1 +1 @@
Subproject commit 1cc503573439da213b435bc1c5d2dc7eb3891052 Subproject commit 3521c5d07c24da5b362196de89bb0d250d42eccc

@ -1 +1 @@
Subproject commit 4bd649c137defacacbaba685c3adf042e3f70700 Subproject commit fb97df1bb3e4a813c44f48161aef32cac428eb87

View File

@ -58,3 +58,4 @@
## Tweak GC to run more often ## Tweak GC to run more often
-env ERL_FULLSWEEP_AFTER 1000 -env ERL_FULLSWEEP_AFTER 1000
-env ERL_CRASH_DUMP log/emqttd_crash.dump

View File

@ -89,6 +89,7 @@ start_servers(Sup) ->
{"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}},
{"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}},
{"emqttd wsclient supervisor", {supervisor, emqttd_ws_client_sup}},
{"emqttd broker", emqttd_broker}, {"emqttd broker", emqttd_broker},
{"emqttd alarm", emqttd_alarm}, {"emqttd alarm", emqttd_alarm},
{"emqttd mod supervisor", emqttd_mod_sup}, {"emqttd mod supervisor", emqttd_mod_sup},
@ -187,7 +188,7 @@ start_listener({https, ListenOn, Opts}) ->
start_listener(Protocol, ListenOn, Opts) -> start_listener(Protocol, ListenOn, Opts) ->
MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]}, MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]},
esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs). {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
merge_sockopts(Options) -> merge_sockopts(Options) ->
SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS, SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS,

View File

@ -52,13 +52,14 @@ handle_request('POST', "/mqtt/publish", Req) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT Over WebSocket %% MQTT Over WebSocket
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_request('GET', "/mqtt", Req) -> handle_request('GET', "/mqtt", Req) ->
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"), Upgrade = Req:get_header_value("Upgrade"),
Proto = Req:get_header_value("Sec-WebSocket-Protocol"), Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
case {is_websocket(Upgrade), Proto} of case {is_websocket(Upgrade), Proto} of
{true, "mqtt" ++ _Vsn} -> {true, "mqtt" ++ _Vsn} ->
emqttd_ws_client:start_link(Req); emqttd_ws:handle_request(Req);
{false, _} -> {false, _} ->
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
Req:respond({400, [], <<"Bad Request">>}); Req:respond({400, [], <<"Bad Request">>});
@ -144,13 +145,12 @@ authorized(Req) ->
user_passwd(BasicAuth) -> user_passwd(BasicAuth) ->
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
int(S) -> list_to_integer(S). int(S) -> list_to_integer(S).
bool("0") -> false; bool("0") -> false;
bool("1") -> true. bool("1") -> true.
is_websocket(Upgrade) -> is_websocket(Upgrade) ->
Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
docroot() -> docroot() ->

View File

@ -36,8 +36,7 @@ new(Opts) ->
fun(Bin) -> parse(Bin, {none, limit(Opts)}) end. fun(Bin) -> parse(Bin, {none, limit(Opts)}) end.
limit(Opts) -> limit(Opts) ->
#mqtt_packet_limit{max_packet_size = #mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}.
%% @doc Parse MQTT Packet %% @doc Parse MQTT Packet
-spec(parse(binary(), {none, [option()]} | fun()) -spec(parse(binary(), {none, [option()]} | fun())

View File

@ -38,7 +38,7 @@ start_session(CleanSess, ClientId, ClientPid) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
{ok, {{simple_one_for_one, 10, 10}, {ok, {{simple_one_for_one, 0, 1},
[{session, {emqttd_session, start_link, []}, [{session, {emqttd_session, start_link, []},
temporary, 10000, worker, [emqttd_session]}]}}. temporary, 5000, worker, [emqttd_session]}]}}.

75
src/emqttd_ws.erl Normal file
View File

@ -0,0 +1,75 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_ws).
-export([handle_request/1, ws_loop/3]).
%% WebSocket Loop State
-record(wsocket_state, {peer, client_pid, packet_opts, parser_fun}).
-define(LOG(Level, Peer, Format, Args),
lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
%%--------------------------------------------------------------------
%% Handle WebSocket Request
%%--------------------------------------------------------------------
%% @doc Handle WebSocket Request.
handle_request(Req) ->
Peer = Req:get(peer),
PktOpts = emqttd:env(mqtt, packet),
ParserFun = emqttd_parser:new(PktOpts),
{ReentryWs, ReplyChannel} = upgrade(Req),
{ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
ReentryWs(#wsocket_state{peer = Peer, client_pid = ClientPid,
packet_opts = PktOpts, parser_fun = ParserFun}).
%% @doc Upgrade WebSocket.
%% @private
upgrade(Req) ->
mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
%%--------------------------------------------------------------------
%% Receive Loop
%%--------------------------------------------------------------------
%% @doc WebSocket frame receive loop.
ws_loop(<<>>, State, _ReplyChannel) ->
State;
ws_loop([<<>>], State, _ReplyChannel) ->
State;
ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid,
parser_fun = ParserFun}, ReplyChannel) ->
?LOG(debug, Peer, "RECV ~p", [Data]),
case catch ParserFun(iolist_to_binary(Data)) of
{more, NewParser} ->
State#wsocket_state{parser_fun = NewParser};
{ok, Packet, Rest} ->
gen_server:cast(ClientPid, {received, Packet}),
ws_loop(Rest, reset_parser(State), ReplyChannel);
{error, Error} ->
?LOG(error, Peer, "Frame error: ~p", [Error]),
exit({shutdown, Error});
{'EXIT', Reason} ->
?LOG(error, Peer, "Frame error: ~p", [Reason]),
?LOG(error, Peer, "Error data: ~p", [Data]),
exit({shutdown, parser_error})
end.
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.

View File

@ -16,42 +16,31 @@
-module(emqttd_ws_client). -module(emqttd_ws_client).
-behaviour(gen_server).
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
%% API Exports %% API Exports
-export([start_link/1, ws_loop/3, session/1, info/1, kick/1]). -export([start_link/4, session/1, info/1, kick/1]).
%% SUB/UNSUB Asynchronously %% SUB/UNSUB Asynchronously
-export([subscribe/2, unsubscribe/2]). -export([subscribe/2, unsubscribe/2]).
-behaviour(gen_server).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% WebSocket Loop State
-record(wsocket_state, {request, client_pid, packet_opts, parser_fun}).
%% WebSocket Client State %% WebSocket Client State
-record(wsclient_state, {ws_pid, request, proto_state, keepalive}). -record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive}).
-define(WSLOG(Level, Format, Args, Req), -define(WSLOG(Level, Peer, Format, Args),
lager:Level("WsClient(~s): " ++ Format, [Req:get(peer) | Args])). lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
%% @doc Start WebSocket client. %% @doc Start WebSocket Client.
start_link(Req) -> start_link(MqttEnv, WsPid, Req, ReplyChannel) ->
PktOpts = emqttd:env(mqtt, packet), gen_server:start_link(?MODULE, [MqttEnv, WsPid, Req, ReplyChannel], []).
ParserFun = emqttd_parser:new(PktOpts),
{ReentryWs, ReplyChannel} = upgrade(Req),
Params = [self(), Req, ReplyChannel, PktOpts],
{ok, ClientPid} = gen_server:start_link(?MODULE, Params, []),
ReentryWs(#wsocket_state{request = Req,
client_pid = ClientPid,
packet_opts = PktOpts,
parser_fun = ParserFun}).
session(CPid) -> session(CPid) ->
gen_server:call(CPid, session, infinity). gen_server:call(CPid, session, infinity).
@ -68,66 +57,40 @@ subscribe(CPid, TopicTable) ->
unsubscribe(CPid, Topics) -> unsubscribe(CPid, Topics) ->
gen_server:cast(CPid, {unsubscribe, Topics}). gen_server:cast(CPid, {unsubscribe, Topics}).
%% @private
%% @doc Upgrade WebSocket.
upgrade(Req) ->
mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
%% @doc WebSocket frame receive loop.
ws_loop(<<>>, State, _ReplyChannel) ->
State;
ws_loop([<<>>], State, _ReplyChannel) ->
State;
ws_loop(Data, State = #wsocket_state{request = Req,
client_pid = ClientPid,
parser_fun = ParserFun}, ReplyChannel) ->
?WSLOG(debug, "RECV ~p", [Data], Req),
case catch ParserFun(iolist_to_binary(Data)) of
{more, NewParser} ->
State#wsocket_state{parser_fun = NewParser};
{ok, Packet, Rest} ->
gen_server:cast(ClientPid, {received, Packet}),
ws_loop(Rest, reset_parser(State), ReplyChannel);
{error, Error} ->
?WSLOG(error, "Frame error: ~p", [Error], Req),
exit({shutdown, Error});
{'EXIT', Reason} ->
?WSLOG(error, "Frame error: ~p", [Reason], Req),
?WSLOG(error, "Error data: ~p", [Data], Req),
exit({shutdown, parser_error})
end.
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([WsPid, Req, ReplyChannel, PktOpts]) -> init([MqttEnv, WsPid, Req, ReplyChannel]) ->
%%issue#413: trap_exit is unnecessary true = link(WsPid),
%%process_flag(trap_exit, true),
{ok, Peername} = Req:get(peername), {ok, Peername} = Req:get(peername),
Headers = mochiweb_headers:to_list(
mochiweb_request:get(headers, Req)),
PktOpts = proplists:get_value(packet, MqttEnv),
SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
Headers = mochiweb_request:get(headers, Req),
HeadersList = mochiweb_headers:to_list(Headers),
ProtoState = emqttd_protocol:init(Peername, SendFun, ProtoState = emqttd_protocol:init(Peername, SendFun,
[{ws_initial_headers, HeadersList} | PktOpts]), [{ws_initial_headers, Headers} | PktOpts]),
{ok, #wsclient_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer),
connection = Req:get(connection),
proto_state = ProtoState}, idle_timeout(MqttEnv)}.
idle_timeout(MqttEnv) ->
ClientOpts = proplists:get_value(client, MqttEnv),
timer:seconds(proplists:get_value(idle_timeout, ClientOpts, 10)).
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State}; {reply, emqttd_protocol:session(ProtoState), State};
handle_call(info, _From, State = #wsclient_state{request = Req, handle_call(info, _From, State = #wsclient_state{peer = Peer,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
ProtoInfo = emqttd_protocol:info(ProtoState), ProtoInfo = emqttd_protocol:info(ProtoState),
{reply, [{websocket, true}, {peer, Req:get(peer)}| ProtoInfo], State}; {reply, [{websocket, true}, {peer, Peer}| ProtoInfo], State};
handle_call(kick, _From, State) -> handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
handle_call(Req, _From, State = #wsclient_state{request = HttpReq}) -> handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
?WSLOG(critical, "Unexpected request: ~p", [Req], HttpReq), ?WSLOG(critical, Peer, "Unexpected request: ~p", [Req]),
{reply, {error, unsupported_request}, State}. {reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State) -> handle_cast({subscribe, TopicTable}, State) ->
@ -140,13 +103,12 @@ handle_cast({unsubscribe, Topics}, State) ->
emqttd_session:unsubscribe(SessPid, Topics) emqttd_session:unsubscribe(SessPid, Topics)
end, State); end, State);
handle_cast({received, Packet}, State = #wsclient_state{request = Req, handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
proto_state = ProtoState}) ->
case emqttd_protocol:received(Packet, ProtoState) of case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
noreply(State#wsclient_state{proto_state = ProtoState1}); noreply(State#wsclient_state{proto_state = ProtoState1});
{error, Error} -> {error, Error} ->
?WSLOG(error, "Protocol error - ~p", [Error], Req), ?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
shutdown(Error, State); shutdown(Error, State);
{error, Error, ProtoState1} -> {error, Error, ProtoState1} ->
shutdown(Error, State#wsclient_state{proto_state = ProtoState1}); shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
@ -154,9 +116,12 @@ handle_cast({received, Packet}, State = #wsclient_state{request = Req,
stop(Reason, State#wsclient_state{proto_state = ProtoState1}) stop(Reason, State#wsclient_state{proto_state = ProtoState1})
end; end;
handle_cast(Msg, State = #wsclient_state{request = Req}) -> handle_cast(Msg, State = #wsclient_state{peer = Peer}) ->
?WSLOG(critical, "Unexpected msg: ~p", [Msg], Req), ?WSLOG(critical, Peer, "Unexpected msg: ~p", [Msg]),
{noreply, State}. noreply(State).
handle_info(timeout, State) ->
shutdown(idle_timeout, State);
handle_info({suback, PacketId, GrantedQos}, State) -> handle_info({suback, PacketId, GrantedQos}, State) ->
with_proto_state(fun(ProtoState) -> with_proto_state(fun(ProtoState) ->
@ -174,13 +139,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState) emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState)
end, State); end, State);
handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) ->
?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req), ?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
shutdown(conflict, State); shutdown(conflict, State);
handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) -> handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) ->
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req), ?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]),
Conn = Req:get(connection),
StatFun = fun() -> StatFun = fun() ->
case Conn:getstat([recv_oct]) of case Conn:getstat([recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
@ -190,25 +154,21 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
noreply(State#wsclient_state{keepalive = KeepAlive}); noreply(State#wsclient_state{keepalive = KeepAlive});
handle_info({keepalive, check}, State = #wsclient_state{request = Req, handle_info({keepalive, check}, State = #wsclient_state{peer = Peer,
keepalive = KeepAlive}) -> keepalive = KeepAlive}) ->
case emqttd_keepalive:check(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
noreply(State#wsclient_state{keepalive = KeepAlive1}); noreply(State#wsclient_state{keepalive = KeepAlive1});
{error, timeout} -> {error, timeout} ->
?WSLOG(debug, "Keepalive Timeout!", [], Req), ?WSLOG(debug, Peer, "Keepalive Timeout!", []),
shutdown(keepalive_timeout, State); shutdown(keepalive_timeout, State);
{error, Error} -> {error, Error} ->
?WSLOG(warning, "Keepalive error - ~p", [Error], Req), ?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]),
shutdown(keepalive_error, State) shutdown(keepalive_error, State)
end; end;
%%issue#413: removed the trap_exit flag handle_info(Info, State = #wsclient_state{peer = Peer}) ->
%%handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) -> ?WSLOG(critical, Peer, "Unexpected Info: ~p", [Info]),
%% stop(Reason, State);
handle_info(Info, State = #wsclient_state{request = Req}) ->
?WSLOG(critical, "Unexpected Info: ~p", [Info], Req),
noreply(State). noreply(State).
terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->

View File

@ -0,0 +1,45 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_ws_client_sup).
-author("Feng Lee <feng@emqtt.io>").
-behavior(supervisor).
-export([start_link/0, start_client/3]).
-export([init/1]).
%% @doc Start websocket client supervisor
-spec(start_link() -> {ok, pid()}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd:env(mqtt)]).
%% @doc Start a WebSocket Client
-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
start_client(WsPid, Req, ReplyChannel) ->
supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([Env]) ->
{ok, {{simple_one_for_one, 0, 1},
[{ws_client, {emqttd_ws_client, start_link, [Env]},
temporary, 5000, worker, [emqttd_ws_client]}]}}.

View File

@ -1,140 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(esockd_access).
-type cidr() :: string().
-type rule() :: {allow, all} |
{allow, cidr()} |
{deny, all} |
{deny, cidr()}.
-type range() :: {cidr(), pos_integer(), pos_integer()}.
-export_type([cidr/0, range/0, rule/0]).
-type range_rule() :: {allow, all} |
{allow, range()} |
{deny, all} |
{deny, range()}.
-export([rule/1, match/2, range/1, mask/1, atoi/1, itoa/1]).
%%------------------------------------------------------------------------------
%% @doc
%% Build CIDR, Make rule.
%%
%% @end
%%------------------------------------------------------------------------------
-spec rule(rule()) -> range_rule().
rule({allow, all}) ->
{allow, all};
rule({allow, CIDR}) when is_list(CIDR) ->
rule(allow, CIDR);
rule({deny, CIDR}) when is_list(CIDR) ->
rule(deny, CIDR);
rule({deny, all}) ->
{deny, all}.
rule(Type, CIDR) when is_list(CIDR) ->
{Start, End} = range(CIDR), {Type, {CIDR, Start, End}}.
%%------------------------------------------------------------------------------
%% @doc
%% Match Addr with Access Rules.
%%
%% @end
%%------------------------------------------------------------------------------
-spec match(inet:ip_address(), [range_rule()]) -> {matched, allow} | {matched, deny} | nomatch.
match(Addr, Rules) when is_tuple(Addr) ->
match2(atoi(Addr), Rules).
match2(_I, []) ->
nomatch;
match2(_I, [{allow, all}|_]) ->
{matched, allow};
match2(I, [{allow, {_, Start, End}}|_]) when I >= Start, I =< End ->
{matched, allow};
match2(I, [{allow, {_, _Start, _End}}|Rules]) ->
match2(I, Rules);
match2(I, [{deny, {_, Start, End}}|_]) when I >= Start, I =< End ->
{matched, deny};
match2(I, [{deny, {_, _Start, _End}}|Rules]) ->
match2(I, Rules);
match2(_I, [{deny, all}|_]) ->
{matched, deny}.
%%------------------------------------------------------------------------------
%% @doc
%% CIDR range.
%%
%% @end
%%------------------------------------------------------------------------------
-spec range(cidr()) -> {pos_integer(), pos_integer()}.
range(CIDR) ->
case string:tokens(CIDR, "/") of
[Addr] ->
{ok, IP} = inet:getaddr(Addr, inet),
{atoi(IP), atoi(IP)};
[Addr, Mask] ->
{ok, IP} = inet:getaddr(Addr, inet),
{Start, End} = subnet(IP, mask(list_to_integer(Mask))),
{Start, End}
end.
subnet(IP, Mask) ->
Start = atoi(IP) band Mask,
End = Start bor (Mask bxor 16#FFFFFFFF),
{Start, End}.
%%------------------------------------------------------------------------------
%% @doc
%% Mask Int
%%
%% @end
%%------------------------------------------------------------------------------
-spec mask(0..32) -> 0..16#FFFFFFFF.
mask(0) ->
16#00000000;
mask(32) ->
16#FFFFFFFF;
mask(N) when N >= 1, N =< 31 ->
lists:foldl(fun(I, Mask) -> (1 bsl I) bor Mask end, 0, lists:seq(32 - N, 31)).
%%------------------------------------------------------------------------------
%% @doc
%% Addr to Integer.
%%
%% @end
%%------------------------------------------------------------------------------
atoi({A, B, C, D}) ->
(A bsl 24) + (B bsl 16) + (C bsl 8) + D.
%%------------------------------------------------------------------------------
%% @doc
%% Integer to Addr.
%%
%% @end
%%------------------------------------------------------------------------------
itoa(I) ->
A = (I bsr 24) band 16#FF,
B = (I bsr 16) band 16#FF,
C = (I bsr 8) band 16#FF,
D = I band 16#FF,
{A, B, C, D}.