diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 2d5ccf55a..3add439f5 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -111,8 +111,8 @@ handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) -> lager:debug("RECV from ~s: ~p", [PeerName, Data]), - process_received_bytes( - Data, control_throttle(State #state{ await_recv = false })); + process_received_bytes(Data, + control_throttle(State #state{await_recv = false})); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); @@ -167,10 +167,10 @@ process_received_bytes(Bytes, State = #state{parse_state = ParseState, case emqtt_parser:parse(Bytes, ParseState) of {more, ParseState1} -> {noreply, - control_throttle( State #state{parse_state = ParseState1}), + control_throttle(State #state{parse_state = ParseState1}), hibernate}; {ok, Packet, Rest} -> - case emqtt_protocol:handle_packet(Packet, ProtoState) of + case emqtt_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(), proto_state = ProtoState1}); @@ -198,7 +198,7 @@ run_socket(State = #state{await_recv = true}) -> State; run_socket(State = #state{transport = Transport, socket = Sock}) -> Transport:async_recv(Sock, 0, infinity), - State#state{ await_recv = true }. + State#state{await_recv = true}. control_throttle(State = #state{conn_state = Flow, conserve = Conserve}) -> diff --git a/apps/emqtt/src/emqtt_keepalive.erl b/apps/emqtt/src/emqtt_keepalive.erl index fa72722c2..9dacb4b7d 100644 --- a/apps/emqtt/src/emqtt_keepalive.erl +++ b/apps/emqtt/src/emqtt_keepalive.erl @@ -1,25 +1,29 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqtt keepalive. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqtt_keepalive). -author('feng@emqtt.io'). @@ -28,9 +32,12 @@ -record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}). +%%------------------------------------------------------------------------------ +%% @doc +%% Create a keepalive. %% -%% @doc create a keepalive. -%% +%% @end +%%------------------------------------------------------------------------------ new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg), @@ -39,11 +46,14 @@ new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> recv_oct = RecvOct, timeout_sec = TimeoutSec, timeout_msg = TimeoutMsg, - timer_ref = Ref }. + timer_ref = Ref}. +%%------------------------------------------------------------------------------ +%% @doc +%% Try to resume keepalive, called when timeout. %% -%% @doc try to resume keepalive, called when timeout. -%% +%% @end +%%------------------------------------------------------------------------------ resume(KeepAlive = #keepalive {transport = Transport, socket = Socket, recv_oct = RecvOct, @@ -58,13 +68,16 @@ resume(KeepAlive = #keepalive {transport = Transport, %need? cancel(Ref), NewRef = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg), - {resumed, KeepAlive#keepalive { recv_oct = NewRecvOct, timer_ref = NewRef }} + {resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}} end. +%%------------------------------------------------------------------------------ +%% @doc +%% Cancel Keepalive. %% -%% @doc cancel keepalive -%% -cancel(#keepalive { timer_ref = Ref }) -> +%% @end +%%------------------------------------------------------------------------------ +cancel(#keepalive{timer_ref = Ref}) -> cancel(Ref); cancel(undefined) -> undefined;