From 69611b234d8fef1b088f979ce6549ce6bbbfac22 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 29 Apr 2015 02:19:46 +0800 Subject: [PATCH] websocket support --- .../emqttd/priv/www/{mqtt.html => index.html} | 3 +- apps/emqttd/src/emqttd.erl | 2 +- apps/emqttd/src/emqttd_client.erl | 5 +- apps/emqttd/src/emqttd_http.erl | 44 ++-- apps/emqttd/src/emqttd_protocol.erl | 26 +-- apps/emqttd/src/emqttd_websocket.erl | 171 ---------------- apps/emqttd/src/emqttd_ws_client.erl | 192 ++++++++++++++++++ 7 files changed, 237 insertions(+), 206 deletions(-) rename apps/emqttd/priv/www/{mqtt.html => index.html} (95%) delete mode 100644 apps/emqttd/src/emqttd_websocket.erl create mode 100644 apps/emqttd/src/emqttd_ws_client.erl diff --git a/apps/emqttd/priv/www/mqtt.html b/apps/emqttd/priv/www/index.html similarity index 95% rename from apps/emqttd/priv/www/mqtt.html rename to apps/emqttd/priv/www/index.html index 44dce6e08..d84f633cd 100644 --- a/apps/emqttd/priv/www/mqtt.html +++ b/apps/emqttd/priv/www/index.html @@ -20,7 +20,7 @@ } // Create a client instance - client = new Paho.MQTT.Client(location.hostname, Number(location.port), "/mqtt/wsocket", "clientId"); + client = new Paho.MQTT.Client(location.hostname, Number(location.port), "/mqtt", "clientId"); // set callback handlers client.onConnectionLost = onConnectionLost; @@ -34,6 +34,7 @@ // called when the client connects function onConnect() { alert("connected"), + $('connstate').innerHTML = 'CONNECTED'; // Once a connection has been made, make a subscription and send a message. console.log("onConnect"); client.subscribe("/World"); diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index 1dcad8bae..a56c89495 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -71,7 +71,7 @@ open_listener({mqtts, Port, Options}) -> %% open http port open_listener({http, Port, Options}) -> - MFArgs = {emqttd_http, handle_req, []}, + MFArgs = {emqttd_http, handle_request, []}, mochiweb:start_http(Port, Options, MFArgs). open_listener(Protocol, Port, Options) -> diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 123ad703b..2e2645850 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -67,8 +67,9 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> {ok, Peername} = emqttd_net:peername(Sock), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), + SendFun = fun(Data) -> Transport:send(NewSock, Data) end, ParserState = emqtt_parser:init(PacketOpts), - ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts), + ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts), State = control_throttle(#state{transport = Transport, socket = NewSock, peername = Peername, @@ -200,7 +201,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, stop(Reason, State#state{proto_state = ProtoState1}) end; {error, Error} -> - lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), + lager:error("MQTT detected framing error ~p for connection ~p", [Error, ConnStr]), stop({shutdown, Error}, State) end. diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 86c93c6e1..4d60fb30a 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -34,12 +34,15 @@ -import(proplists, [get_value/2, get_value/3]). --export([handle_req/1]). +-export([handle_request/1]). -handle_req(Req) -> - handle_req(Req:get(method), Req:get(path), Req). +handle_request(Req) -> + handle_request(Req:get(method), Req:get(path), Req). -handle_req('POST', "/mqtt/publish", Req) -> +%%------------------------------------------------------------------------------ +%% HTTP Publish API +%%------------------------------------------------------------------------------ +handle_request('POST', "/mqtt/publish", Req) -> Params = mochiweb_request:parse_post(Req), lager:info("HTTP Publish: ~p", [Params]), case authorized(Req) of @@ -64,21 +67,33 @@ handle_req('POST', "/mqtt/publish", Req) -> Req:respond({401, [], <<"Fobbiden">>}) end; -handle_req(_Method, "/mqtt/wsocket", Req) -> +%%------------------------------------------------------------------------------ +%% MQTT Over WebSocket +%%------------------------------------------------------------------------------ +handle_request('GET', "/mqtt", Req) -> lager:info("Websocket Connection from: ~s", [Req:get(peer)]), - Up = Req:get_header_value("Upgrade"), - case Up =/= undefined andalso string:to_lower(Up) =:= "websocket" of - true -> - emqttd_websocket:start_link(Req); - false -> - Req:respond({400, [], <<"Bad Request">>}) + Upgrade = Req:get_header_value("Upgrade"), + Proto = Req:get_header_value("Sec-WebSocket-Protocol"), + case {is_websocket(Upgrade), Proto} of + {true, "mqtt" ++ _Vsn} -> + emqttd_ws_client:start_link(Req); + {false, _} -> + lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), + Req:respond({400, [], <<"Bad Request">>}); + {_, Proto} -> + lager:error("WebSocket with error Protocol: ~s", [Proto]), + Req:respond({400, [], <<"Bad WebSocket Protocol">>}) end; -handle_req('GET', "/" ++ File, Req) -> +%%------------------------------------------------------------------------------ +%% Get static files +%%------------------------------------------------------------------------------ +handle_request('GET', "/" ++ File, Req) -> lager:info("HTTP GET File: ~s", [File]), mochiweb_request:serve_file(File, docroot(), Req); -handle_req(_Method, _Path, Req) -> +handle_request(Method, Path, Req) -> + lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), Req:not_found(). %%------------------------------------------------------------------------------ @@ -113,6 +128,9 @@ int(S) -> list_to_integer(S). bool("0") -> false; bool("1") -> true. +is_websocket(Upgrade) -> + Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". + docroot() -> {file, Here} = code:is_loaded(?MODULE), Dir = filename:dirname(filename:dirname(Here)), diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index cd9485aee..a979f10b7 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -34,7 +34,7 @@ -include("emqttd.hrl"). %% API --export([init/2, clientid/1]). +-export([init/3, clientid/1]). -export([received/2, send/2, redeliver/2, shutdown/2]). @@ -42,9 +42,8 @@ %% Protocol State -record(proto_state, { - transport, - socket, peername, + sendfun, connected = false, %received CONNECT action? proto_ver, proto_name, @@ -59,12 +58,12 @@ -type proto_state() :: #proto_state{}. -init({Transport, Socket, Peername}, Opts) -> +init(Peername, SendFun, Opts) -> + MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), #proto_state{ - transport = Transport, - socket = Socket, peername = Peername, - max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}. + sendfun = SendFun, + max_clientid_len = MaxLen}. clientid(#proto_state{clientid = ClientId}) -> ClientId. @@ -231,22 +230,13 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = {Message1, NewSession} = emqttd_session:store(Session, Message), send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}); -send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) -> +send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), sent_stats(Packet), Data = emqtt_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/sent', size(Data)), - if - is_function(Transport) -> - io:format("Transport Fun: ~p~n", [Transport]), - try - Transport(Data) - catch - _:Error -> io:format("Transport send error: ~p~n", [Error]) - end; - true -> Transport:send(Sock, Data) - end, + SendFun(Data), {ok, State}. trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> diff --git a/apps/emqttd/src/emqttd_websocket.erl b/apps/emqttd/src/emqttd_websocket.erl deleted file mode 100644 index d537b4109..000000000 --- a/apps/emqttd/src/emqttd_websocket.erl +++ /dev/null @@ -1,171 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% @Copyright (C) 2012-2015, Feng Lee -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd websocket client. -%%% -%%% @end -%%%----------------------------------------------------------------------------- - --module(emqttd_websocket). - --author("Feng Lee "). - --include_lib("emqtt/include/emqtt.hrl"). - --include_lib("emqtt/include/emqtt_packet.hrl"). - --behaviour(gen_server). - --define(SERVER, ?MODULE). - --export([start_link/1, ws_loop/3]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(wsocket_state, {client, parser_state}). - --record(client_state, {request, reply_channel, proto_state, keepalive}). - --define(PACKET_OPTS, [{max_clientid_len, 1024}, - {max_packet_size, 4096}]). - -start_link(Req) -> - {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), - {ok, Client} = gen_server:start_link(?MODULE, [Req, ReplyChannel], []), - ReentryWs(#wsocket_state{client = Client, - parser_state = emqtt_parser:init(?PACKET_OPTS)}). - -ws_loop(<<>>, State, _ReplyChannel) -> - State; -ws_loop(Payload, State = #wsocket_state{client = Client, parser_state = ParserState}, ReplyChannel) -> - io:format("Received data: ~p~n", [Payload]), - case catch emqtt_parser:parse(iolist_to_binary(Payload), ParserState) of - {more, ParserState1} -> - State#wsocket_state{parser_state = ParserState1}; - {ok, Packet, Rest} -> - Client ! {received, Packet}, - ws_loop(Rest, State#wsocket_state{parser_state = emqtt_parser:init(?PACKET_OPTS)}, ReplyChannel); - {error, Error} -> - lager:error("MQTT detected framing error ~p~n", [Error]), - exit({shutdown, Error}); - Exit -> - lager:error("MQTT detected error ~p~n", [Exit]) - end. - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= - -init([Req, ReplyChannel]) -> - %%TODO: Redesign later... - Socket = Req:get(socket), - {ok, Peername} = emqttd_net:peername(Socket), - SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, - ProtoState = emqttd_protocol:init({SendFun, Socket, Peername}, ?PACKET_OPTS), - {ok, #client_state{request = Req, reply_channel = ReplyChannel, - proto_state = ProtoState}}. - -handle_call(_Req, _From, State) -> - {reply, error, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({received, Packet}, State = #client_state{proto_state = ProtoState}) -> - io:format("Packet Received: ~p~n", [Packet]), - case emqttd_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - {noreply, State#client_state{proto_state = ProtoState1}}; - {error, Error} -> - lager:error("MQTT protocol error ~p", [Error]), - stop({shutdown, Error}, State); - {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#client_state{proto_state = ProtoState1}) - end; - -%%TODO: ok?? -handle_info({dispatch, {From, Messages}}, #client_state{proto_state = ProtoState} = State) when is_list(Messages) -> - ProtoState1 = - lists:foldl(fun(Message, PState) -> - {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1 - end, ProtoState, Messages), - {noreply, State#client_state{proto_state = ProtoState1}}; - -handle_info({dispatch, {From, Message}}, #client_state{proto_state = ProtoState} = State) -> - {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; - -handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) -> - {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}; - -handle_info({stop, duplicate_id, _NewPid}, State=#client_state{proto_state = ProtoState}) -> - %% TODO: to... - %% need transfer data??? - %% emqttd_client:transfer(NewPid, Data), - lager:error("Shutdown for duplicate clientid: ~s", - [emqttd_protocol:clientid(ProtoState)]), - stop({shutdown, duplicate_id}, State); - -handle_info({keepalive, start, _TimeoutSec}, State = #client_state{}) -> - %lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), - KeepAlive = undefined, %emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), - {noreply, State#client_state{ keepalive = KeepAlive }}; - -handle_info({keepalive, timeout}, State = #client_state{keepalive = KeepAlive}) -> - case emqttd_keepalive:resume(KeepAlive) of - timeout -> - %lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), - stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); - {resumed, KeepAlive1} -> - %lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), - {noreply, State#client_state{keepalive = KeepAlive1}} - end; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(Reason, #client_state{keepalive = KeepAlive, proto_state = ProtoState}) -> - lager:debug("~p terminated, reason: ~p~n", [self(), Reason]), - emqttd_keepalive:cancel(KeepAlive), - case {ProtoState, Reason} of - {undefined, _} -> ok; - {_, {shutdown, Error}} -> - emqttd_protocol:shutdown(Error, ProtoState); - {_, _} -> - ok - end. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%============================================================================= -%%% Internal functions -%%%============================================================================= - -stop(Reason, State ) -> - {stop, Reason, State}. - diff --git a/apps/emqttd/src/emqttd_ws_client.erl b/apps/emqttd/src/emqttd_ws_client.erl new file mode 100644 index 000000000..4742cc65f --- /dev/null +++ b/apps/emqttd/src/emqttd_ws_client.erl @@ -0,0 +1,192 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd websocket client. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_ws_client). + +-author("Feng Lee "). + +-include_lib("emqtt/include/emqtt.hrl"). + +-include_lib("emqtt/include/emqtt_packet.hrl"). + +-behaviour(gen_server). + +%% API Exports +-export([start_link/1, ws_loop/3]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% WebSocket loop state +-record(wsocket_state, {request, + client_pid, + packet_opts, + parser_state}). + +%% Client state +-record(state, {ws_pid, request, proto_state, keepalive}). + +%%------------------------------------------------------------------------------ +%% @doc Start WebSocket client. +%% @end +%%------------------------------------------------------------------------------ +start_link(Req) -> + {ReentryWs, ReplyChannel} = upgrade(Req), + {ok, PktOpts} = application:get_env(emqttd, mqtt_packet), + {ok, ClientPid} = gen_server:start_link(?MODULE, [self(), Req, ReplyChannel, PktOpts], []), + ReentryWs(#wsocket_state{request = Req, + client_pid = ClientPid, + packet_opts = PktOpts, + parser_state = emqtt_parser:init(PktOpts)}). + +%%------------------------------------------------------------------------------ +%% @private +%% @doc Start WebSocket client. +%% @end +%%------------------------------------------------------------------------------ +upgrade(Req) -> + mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3). + +%%------------------------------------------------------------------------------ +%% @doc WebSocket frame receive loop. +%% @end +%%------------------------------------------------------------------------------ +ws_loop(<<>>, State, _ReplyChannel) -> + State; +ws_loop([<<>>], State, _ReplyChannel) -> + State; +ws_loop(Data, State = #wsocket_state{request = Req, + client_pid = ClientPid, + parser_state = ParserState}, ReplyChannel) -> + Peer = Req:get(peer), + lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]), + case emqtt_parser:parse(iolist_to_binary(Data), ParserState) of + {more, ParserState1} -> + State#wsocket_state{parser_state = ParserState1}; + {ok, Packet, Rest} -> + gen_server:cast(ClientPid, {received, Packet}), + ws_loop(Rest, reset_parser(State), ReplyChannel); + {error, Error} -> + lager:error("MQTT(WebSocket) detected framing error ~p for connection ~s", [Error, Peer]), + exit({shutdown, Error}) + end. + +reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> + State#wsocket_state{parser_state = emqtt_parser:init(PktOpts)}. + +%%%============================================================================= +%%% gen_fsm callbacks +%%%============================================================================= + +init([WsPid, Req, ReplyChannel, PktOpts]) -> + process_flag(trap_exit, true), + Socket = Req:get(socket), + {ok, Peername} = emqttd_net:peername(Socket), + SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end, + ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), + {ok, #state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. + +handle_call(_Req, _From, State) -> + {reply, error, State}. + +handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) -> + case emqttd_protocol:received(Packet, ProtoState) of + {ok, ProtoState1} -> + {noreply, State#state{proto_state = ProtoState1}}; + {error, Error} -> + lager:error("MQTT protocol error ~p", [Error]), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + {stop, Reason, ProtoState1} -> + stop(Reason, State#state{proto_state = ProtoState1}) + end; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) -> + ProtoState1 = + lists:foldl(fun(Message, PState) -> + {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1 + end, ProtoState, Messages), + {noreply, State#state{proto_state = ProtoState1}}; + +handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState), + {noreply, State#state{proto_state = ProtoState1}}; + +handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), + {noreply, State#state{proto_state = ProtoState1}}; + +handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState}) -> + lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), + stop({shutdown, duplicate_id}, State); + +handle_info({keepalive, start, TimeoutSec}, State = #state{request = Req}) -> + lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), + %%TODO: fix esockd_transport... + KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)}, + TimeoutSec, {keepalive, timeout}), + {noreply, State#state{keepalive = KeepAlive}}; + +handle_info({keepalive, timeout}, State = #state{request = Req, keepalive = KeepAlive}) -> + case emqttd_keepalive:resume(KeepAlive) of + timeout -> + lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), + stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); + {resumed, KeepAlive1} -> + lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]), + {noreply, State#state{keepalive = KeepAlive1}} + end; + +handle_info({'EXIT', WsPid, Reason}, State = #state{ws_pid = WsPid}) -> + stop(Reason, State); + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(Reason, #state{proto_state = ProtoState, keepalive = KeepAlive}) -> + emqttd_keepalive:cancel(KeepAlive), + case Reason of + {shutdown, Error} -> + emqttd_protocol:shutdown(Error, ProtoState); + _ -> ok + end. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +stop(Reason, State ) -> + {stop, Reason, State}. +