diff --git a/docs/source/changes.rst b/docs/source/changes.rst index b7f709ca0..548233125 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -5,6 +5,28 @@ Changes ======= +.. _release_1.1.2: + +------------- +Version 1.1.2 +------------- + +*Release Date: 2016-06-30* + +Upgrade mysql-otp driver to 1.2.0 (#564, #523, #586, #596) + +Fix WebSocket Client Leak (PR #612) + +java.io.EOFException using paho java client (#551) + +Send message from paho java client to javascript client (#552) + +Compatible with the Qos0 PUBREL packet (#575) + +Empty clientId with non-clean session accepted (#599) + +Update docs to fix typos (#601, #607) + .. _release_1.1.1: ------------- diff --git a/docs/source/guide.rst b/docs/source/guide.rst index c732734f4..709c4329f 100644 --- a/docs/source/guide.rst +++ b/docs/source/guide.rst @@ -320,7 +320,7 @@ Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_mysql/etc/plugin.config: ... %% comment this query, the acl will be disabled - {aclquery, "select * from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"}, + {aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"}, %% If no rules matched, return... {acl_nomatch, allow} diff --git a/docs/source/index.rst b/docs/source/index.rst index 55740c006..4051035b1 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -53,6 +53,7 @@ Contents: plugins tune changes + upgrade ------- License diff --git a/docs/source/upgrade.rst b/docs/source/upgrade.rst new file mode 100644 index 000000000..1d5147f29 --- /dev/null +++ b/docs/source/upgrade.rst @@ -0,0 +1,33 @@ + +.. _upgrade: + +============= +Upgrade Guide +============= + +.. _upgrade_1.1.2: + +---------------- +Upgrade to 1.1.2 +---------------- + +.. NOTE:: 1.0+ releases can be upgraded to 1.1.2 smoothly + +Steps: + +1. Download and install emqttd-1.1.2 to the new directory, for example:: + + Old installation: /opt/emqttd_1_0_0/ + + New installation: /opt/emqttd_1_1_2/ + +2. Copy the 'etc/' and 'data/' from the old installation:: + + cp -R /opt/emqttd_1_0_0/etc/* /opt/emqttd_1_1_2/etc/ + + cp -R /opt/emqttd_1_0_0/data/* /opt/emqttd_1_1_2/data/ + +3. Copy the plugins/{plugin}/etc/* from the old installation if you loaded plugins. + +4. Stop the old emqttd, and start the new one. + diff --git a/docs/specs/MQTT-SN_spec_v1.2.pdf b/docs/specs/MQTT-SN_spec_v1.2.pdf new file mode 100644 index 000000000..ee6020d23 Binary files /dev/null and b/docs/specs/MQTT-SN_spec_v1.2.pdf differ diff --git a/docs/specs/mqtt-v3.1.1.pdf b/docs/specs/mqtt-v3.1.1.pdf new file mode 100644 index 000000000..e4095f1b5 Binary files /dev/null and b/docs/specs/mqtt-v3.1.1.pdf differ diff --git a/plugins/emqttd_auth_http b/plugins/emqttd_auth_http index 4711caa70..cd6ae58f1 160000 --- a/plugins/emqttd_auth_http +++ b/plugins/emqttd_auth_http @@ -1 +1 @@ -Subproject commit 4711caa703f84775ff0a1256587788430494eecd +Subproject commit cd6ae58f1674165f2033bff6b7af819c1f3a169d diff --git a/plugins/emqttd_dashboard b/plugins/emqttd_dashboard index f4ed05ee5..fff5eb17b 160000 --- a/plugins/emqttd_dashboard +++ b/plugins/emqttd_dashboard @@ -1 +1 @@ -Subproject commit f4ed05ee52d487dbab899aafa96b72519725978c +Subproject commit fff5eb17b02a41ed9e928013bc318914cd6269a6 diff --git a/plugins/emqttd_plugin_mongo b/plugins/emqttd_plugin_mongo index 168a70890..95143c1a4 160000 --- a/plugins/emqttd_plugin_mongo +++ b/plugins/emqttd_plugin_mongo @@ -1 +1 @@ -Subproject commit 168a70890b6c204e0629fc915a6da4b0247c326e +Subproject commit 95143c1a44f28cc38ac6c6a77c15cc9b5f433791 diff --git a/plugins/emqttd_plugin_mysql b/plugins/emqttd_plugin_mysql index f9a00100d..821766632 160000 --- a/plugins/emqttd_plugin_mysql +++ b/plugins/emqttd_plugin_mysql @@ -1 +1 @@ -Subproject commit f9a00100db19b864d7bdbe1eeca49978ea6934a9 +Subproject commit 82176663240c1bff825a903c254a86ebb9031b5a diff --git a/plugins/emqttd_plugin_pgsql b/plugins/emqttd_plugin_pgsql index 6ec5fb063..e1d67068a 160000 --- a/plugins/emqttd_plugin_pgsql +++ b/plugins/emqttd_plugin_pgsql @@ -1 +1 @@ -Subproject commit 6ec5fb063f5070766f6a1f911b3c5771890bb878 +Subproject commit e1d67068aabcc297956d2fd8aabe7f2dd1817650 diff --git a/plugins/emqttd_plugin_redis b/plugins/emqttd_plugin_redis index fc342cc91..285abd943 160000 --- a/plugins/emqttd_plugin_redis +++ b/plugins/emqttd_plugin_redis @@ -1 +1 @@ -Subproject commit fc342cc9164c91f6a64e7de2f9f1b0604d767159 +Subproject commit 285abd943850736bff04ac5e877cba2c3093ae25 diff --git a/plugins/emqttd_plugin_template b/plugins/emqttd_plugin_template index 418bb1d43..c2c94e1d4 160000 --- a/plugins/emqttd_plugin_template +++ b/plugins/emqttd_plugin_template @@ -1 +1 @@ -Subproject commit 418bb1d4385a0fd21a57918c2b2b4fdcc9751c1c +Subproject commit c2c94e1d49dd26b1d558e7632c925ea319093294 diff --git a/plugins/emqttd_recon b/plugins/emqttd_recon index f946d5ff1..4b0721609 160000 --- a/plugins/emqttd_recon +++ b/plugins/emqttd_recon @@ -1 +1 @@ -Subproject commit f946d5ff188a0668734fdc850149bb3d5cf1e798 +Subproject commit 4b0721609ca605b0a052b46eb753185afebc9fda diff --git a/plugins/emqttd_reloader b/plugins/emqttd_reloader index 1cc503573..3521c5d07 160000 --- a/plugins/emqttd_reloader +++ b/plugins/emqttd_reloader @@ -1 +1 @@ -Subproject commit 1cc503573439da213b435bc1c5d2dc7eb3891052 +Subproject commit 3521c5d07c24da5b362196de89bb0d250d42eccc diff --git a/plugins/emqttd_stomp b/plugins/emqttd_stomp index 4bd649c13..fb97df1bb 160000 --- a/plugins/emqttd_stomp +++ b/plugins/emqttd_stomp @@ -1 +1 @@ -Subproject commit 4bd649c137defacacbaba685c3adf042e3f70700 +Subproject commit fb97df1bb3e4a813c44f48161aef32cac428eb87 diff --git a/rel/files/vm.args b/rel/files/vm.args index 27c743997..a2ad9302c 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -58,3 +58,4 @@ ## Tweak GC to run more often -env ERL_FULLSWEEP_AFTER 1000 +-env ERL_CRASH_DUMP log/emqttd_crash.dump diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 42cc35daa..5c56e824f 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -89,6 +89,7 @@ start_servers(Sup) -> {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, + {"emqttd wsclient supervisor", {supervisor, emqttd_ws_client_sup}}, {"emqttd broker", emqttd_broker}, {"emqttd alarm", emqttd_alarm}, {"emqttd mod supervisor", emqttd_mod_sup}, @@ -187,7 +188,7 @@ start_listener({https, ListenOn, Opts}) -> start_listener(Protocol, ListenOn, Opts) -> MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]}, - esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs). + {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs). merge_sockopts(Options) -> SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS, diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 794193a5d..f898accd1 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -52,13 +52,14 @@ handle_request('POST', "/mqtt/publish", Req) -> %%-------------------------------------------------------------------- %% MQTT Over WebSocket %%-------------------------------------------------------------------- + handle_request('GET', "/mqtt", Req) -> lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), Proto = Req:get_header_value("Sec-WebSocket-Protocol"), case {is_websocket(Upgrade), Proto} of {true, "mqtt" ++ _Vsn} -> - emqttd_ws_client:start_link(Req); + emqttd_ws:handle_request(Req); {false, _} -> lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), Req:respond({400, [], <<"Bad Request">>}); @@ -144,13 +145,12 @@ authorized(Req) -> user_passwd(BasicAuth) -> list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). - int(S) -> list_to_integer(S). bool("0") -> false; bool("1") -> true. -is_websocket(Upgrade) -> +is_websocket(Upgrade) -> Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". docroot() -> diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index 1590ed0f8..139c5c1fb 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -36,8 +36,7 @@ new(Opts) -> fun(Bin) -> parse(Bin, {none, limit(Opts)}) end. limit(Opts) -> - #mqtt_packet_limit{max_packet_size = - proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}. + #mqtt_packet_limit{max_packet_size = proplists:get_value(max_packet_size, Opts, ?MAX_LEN)}. %% @doc Parse MQTT Packet -spec(parse(binary(), {none, [option()]} | fun()) diff --git a/src/emqttd_session_sup.erl b/src/emqttd_session_sup.erl index bea3249e5..2b9ee9496 100644 --- a/src/emqttd_session_sup.erl +++ b/src/emqttd_session_sup.erl @@ -38,7 +38,7 @@ start_session(CleanSess, ClientId, ClientPid) -> %%-------------------------------------------------------------------- init([]) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one, 0, 1}, [{session, {emqttd_session, start_link, []}, - temporary, 10000, worker, [emqttd_session]}]}}. + temporary, 5000, worker, [emqttd_session]}]}}. diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl new file mode 100644 index 000000000..fc482b365 --- /dev/null +++ b/src/emqttd_ws.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_ws). + +-export([handle_request/1, ws_loop/3]). + +%% WebSocket Loop State +-record(wsocket_state, {peer, client_pid, packet_opts, parser_fun}). + +-define(LOG(Level, Peer, Format, Args), + lager:Level("WsClient(~s): " ++ Format, [Peer | Args])). + +%%-------------------------------------------------------------------- +%% Handle WebSocket Request +%%-------------------------------------------------------------------- + +%% @doc Handle WebSocket Request. +handle_request(Req) -> + Peer = Req:get(peer), + PktOpts = emqttd:env(mqtt, packet), + ParserFun = emqttd_parser:new(PktOpts), + {ReentryWs, ReplyChannel} = upgrade(Req), + {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), + ReentryWs(#wsocket_state{peer = Peer, client_pid = ClientPid, + packet_opts = PktOpts, parser_fun = ParserFun}). + +%% @doc Upgrade WebSocket. +%% @private +upgrade(Req) -> + mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3). + +%%-------------------------------------------------------------------- +%% Receive Loop +%%-------------------------------------------------------------------- + +%% @doc WebSocket frame receive loop. +ws_loop(<<>>, State, _ReplyChannel) -> + State; +ws_loop([<<>>], State, _ReplyChannel) -> + State; +ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid, + parser_fun = ParserFun}, ReplyChannel) -> + ?LOG(debug, Peer, "RECV ~p", [Data]), + case catch ParserFun(iolist_to_binary(Data)) of + {more, NewParser} -> + State#wsocket_state{parser_fun = NewParser}; + {ok, Packet, Rest} -> + gen_server:cast(ClientPid, {received, Packet}), + ws_loop(Rest, reset_parser(State), ReplyChannel); + {error, Error} -> + ?LOG(error, Peer, "Frame error: ~p", [Error]), + exit({shutdown, Error}); + {'EXIT', Reason} -> + ?LOG(error, Peer, "Frame error: ~p", [Reason]), + ?LOG(error, Peer, "Error data: ~p", [Data]), + exit({shutdown, parser_error}) + end. + +reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> + State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}. + diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 776dc4ce5..ff4c16e79 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -16,42 +16,31 @@ -module(emqttd_ws_client). +-behaviour(gen_server). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). %% API Exports --export([start_link/1, ws_loop/3, session/1, info/1, kick/1]). +-export([start_link/4, session/1, info/1, kick/1]). %% SUB/UNSUB Asynchronously -export([subscribe/2, unsubscribe/2]). --behaviour(gen_server). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% WebSocket Loop State --record(wsocket_state, {request, client_pid, packet_opts, parser_fun}). - %% WebSocket Client State --record(wsclient_state, {ws_pid, request, proto_state, keepalive}). +-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive}). --define(WSLOG(Level, Format, Args, Req), - lager:Level("WsClient(~s): " ++ Format, [Req:get(peer) | Args])). +-define(WSLOG(Level, Peer, Format, Args), + lager:Level("WsClient(~s): " ++ Format, [Peer | Args])). -%% @doc Start WebSocket client. -start_link(Req) -> - PktOpts = emqttd:env(mqtt, packet), - ParserFun = emqttd_parser:new(PktOpts), - {ReentryWs, ReplyChannel} = upgrade(Req), - Params = [self(), Req, ReplyChannel, PktOpts], - {ok, ClientPid} = gen_server:start_link(?MODULE, Params, []), - ReentryWs(#wsocket_state{request = Req, - client_pid = ClientPid, - packet_opts = PktOpts, - parser_fun = ParserFun}). +%% @doc Start WebSocket Client. +start_link(MqttEnv, WsPid, Req, ReplyChannel) -> + gen_server:start_link(?MODULE, [MqttEnv, WsPid, Req, ReplyChannel], []). session(CPid) -> gen_server:call(CPid, session, infinity). @@ -68,66 +57,40 @@ subscribe(CPid, TopicTable) -> unsubscribe(CPid, Topics) -> gen_server:cast(CPid, {unsubscribe, Topics}). -%% @private -%% @doc Upgrade WebSocket. -upgrade(Req) -> - mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3). - -%% @doc WebSocket frame receive loop. -ws_loop(<<>>, State, _ReplyChannel) -> - State; -ws_loop([<<>>], State, _ReplyChannel) -> - State; -ws_loop(Data, State = #wsocket_state{request = Req, - client_pid = ClientPid, - parser_fun = ParserFun}, ReplyChannel) -> - ?WSLOG(debug, "RECV ~p", [Data], Req), - case catch ParserFun(iolist_to_binary(Data)) of - {more, NewParser} -> - State#wsocket_state{parser_fun = NewParser}; - {ok, Packet, Rest} -> - gen_server:cast(ClientPid, {received, Packet}), - ws_loop(Rest, reset_parser(State), ReplyChannel); - {error, Error} -> - ?WSLOG(error, "Frame error: ~p", [Error], Req), - exit({shutdown, Error}); - {'EXIT', Reason} -> - ?WSLOG(error, "Frame error: ~p", [Reason], Req), - ?WSLOG(error, "Error data: ~p", [Data], Req), - exit({shutdown, parser_error}) - end. - -reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> - State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}. - %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server Callbacks %%-------------------------------------------------------------------- -init([WsPid, Req, ReplyChannel, PktOpts]) -> - %%issue#413: trap_exit is unnecessary - %%process_flag(trap_exit, true), +init([MqttEnv, WsPid, Req, ReplyChannel]) -> + true = link(WsPid), {ok, Peername} = Req:get(peername), + Headers = mochiweb_headers:to_list( + mochiweb_request:get(headers, Req)), + PktOpts = proplists:get_value(packet, MqttEnv), SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, - Headers = mochiweb_request:get(headers, Req), - HeadersList = mochiweb_headers:to_list(Headers), ProtoState = emqttd_protocol:init(Peername, SendFun, - [{ws_initial_headers, HeadersList} | PktOpts]), - {ok, #wsclient_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. + [{ws_initial_headers, Headers} | PktOpts]), + {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer), + connection = Req:get(connection), + proto_state = ProtoState}, idle_timeout(MqttEnv)}. + +idle_timeout(MqttEnv) -> + ClientOpts = proplists:get_value(client, MqttEnv), + timer:seconds(proplists:get_value(idle_timeout, ClientOpts, 10)). handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> {reply, emqttd_protocol:session(ProtoState), State}; -handle_call(info, _From, State = #wsclient_state{request = Req, +handle_call(info, _From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> ProtoInfo = emqttd_protocol:info(ProtoState), - {reply, [{websocket, true}, {peer, Req:get(peer)}| ProtoInfo], State}; + {reply, [{websocket, true}, {peer, Peer}| ProtoInfo], State}; handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; -handle_call(Req, _From, State = #wsclient_state{request = HttpReq}) -> - ?WSLOG(critical, "Unexpected request: ~p", [Req], HttpReq), +handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> + ?WSLOG(critical, Peer, "Unexpected request: ~p", [Req]), {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> @@ -140,13 +103,12 @@ handle_cast({unsubscribe, Topics}, State) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); -handle_cast({received, Packet}, State = #wsclient_state{request = Req, - proto_state = ProtoState}) -> +handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> noreply(State#wsclient_state{proto_state = ProtoState1}); {error, Error} -> - ?WSLOG(error, "Protocol error - ~p", [Error], Req), + ?WSLOG(error, Peer, "Protocol error - ~p", [Error]), shutdown(Error, State); {error, Error, ProtoState1} -> shutdown(Error, State#wsclient_state{proto_state = ProtoState1}); @@ -154,9 +116,12 @@ handle_cast({received, Packet}, State = #wsclient_state{request = Req, stop(Reason, State#wsclient_state{proto_state = ProtoState1}) end; -handle_cast(Msg, State = #wsclient_state{request = Req}) -> - ?WSLOG(critical, "Unexpected msg: ~p", [Msg], Req), - {noreply, State}. +handle_cast(Msg, State = #wsclient_state{peer = Peer}) -> + ?WSLOG(critical, Peer, "Unexpected msg: ~p", [Msg]), + noreply(State). + +handle_info(timeout, State) -> + shutdown(idle_timeout, State); handle_info({suback, PacketId, GrantedQos}, State) -> with_proto_state(fun(ProtoState) -> @@ -174,13 +139,12 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State) -> emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState) end, State); -handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) -> - ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req), +handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) -> + ?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); -handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) -> - ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req), - Conn = Req:get(connection), +handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, connection = Conn}) -> + ?WSLOG(debug, Peer, "Keepalive at the interval of ~p", [Interval]), StatFun = fun() -> case Conn:getstat([recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; @@ -190,25 +154,21 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req} KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), noreply(State#wsclient_state{keepalive = KeepAlive}); -handle_info({keepalive, check}, State = #wsclient_state{request = Req, +handle_info({keepalive, check}, State = #wsclient_state{peer = Peer, keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> noreply(State#wsclient_state{keepalive = KeepAlive1}); {error, timeout} -> - ?WSLOG(debug, "Keepalive Timeout!", [], Req), + ?WSLOG(debug, Peer, "Keepalive Timeout!", []), shutdown(keepalive_timeout, State); {error, Error} -> - ?WSLOG(warning, "Keepalive error - ~p", [Error], Req), + ?WSLOG(warning, Peer, "Keepalive error - ~p", [Error]), shutdown(keepalive_error, State) end; -%%issue#413: removed the trap_exit flag -%%handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) -> -%% stop(Reason, State); - -handle_info(Info, State = #wsclient_state{request = Req}) -> - ?WSLOG(critical, "Unexpected Info: ~p", [Info], Req), +handle_info(Info, State = #wsclient_state{peer = Peer}) -> + ?WSLOG(critical, Peer, "Unexpected Info: ~p", [Info]), noreply(State). terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> diff --git a/src/emqttd_ws_client_sup.erl b/src/emqttd_ws_client_sup.erl new file mode 100644 index 000000000..3577527a6 --- /dev/null +++ b/src/emqttd_ws_client_sup.erl @@ -0,0 +1,45 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2012-2016 Feng Lee . +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqttd_ws_client_sup). + +-author("Feng Lee "). + +-behavior(supervisor). + +-export([start_link/0, start_client/3]). + +-export([init/1]). + +%% @doc Start websocket client supervisor +-spec(start_link() -> {ok, pid()}). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd:env(mqtt)]). + +%% @doc Start a WebSocket Client +-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}). +start_client(WsPid, Req, ReplyChannel) -> + supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]). + +%%-------------------------------------------------------------------- +%% Supervisor callbacks +%%-------------------------------------------------------------------- + +init([Env]) -> + {ok, {{simple_one_for_one, 0, 1}, + [{ws_client, {emqttd_ws_client, start_link, [Env]}, + temporary, 5000, worker, [emqttd_ws_client]}]}}. + diff --git a/test/esockd_access.erl b/test/esockd_access.erl deleted file mode 100644 index 8fcafed4b..000000000 --- a/test/esockd_access.erl +++ /dev/null @@ -1,140 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(esockd_access). - --type cidr() :: string(). - --type rule() :: {allow, all} | - {allow, cidr()} | - {deny, all} | - {deny, cidr()}. - --type range() :: {cidr(), pos_integer(), pos_integer()}. - --export_type([cidr/0, range/0, rule/0]). - --type range_rule() :: {allow, all} | - {allow, range()} | - {deny, all} | - {deny, range()}. - --export([rule/1, match/2, range/1, mask/1, atoi/1, itoa/1]). - -%%------------------------------------------------------------------------------ -%% @doc -%% Build CIDR, Make rule. -%% -%% @end -%%------------------------------------------------------------------------------ --spec rule(rule()) -> range_rule(). -rule({allow, all}) -> - {allow, all}; -rule({allow, CIDR}) when is_list(CIDR) -> - rule(allow, CIDR); -rule({deny, CIDR}) when is_list(CIDR) -> - rule(deny, CIDR); -rule({deny, all}) -> - {deny, all}. - -rule(Type, CIDR) when is_list(CIDR) -> - {Start, End} = range(CIDR), {Type, {CIDR, Start, End}}. - -%%------------------------------------------------------------------------------ -%% @doc -%% Match Addr with Access Rules. -%% -%% @end -%%------------------------------------------------------------------------------ --spec match(inet:ip_address(), [range_rule()]) -> {matched, allow} | {matched, deny} | nomatch. -match(Addr, Rules) when is_tuple(Addr) -> - match2(atoi(Addr), Rules). - -match2(_I, []) -> - nomatch; -match2(_I, [{allow, all}|_]) -> - {matched, allow}; -match2(I, [{allow, {_, Start, End}}|_]) when I >= Start, I =< End -> - {matched, allow}; -match2(I, [{allow, {_, _Start, _End}}|Rules]) -> - match2(I, Rules); -match2(I, [{deny, {_, Start, End}}|_]) when I >= Start, I =< End -> - {matched, deny}; -match2(I, [{deny, {_, _Start, _End}}|Rules]) -> - match2(I, Rules); -match2(_I, [{deny, all}|_]) -> - {matched, deny}. - -%%------------------------------------------------------------------------------ -%% @doc -%% CIDR range. -%% -%% @end -%%------------------------------------------------------------------------------ --spec range(cidr()) -> {pos_integer(), pos_integer()}. -range(CIDR) -> - case string:tokens(CIDR, "/") of - [Addr] -> - {ok, IP} = inet:getaddr(Addr, inet), - {atoi(IP), atoi(IP)}; - [Addr, Mask] -> - {ok, IP} = inet:getaddr(Addr, inet), - {Start, End} = subnet(IP, mask(list_to_integer(Mask))), - {Start, End} - end. - -subnet(IP, Mask) -> - Start = atoi(IP) band Mask, - End = Start bor (Mask bxor 16#FFFFFFFF), - {Start, End}. - -%%------------------------------------------------------------------------------ -%% @doc -%% Mask Int -%% -%% @end -%%------------------------------------------------------------------------------ --spec mask(0..32) -> 0..16#FFFFFFFF. -mask(0) -> - 16#00000000; -mask(32) -> - 16#FFFFFFFF; -mask(N) when N >= 1, N =< 31 -> - lists:foldl(fun(I, Mask) -> (1 bsl I) bor Mask end, 0, lists:seq(32 - N, 31)). - -%%------------------------------------------------------------------------------ -%% @doc -%% Addr to Integer. -%% -%% @end -%%------------------------------------------------------------------------------ -atoi({A, B, C, D}) -> - (A bsl 24) + (B bsl 16) + (C bsl 8) + D. - -%%------------------------------------------------------------------------------ -%% @doc -%% Integer to Addr. -%% -%% @end -%%------------------------------------------------------------------------------ -itoa(I) -> - A = (I bsr 24) band 16#FF, - B = (I bsr 16) band 16#FF, - C = (I bsr 8) band 16#FF, - D = I band 16#FF, - {A, B, C, D}. - -