diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 4eb5bc490..734203fa8 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -54,7 +54,6 @@ }). start_link(SockArgs) -> - io:format("start_link: ~p~n", [SockArgs]), {ok, proc_lib:spawn_link(?MODULE, init, [SockArgs])}. info(Pid) -> @@ -122,9 +121,9 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = Pee lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]), {noreply, State}; -handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) -> +handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> lager:info("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]), - KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}), + KeepAlive = emqtt_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), {noreply, State#state{ keepalive = KeepAlive }}; handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) -> diff --git a/apps/emqtt/src/emqtt_keepalive.erl b/apps/emqtt/src/emqtt_keepalive.erl index 7dd7be0f8..fa72722c2 100644 --- a/apps/emqtt/src/emqtt_keepalive.erl +++ b/apps/emqtt/src/emqtt_keepalive.erl @@ -26,29 +26,31 @@ -export([new/3, resume/1, cancel/1]). --record(keepalive, {socket, recv_oct, timeout_sec, timeout_msg, timer_ref}). +-record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}). %% %% @doc create a keepalive. %% -new(Socket, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> - {ok, [{recv_oct, RecvOct}]} = inet:getstat(Socket, [recv_oct]), +new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> + {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg), - #keepalive { socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }. + #keepalive {transport = Transport, + socket = Socket, + recv_oct = RecvOct, + timeout_sec = TimeoutSec, + timeout_msg = TimeoutMsg, + timer_ref = Ref }. %% %% @doc try to resume keepalive, called when timeout. %% -resume(KeepAlive = #keepalive { socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }) -> - {ok, [{recv_oct, NewRecvOct}]} = inet:getstat(Socket, [recv_oct]), +resume(KeepAlive = #keepalive {transport = Transport, + socket = Socket, + recv_oct = RecvOct, + timeout_sec = TimeoutSec, + timeout_msg = TimeoutMsg, + timer_ref = Ref }) -> + {ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), if NewRecvOct =:= RecvOct -> timeout;