diff --git a/apps/emqttd/priv/www/mqtt.html b/apps/emqttd/priv/www/index.html
similarity index 95%
rename from apps/emqttd/priv/www/mqtt.html
rename to apps/emqttd/priv/www/index.html
index 44dce6e08..d84f633cd 100644
--- a/apps/emqttd/priv/www/mqtt.html
+++ b/apps/emqttd/priv/www/index.html
@@ -20,7 +20,7 @@
}
// Create a client instance
- client = new Paho.MQTT.Client(location.hostname, Number(location.port), "/mqtt/wsocket", "clientId");
+ client = new Paho.MQTT.Client(location.hostname, Number(location.port), "/mqtt", "clientId");
// set callback handlers
client.onConnectionLost = onConnectionLost;
@@ -34,6 +34,7 @@
// called when the client connects
function onConnect() {
alert("connected"),
+ $('connstate').innerHTML = 'CONNECTED';
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("/World");
diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl
index 1dcad8bae..a56c89495 100644
--- a/apps/emqttd/src/emqttd.erl
+++ b/apps/emqttd/src/emqttd.erl
@@ -71,7 +71,7 @@ open_listener({mqtts, Port, Options}) ->
%% open http port
open_listener({http, Port, Options}) ->
- MFArgs = {emqttd_http, handle_req, []},
+ MFArgs = {emqttd_http, handle_request, []},
mochiweb:start_http(Port, Options, MFArgs).
open_listener(Protocol, Port, Options) ->
diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl
index 123ad703b..2e2645850 100644
--- a/apps/emqttd/src/emqttd_client.erl
+++ b/apps/emqttd/src/emqttd_client.erl
@@ -67,8 +67,9 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
{ok, Peername} = emqttd_net:peername(Sock),
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
lager:info("Connect from ~s", [ConnStr]),
+ SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
ParserState = emqtt_parser:init(PacketOpts),
- ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts),
+ ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts),
State = control_throttle(#state{transport = Transport,
socket = NewSock,
peername = Peername,
@@ -200,7 +201,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
stop(Reason, State#state{proto_state = ProtoState1})
end;
{error, Error} ->
- lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]),
+ lager:error("MQTT detected framing error ~p for connection ~p", [Error, ConnStr]),
stop({shutdown, Error}, State)
end.
diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl
index 86c93c6e1..4d60fb30a 100644
--- a/apps/emqttd/src/emqttd_http.erl
+++ b/apps/emqttd/src/emqttd_http.erl
@@ -34,12 +34,15 @@
-import(proplists, [get_value/2, get_value/3]).
--export([handle_req/1]).
+-export([handle_request/1]).
-handle_req(Req) ->
- handle_req(Req:get(method), Req:get(path), Req).
+handle_request(Req) ->
+ handle_request(Req:get(method), Req:get(path), Req).
-handle_req('POST', "/mqtt/publish", Req) ->
+%%------------------------------------------------------------------------------
+%% HTTP Publish API
+%%------------------------------------------------------------------------------
+handle_request('POST', "/mqtt/publish", Req) ->
Params = mochiweb_request:parse_post(Req),
lager:info("HTTP Publish: ~p", [Params]),
case authorized(Req) of
@@ -64,21 +67,33 @@ handle_req('POST', "/mqtt/publish", Req) ->
Req:respond({401, [], <<"Fobbiden">>})
end;
-handle_req(_Method, "/mqtt/wsocket", Req) ->
+%%------------------------------------------------------------------------------
+%% MQTT Over WebSocket
+%%------------------------------------------------------------------------------
+handle_request('GET', "/mqtt", Req) ->
lager:info("Websocket Connection from: ~s", [Req:get(peer)]),
- Up = Req:get_header_value("Upgrade"),
- case Up =/= undefined andalso string:to_lower(Up) =:= "websocket" of
- true ->
- emqttd_websocket:start_link(Req);
- false ->
- Req:respond({400, [], <<"Bad Request">>})
+ Upgrade = Req:get_header_value("Upgrade"),
+ Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
+ case {is_websocket(Upgrade), Proto} of
+ {true, "mqtt" ++ _Vsn} ->
+ emqttd_ws_client:start_link(Req);
+ {false, _} ->
+ lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
+ Req:respond({400, [], <<"Bad Request">>});
+ {_, Proto} ->
+ lager:error("WebSocket with error Protocol: ~s", [Proto]),
+ Req:respond({400, [], <<"Bad WebSocket Protocol">>})
end;
-handle_req('GET', "/" ++ File, Req) ->
+%%------------------------------------------------------------------------------
+%% Get static files
+%%------------------------------------------------------------------------------
+handle_request('GET', "/" ++ File, Req) ->
lager:info("HTTP GET File: ~s", [File]),
mochiweb_request:serve_file(File, docroot(), Req);
-handle_req(_Method, _Path, Req) ->
+handle_request(Method, Path, Req) ->
+ lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
Req:not_found().
%%------------------------------------------------------------------------------
@@ -113,6 +128,9 @@ int(S) -> list_to_integer(S).
bool("0") -> false;
bool("1") -> true.
+is_websocket(Upgrade) ->
+ Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
+
docroot() ->
{file, Here} = code:is_loaded(?MODULE),
Dir = filename:dirname(filename:dirname(Here)),
diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl
index cd9485aee..a979f10b7 100644
--- a/apps/emqttd/src/emqttd_protocol.erl
+++ b/apps/emqttd/src/emqttd_protocol.erl
@@ -34,7 +34,7 @@
-include("emqttd.hrl").
%% API
--export([init/2, clientid/1]).
+-export([init/3, clientid/1]).
-export([received/2, send/2, redeliver/2, shutdown/2]).
@@ -42,9 +42,8 @@
%% Protocol State
-record(proto_state, {
- transport,
- socket,
peername,
+ sendfun,
connected = false, %received CONNECT action?
proto_ver,
proto_name,
@@ -59,12 +58,12 @@
-type proto_state() :: #proto_state{}.
-init({Transport, Socket, Peername}, Opts) ->
+init(Peername, SendFun, Opts) ->
+ MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
#proto_state{
- transport = Transport,
- socket = Socket,
peername = Peername,
- max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}.
+ sendfun = SendFun,
+ max_clientid_len = MaxLen}.
clientid(#proto_state{clientid = ClientId}) -> ClientId.
@@ -231,22 +230,13 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session =
{Message1, NewSession} = emqttd_session:store(Session, Message),
send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
-send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) ->
+send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when is_record(Packet, mqtt_packet) ->
trace(send, Packet, State),
sent_stats(Packet),
Data = emqtt_serialiser:serialise(Packet),
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
emqttd_metrics:inc('bytes/sent', size(Data)),
- if
- is_function(Transport) ->
- io:format("Transport Fun: ~p~n", [Transport]),
- try
- Transport(Data)
- catch
- _:Error -> io:format("Transport send error: ~p~n", [Error])
- end;
- true -> Transport:send(Sock, Data)
- end,
+ SendFun(Data),
{ok, State}.
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
diff --git a/apps/emqttd/src/emqttd_websocket.erl b/apps/emqttd/src/emqttd_websocket.erl
deleted file mode 100644
index d537b4109..000000000
--- a/apps/emqttd/src/emqttd_websocket.erl
+++ /dev/null
@@ -1,171 +0,0 @@
-%%%-----------------------------------------------------------------------------
-%%% @Copyright (C) 2012-2015, Feng Lee
-%%%
-%%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%%% of this software and associated documentation files (the "Software"), to deal
-%%% in the Software without restriction, including without limitation the rights
-%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%%% copies of the Software, and to permit persons to whom the Software is
-%%% furnished to do so, subject to the following conditions:
-%%%
-%%% The above copyright notice and this permission notice shall be included in all
-%%% copies or substantial portions of the Software.
-%%%
-%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%%% SOFTWARE.
-%%%-----------------------------------------------------------------------------
-%%% @doc
-%%% emqttd websocket client.
-%%%
-%%% @end
-%%%-----------------------------------------------------------------------------
-
--module(emqttd_websocket).
-
--author("Feng Lee ").
-
--include_lib("emqtt/include/emqtt.hrl").
-
--include_lib("emqtt/include/emqtt_packet.hrl").
-
--behaviour(gen_server).
-
--define(SERVER, ?MODULE).
-
--export([start_link/1, ws_loop/3]).
-
-%% gen_server Function Exports
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--record(wsocket_state, {client, parser_state}).
-
--record(client_state, {request, reply_channel, proto_state, keepalive}).
-
--define(PACKET_OPTS, [{max_clientid_len, 1024},
- {max_packet_size, 4096}]).
-
-start_link(Req) ->
- {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
- {ok, Client} = gen_server:start_link(?MODULE, [Req, ReplyChannel], []),
- ReentryWs(#wsocket_state{client = Client,
- parser_state = emqtt_parser:init(?PACKET_OPTS)}).
-
-ws_loop(<<>>, State, _ReplyChannel) ->
- State;
-ws_loop(Payload, State = #wsocket_state{client = Client, parser_state = ParserState}, ReplyChannel) ->
- io:format("Received data: ~p~n", [Payload]),
- case catch emqtt_parser:parse(iolist_to_binary(Payload), ParserState) of
- {more, ParserState1} ->
- State#wsocket_state{parser_state = ParserState1};
- {ok, Packet, Rest} ->
- Client ! {received, Packet},
- ws_loop(Rest, State#wsocket_state{parser_state = emqtt_parser:init(?PACKET_OPTS)}, ReplyChannel);
- {error, Error} ->
- lager:error("MQTT detected framing error ~p~n", [Error]),
- exit({shutdown, Error});
- Exit ->
- lager:error("MQTT detected error ~p~n", [Exit])
- end.
-
-%%%=============================================================================
-%%% gen_server callbacks
-%%%=============================================================================
-
-init([Req, ReplyChannel]) ->
- %%TODO: Redesign later...
- Socket = Req:get(socket),
- {ok, Peername} = emqttd_net:peername(Socket),
- SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
- ProtoState = emqttd_protocol:init({SendFun, Socket, Peername}, ?PACKET_OPTS),
- {ok, #client_state{request = Req, reply_channel = ReplyChannel,
- proto_state = ProtoState}}.
-
-handle_call(_Req, _From, State) ->
- {reply, error, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
- io:format("Packet Received: ~p~n", [Packet]),
- case emqttd_protocol:received(Packet, ProtoState) of
- {ok, ProtoState1} ->
- {noreply, State#client_state{proto_state = ProtoState1}};
- {error, Error} ->
- lager:error("MQTT protocol error ~p", [Error]),
- stop({shutdown, Error}, State);
- {error, Error, ProtoState1} ->
- stop({shutdown, Error}, State#client_state{proto_state = ProtoState1});
- {stop, Reason, ProtoState1} ->
- stop(Reason, State#client_state{proto_state = ProtoState1})
- end;
-
-%%TODO: ok??
-handle_info({dispatch, {From, Messages}}, #client_state{proto_state = ProtoState} = State) when is_list(Messages) ->
- ProtoState1 =
- lists:foldl(fun(Message, PState) ->
- {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
- end, ProtoState, Messages),
- {noreply, State#client_state{proto_state = ProtoState1}};
-
-handle_info({dispatch, {From, Message}}, #client_state{proto_state = ProtoState} = State) ->
- {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
- {noreply, State#client_state{proto_state = ProtoState1}};
-
-handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) ->
- {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
- {noreply, State#client_state{proto_state = ProtoState1}};
-
-handle_info({stop, duplicate_id, _NewPid}, State=#client_state{proto_state = ProtoState}) ->
- %% TODO: to...
- %% need transfer data???
- %% emqttd_client:transfer(NewPid, Data),
- lager:error("Shutdown for duplicate clientid: ~s",
- [emqttd_protocol:clientid(ProtoState)]),
- stop({shutdown, duplicate_id}, State);
-
-handle_info({keepalive, start, _TimeoutSec}, State = #client_state{}) ->
- %lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
- KeepAlive = undefined, %emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
- {noreply, State#client_state{ keepalive = KeepAlive }};
-
-handle_info({keepalive, timeout}, State = #client_state{keepalive = KeepAlive}) ->
- case emqttd_keepalive:resume(KeepAlive) of
- timeout ->
- %lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]),
- stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
- {resumed, KeepAlive1} ->
- %lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
- {noreply, State#client_state{keepalive = KeepAlive1}}
- end;
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(Reason, #client_state{keepalive = KeepAlive, proto_state = ProtoState}) ->
- lager:debug("~p terminated, reason: ~p~n", [self(), Reason]),
- emqttd_keepalive:cancel(KeepAlive),
- case {ProtoState, Reason} of
- {undefined, _} -> ok;
- {_, {shutdown, Error}} ->
- emqttd_protocol:shutdown(Error, ProtoState);
- {_, _} ->
- ok
- end.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%%=============================================================================
-%%% Internal functions
-%%%=============================================================================
-
-stop(Reason, State ) ->
- {stop, Reason, State}.
-
diff --git a/apps/emqttd/src/emqttd_ws_client.erl b/apps/emqttd/src/emqttd_ws_client.erl
new file mode 100644
index 000000000..4742cc65f
--- /dev/null
+++ b/apps/emqttd/src/emqttd_ws_client.erl
@@ -0,0 +1,192 @@
+%%%-----------------------------------------------------------------------------
+%%% @Copyright (C) 2012-2015, Feng Lee
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd websocket client.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+
+-module(emqttd_ws_client).
+
+-author("Feng Lee ").
+
+-include_lib("emqtt/include/emqtt.hrl").
+
+-include_lib("emqtt/include/emqtt_packet.hrl").
+
+-behaviour(gen_server).
+
+%% API Exports
+-export([start_link/1, ws_loop/3]).
+
+%% gen_server Function Exports
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% WebSocket loop state
+-record(wsocket_state, {request,
+ client_pid,
+ packet_opts,
+ parser_state}).
+
+%% Client state
+-record(state, {ws_pid, request, proto_state, keepalive}).
+
+%%------------------------------------------------------------------------------
+%% @doc Start WebSocket client.
+%% @end
+%%------------------------------------------------------------------------------
+start_link(Req) ->
+ {ReentryWs, ReplyChannel} = upgrade(Req),
+ {ok, PktOpts} = application:get_env(emqttd, mqtt_packet),
+ {ok, ClientPid} = gen_server:start_link(?MODULE, [self(), Req, ReplyChannel, PktOpts], []),
+ ReentryWs(#wsocket_state{request = Req,
+ client_pid = ClientPid,
+ packet_opts = PktOpts,
+ parser_state = emqtt_parser:init(PktOpts)}).
+
+%%------------------------------------------------------------------------------
+%% @private
+%% @doc Start WebSocket client.
+%% @end
+%%------------------------------------------------------------------------------
+upgrade(Req) ->
+ mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3).
+
+%%------------------------------------------------------------------------------
+%% @doc WebSocket frame receive loop.
+%% @end
+%%------------------------------------------------------------------------------
+ws_loop(<<>>, State, _ReplyChannel) ->
+ State;
+ws_loop([<<>>], State, _ReplyChannel) ->
+ State;
+ws_loop(Data, State = #wsocket_state{request = Req,
+ client_pid = ClientPid,
+ parser_state = ParserState}, ReplyChannel) ->
+ Peer = Req:get(peer),
+ lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]),
+ case emqtt_parser:parse(iolist_to_binary(Data), ParserState) of
+ {more, ParserState1} ->
+ State#wsocket_state{parser_state = ParserState1};
+ {ok, Packet, Rest} ->
+ gen_server:cast(ClientPid, {received, Packet}),
+ ws_loop(Rest, reset_parser(State), ReplyChannel);
+ {error, Error} ->
+ lager:error("MQTT(WebSocket) detected framing error ~p for connection ~s", [Error, Peer]),
+ exit({shutdown, Error})
+ end.
+
+reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
+ State#wsocket_state{parser_state = emqtt_parser:init(PktOpts)}.
+
+%%%=============================================================================
+%%% gen_fsm callbacks
+%%%=============================================================================
+
+init([WsPid, Req, ReplyChannel, PktOpts]) ->
+ process_flag(trap_exit, true),
+ Socket = Req:get(socket),
+ {ok, Peername} = emqttd_net:peername(Socket),
+ SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
+ ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
+ {ok, #state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
+
+handle_call(_Req, _From, State) ->
+ {reply, error, State}.
+
+handle_cast({received, Packet}, State = #state{proto_state = ProtoState}) ->
+ case emqttd_protocol:received(Packet, ProtoState) of
+ {ok, ProtoState1} ->
+ {noreply, State#state{proto_state = ProtoState1}};
+ {error, Error} ->
+ lager:error("MQTT protocol error ~p", [Error]),
+ stop({shutdown, Error}, State);
+ {error, Error, ProtoState1} ->
+ stop({shutdown, Error}, State#state{proto_state = ProtoState1});
+ {stop, Reason, ProtoState1} ->
+ stop(Reason, State#state{proto_state = ProtoState1})
+ end;
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({dispatch, {From, Messages}}, #state{proto_state = ProtoState} = State) when is_list(Messages) ->
+ ProtoState1 =
+ lists:foldl(fun(Message, PState) ->
+ {ok, PState1} = emqttd_protocol:send({From, Message}, PState), PState1
+ end, ProtoState, Messages),
+ {noreply, State#state{proto_state = ProtoState1}};
+
+handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = State) ->
+ {ok, ProtoState1} = emqttd_protocol:send({From, Message}, ProtoState),
+ {noreply, State#state{proto_state = ProtoState1}};
+
+handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->
+ {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
+ {noreply, State#state{proto_state = ProtoState1}};
+
+handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState}) ->
+ lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]),
+ stop({shutdown, duplicate_id}, State);
+
+handle_info({keepalive, start, TimeoutSec}, State = #state{request = Req}) ->
+ lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]),
+ %%TODO: fix esockd_transport...
+ KeepAlive = emqttd_keepalive:new({esockd_transport, Req:get(socket)},
+ TimeoutSec, {keepalive, timeout}),
+ {noreply, State#state{keepalive = KeepAlive}};
+
+handle_info({keepalive, timeout}, State = #state{request = Req, keepalive = KeepAlive}) ->
+ case emqttd_keepalive:resume(KeepAlive) of
+ timeout ->
+ lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
+ stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
+ {resumed, KeepAlive1} ->
+ lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
+ {noreply, State#state{keepalive = KeepAlive1}}
+ end;
+
+handle_info({'EXIT', WsPid, Reason}, State = #state{ws_pid = WsPid}) ->
+ stop(Reason, State);
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(Reason, #state{proto_state = ProtoState, keepalive = KeepAlive}) ->
+ emqttd_keepalive:cancel(KeepAlive),
+ case Reason of
+ {shutdown, Error} ->
+ emqttd_protocol:shutdown(Error, ProtoState);
+ _ -> ok
+ end.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+
+stop(Reason, State ) ->
+ {stop, Reason, State}.
+