This commit is contained in:
Ery Lee 2015-03-05 12:09:29 +08:00
parent e8133366e1
commit c6668c6dc9
2 changed files with 49 additions and 36 deletions

View File

@ -111,8 +111,8 @@ handle_info({inet_reply, _Ref, ok}, State) ->
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) ->
lager:debug("RECV from ~s: ~p", [PeerName, Data]),
process_received_bytes(
Data, control_throttle(State #state{ await_recv = false }));
process_received_bytes(Data,
control_throttle(State #state{await_recv = false}));
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State);
@ -167,10 +167,10 @@ process_received_bytes(Bytes, State = #state{parse_state = ParseState,
case emqtt_parser:parse(Bytes, ParseState) of
{more, ParseState1} ->
{noreply,
control_throttle( State #state{parse_state = ParseState1}),
control_throttle(State #state{parse_state = ParseState1}),
hibernate};
{ok, Packet, Rest} ->
case emqtt_protocol:handle_packet(Packet, ProtoState) of
case emqtt_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(),
proto_state = ProtoState1});
@ -198,7 +198,7 @@ run_socket(State = #state{await_recv = true}) ->
State;
run_socket(State = #state{transport = Transport, socket = Sock}) ->
Transport:async_recv(Sock, 0, infinity),
State#state{ await_recv = true }.
State#state{await_recv = true}.
control_throttle(State = #state{conn_state = Flow,
conserve = Conserve}) ->

View File

@ -1,25 +1,29 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt keepalive.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_keepalive).
-author('feng@emqtt.io').
@ -28,9 +32,12 @@
-record(keepalive, {transport, socket, recv_oct, timeout_sec, timeout_msg, timer_ref}).
%%------------------------------------------------------------------------------
%% @doc
%% Create a keepalive.
%%
%% @doc create a keepalive.
%%
%% @end
%%------------------------------------------------------------------------------
new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
{ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]),
Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
@ -39,11 +46,14 @@ new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 ->
recv_oct = RecvOct,
timeout_sec = TimeoutSec,
timeout_msg = TimeoutMsg,
timer_ref = Ref }.
timer_ref = Ref}.
%%------------------------------------------------------------------------------
%% @doc
%% Try to resume keepalive, called when timeout.
%%
%% @doc try to resume keepalive, called when timeout.
%%
%% @end
%%------------------------------------------------------------------------------
resume(KeepAlive = #keepalive {transport = Transport,
socket = Socket,
recv_oct = RecvOct,
@ -58,13 +68,16 @@ resume(KeepAlive = #keepalive {transport = Transport,
%need?
cancel(Ref),
NewRef = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg),
{resumed, KeepAlive#keepalive { recv_oct = NewRecvOct, timer_ref = NewRef }}
{resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}}
end.
%%------------------------------------------------------------------------------
%% @doc
%% Cancel Keepalive.
%%
%% @doc cancel keepalive
%%
cancel(#keepalive { timer_ref = Ref }) ->
%% @end
%%------------------------------------------------------------------------------
cancel(#keepalive{timer_ref = Ref}) ->
cancel(Ref);
cancel(undefined) ->
undefined;