This commit is contained in:
Ery Lee 2015-03-02 12:01:19 +08:00
parent e64f2c02fe
commit 4865afcda9
2 changed files with 18 additions and 17 deletions

View File

@ -54,7 +54,6 @@
}).
start_link(SockArgs) ->
io:format("start_link: ~p~n", [SockArgs]),
{ok, proc_lib:spawn_link(?MODULE, init, [SockArgs])}.
info(Pid) ->
@ -122,9 +121,9 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = Pee
lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]),
{noreply, State};
handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) ->
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
lager:info("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]),
KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}),
KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
{noreply, State#state{ keepalive = KeepAlive }};
handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) ->

View File

@ -26,29 +26,31 @@
-export([new/3, resume/1, cancel/1]).
-record(keepalive, {socket, recv_oct, timeout_sec, timeout_msg, timer_ref}).
-record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}).
%%
%% @doc create a keepalive.
%%
new(Socket, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
{ok, [{recv_oct, RecvOct}]} = inet:getstat(Socket, [recv_oct]),
new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
{ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]),
Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
#keepalive { socket = Socket,
recv_oct = RecvOct,
timeout_sec = TimeoutSec,
timeout_msg = TimeoutMsg,
timer_ref = Ref }.
#keepalive {transport = Transport,
socket = Socket,
recv_oct = RecvOct,
timeout_sec = TimeoutSec,
timeout_msg = TimeoutMsg,
timer_ref = Ref }.
%%
%% @doc try to resume keepalive, called when timeout.
%%
resume(KeepAlive = #keepalive { socket = Socket,
recv_oct = RecvOct,
timeout_sec = TimeoutSec,
timeout_msg = TimeoutMsg,
timer_ref = Ref }) ->
{ok, [{recv_oct, NewRecvOct}]} = inet:getstat(Socket, [recv_oct]),
resume(KeepAlive = #keepalive {transport = Transport,
socket = Socket,
recv_oct = RecvOct,
timeout_sec = TimeoutSec,
timeout_msg = TimeoutMsg,
timer_ref = Ref }) ->
{ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]),
if
NewRecvOct =:= RecvOct ->
timeout;