From 52c3bc962827a21a43c9e74ffa6ab5c3f87b4d5f Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Sat, 10 Jan 2015 16:46:11 +0800 Subject: [PATCH] KeepAlive --- apps/emqtt/src/emqtt_client.erl | 160 +++++++++++++++-------------- apps/emqtt/src/emqtt_keepalive.erl | 71 +++++++------ apps/emqtt/src/emqtt_net.erl | 10 +- apps/emqtt/src/emqtt_packet.erl | 2 +- apps/emqtt/src/emqtt_protocol.erl | 15 +-- 5 files changed, 140 insertions(+), 118 deletions(-) diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index ade78783c..7561c1915 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -29,120 +29,126 @@ -export([start_link/1, info/1, go/2]). -export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, + handle_call/3, + handle_cast/2, + handle_info/2, code_change/3, - terminate/2]). + terminate/2]). -include("emqtt.hrl"). %%Client State... -record(state, { - socket, - conn_name, - await_recv, - conn_state, - conserve, - parse_state, - proto_state, - keep_alive + socket, + peer_name, + conn_name, + await_recv, + conn_state, + conserve, + parse_state, + proto_state, + keepalive }). start_link(Sock) -> gen_server:start_link(?MODULE, [Sock], []). info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info). go(Pid, Sock) -> - gen_server:call(Pid, {go, Sock}). + gen_server:call(Pid, {go, Sock}). init([Sock]) -> - io:format("client is created: ~p~n", [self()]), - {ok, #state{socket = Sock}, hibernate}. + {ok, #state{socket = Sock}, 1000}. handle_call({go, Sock}, _From, #state{socket = Sock}) -> + {ok, Peername} = emqtt_net:peer_string(Sock), {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), - io:format("conn from ~s~n", [ConnStr]), + lager:debug("connection: ~s~n", [ConnStr]), {reply, ok, control_throttle( #state{ socket = Sock, + peer_name = Peername, conn_name = ConnStr, await_recv = false, conn_state = running, conserve = false, parse_state = emqtt_packet:initial_state(), - proto_state = emqtt_protocol:initial_state(Sock)})}; + proto_state = emqtt_protocol:initial_state(Sock)}), 10000}; handle_call(info, _From, State = #state{ - conn_name=ConnName, - proto_state = ProtoState}) -> - {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; + conn_name=ConnName, proto_state = ProtoState}) -> + {reply, [{conn_name, ConnName} | emqtt_protocol:info(ProtoState)], State}; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. + {stop, {badmsg, Msg}, State}. handle_info(timeout, State) -> - stop({shutdown, timeout}, State); + stop({shutdown, timeout}, State); handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName}) -> - %%TODO: - %lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), - stop({shutdown, duplicate_id}, State); + %%TODO: + %lager:error("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), + stop({shutdown, duplicate_id}, State); %%TODO: ok?? -handle_info({dispatch, Msg}, #state{proto_state = ProtoState} = State) -> - {ok, ProtoState1} = emqtt_protocol:send_message(Msg, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; +handle_info({dispatch, Message}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqtt_protocol:send_message(Message, ProtoState), + {noreply, State#state{proto_state = ProtoState1}}; handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) -> process_received_bytes( - Data, control_throttle(State #state{ await_recv = false })); + Data, control_throttle(State #state{ await_recv = false })); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); -%%TODO: HOW TO HANDLE THIS?? handle_info({inet_reply, _Sock, {error, Reason}}, State) -> - {noreply, State}; + lager:critical("unexpected inet_reply '~p'", [Reason]), + {noreply, State}; -handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> - case emqtt_keep_alive:state(KeepAlive) of - idle -> - lager:info("keep_alive timeout: ~p", [State#state.conn_name]), - {stop, normal, State}; - active -> - KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), - {noreply, State#state{keep_alive=KeepAlive1}} - end; +handle_info({keepalive, start, TimeoutSec}, State = #state{socket = Socket}) -> + lager:info("~s keepalive started: ~p", [State#state.peer_name, TimeoutSec]), + KeepAlive = emqtt_keepalive:new(Socket, TimeoutSec, {keepalive, timeout}), + {noreply, State#state{ keepalive = KeepAlive }}; + +handle_info({keepalive, timeout}, State = #state { keepalive = KeepAlive }) -> + case emqtt_keepalive:resume(KeepAlive) of + timeout -> + lager:info("~s keepalive timeout!", [State#state.peer_name]), + {stop, normal, State}; + {resumed, KeepAlive1} -> + lager:info("~s keepalive resumed.", [State#state.peer_name]), + {noreply, State#state{ keepalive = KeepAlive1 }} + end; handle_info(Info, State) -> - lager:error("badinfo :~p",[Info]), - {stop, {badinfo, Info}, State}. + lager:error("badinfo :~p",[Info]), + {stop, {badinfo, Info}, State}. terminate(Reason, #state{proto_state = unefined}) -> io:format("client terminated: ~p, reason: ~p~n", [self(), Reason]), - %%TODO: fix keep_alive... - %%emqtt_keep_alive:cancel(KeepAlive), - %emqtt_protocol:client_terminated(ProtoState), - ok; + %%TODO: fix keep_alive... + %%emqtt_keep_alive:cancel(KeepAlive), + %emqtt_protocol:client_terminated(ProtoState), + ok; -terminate(_Reason, #state{proto_state = ProtoState}) -> - %%TODO: fix keep_alive... - %%emqtt_keep_alive:cancel(KeepAlive), - emqtt_protocol:client_terminated(ProtoState), - ok. +terminate(_Reason, #state { keepalive = KeepAlive, proto_state = ProtoState }) -> + %%TODO: fix keep_alive... + emqtt_keepalive:cancel(KeepAlive), + emqtt_protocol:client_terminated(ProtoState), + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. - + async_recv(Sock, Length, infinity) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, -1); @@ -157,38 +163,38 @@ process_received_bytes(<<>>, State) -> process_received_bytes(Bytes, State = #state{ parse_state = ParseState, - proto_state = ProtoState, + proto_state = ProtoState, conn_name = ConnStr }) -> case emqtt_packet:parse(Bytes, ParseState) of - {more, ParseState1} -> - {noreply, - control_throttle( State #state{ parse_state = ParseState1 }), - hibernate}; - {ok, Packet, Rest} -> - case emqtt_protocol:handle_packet(Packet, ProtoState) of - {ok, ProtoState1} -> - process_received_bytes( - Rest, - State#state{ parse_state = emqtt_packet:initial_state(), - proto_state = ProtoState1 }); - {error, Error} -> - lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), - stop({shutdown, Error}, State); - {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#state{proto_state = ProtoState1}); - {stop, ProtoState1} -> - stop(normal, State#state{proto_state = ProtoState1}) - end; - {error, Error} -> - lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), - stop({shutdown, Error}, State) + {more, ParseState1} -> + {noreply, + control_throttle( State #state{ parse_state = ParseState1 }), + hibernate}; + {ok, Packet, Rest} -> + case emqtt_protocol:handle_packet(Packet, ProtoState) of + {ok, ProtoState1} -> + process_received_bytes( + Rest, + State#state{ parse_state = emqtt_packet:initial_state(), + proto_state = ProtoState1 }); + {error, Error} -> + lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + {stop, ProtoState1} -> + stop(normal, State#state{proto_state = ProtoState1}) + end; + {error, Error} -> + lager:error("MQTT detected framing error ~p for connection ~p~n", [ConnStr, Error]), + stop({shutdown, Error}, State) end. %%---------------------------------------------------------------------------- network_error(Reason, State = #state{ conn_name = ConnStr}) -> lager:error("MQTT detected network error '~p' for ~p", [Reason, ConnStr]), - %%TODO: where to SEND WILL MSG?? + %%TODO: where to SEND WILL MSG?? %%send_will_msg(State), % todo: flush channel after publish stop({shutdown, conn_closed}, State). diff --git a/apps/emqtt/src/emqtt_keepalive.erl b/apps/emqtt/src/emqtt_keepalive.erl index 1106f7460..eef049a6c 100644 --- a/apps/emqtt/src/emqtt_keepalive.erl +++ b/apps/emqtt/src/emqtt_keepalive.erl @@ -24,43 +24,48 @@ -author('feng@emqtt.io'). --export([new/2, - state/1, - activate/1, - reset/1, - cancel/1]). +-export([new/3, resume/1, cancel/1]). --record(keep_alive, {state, period, timer, msg}). +-record(keepalive, {socket, recv_oct, timeout_sec, timeout_msg, timer_ref}). -new(undefined, _) -> - undefined; -new(0, _) -> - undefined; -new(Period, TimeoutMsg) when is_integer(Period) -> - Ref = erlang:send_after(Period, self(), TimeoutMsg), - #keep_alive{state=idle, period=Period, timer=Ref, msg=TimeoutMsg}. +%% +%% @doc create a keepalive. +%% +new(Socket, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> + {ok, [{recv_oct, RecvOct}]} = inet:getstate(Socket, [recv_oct]), + Ref = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg), + #keepalive { socket = Socket, + recv_oct = RecvOct, + timeout_sec = TimeoutSec, + timeout_msg = TimeoutMsg, + timer_ref = Ref }. -state(undefined) -> - undefined; -state(#keep_alive{state=State}) -> - State. - -activate(undefined) -> - undefined; -activate(KeepAlive) when is_record(KeepAlive, keep_alive) -> - KeepAlive#keep_alive{state=active}. - -reset(undefined) -> - undefined; -reset(KeepAlive=#keep_alive{period=Period, timer=Timer, msg=Msg}) -> - catch erlang:cancel_timer(Timer), - Ref = erlang:send_after(Period, self(), Msg), - KeepAlive#keep_alive{state=idle, timer = Ref}. +%% +%% @doc try to resume keepalive, called when timeout. +%% +resume(KeepAlive = #keepalive { socket = Socket, + recv_oct = RecvOct, + timeout_sec = TimeoutSec, + timeout_msg = TimeoutMsg, + timer_ref = Ref }) -> + {ok, [{recv_oct, NewRecvOct}]} = inet:getstate(Socket, [recv_oct]), + if + NewRecvOct =:= RecvOct -> + timeout; + true -> + %need? + cancel(Ref), + NewRef = erlang:send_after(TimeoutSec*1000, self(), TimeoutMsg), + {resumed, KeepAlive#keepalive { recv_oct = NewRecvOct, timer_ref = NewRef }} + end. +%% +%% @doc cancel keepalive +%% +cancel(#keepalive { timer_ref = Ref }) -> + cancel(Ref); cancel(undefined) -> undefined; -cancel(KeepAlive=#keep_alive{timer=Timer}) -> - catch erlang:cancel_timer(Timer), - KeepAlive#keep_alive{timer=undefined}. - +cancel(Ref) -> + catch erlang:cancel_timer(Ref). diff --git a/apps/emqtt/src/emqtt_net.erl b/apps/emqtt/src/emqtt_net.erl index 949f731cf..21d98d704 100644 --- a/apps/emqtt/src/emqtt_net.erl +++ b/apps/emqtt/src/emqtt_net.erl @@ -26,7 +26,7 @@ -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). --export([connection_string/2]). +-export([peername/1, sockname/1, peer_string/1, connection_string/2]). -include_lib("kernel/include/inet.hrl"). @@ -196,6 +196,14 @@ setopts(Sock, Options) when is_port(Sock) -> sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). +peer_string(Sock) -> + case peername(Sock) of + {ok, {Addr, Port}} -> + {ok, lists:flatten(io_lib:format("~s:~p", [maybe_ntoab(Addr), Port]))}; + Error -> + Error + end. + peername(Sock) when is_port(Sock) -> inet:peername(Sock). ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> diff --git a/apps/emqtt/src/emqtt_packet.erl b/apps/emqtt/src/emqtt_packet.erl index 81d07351a..820cf50b2 100644 --- a/apps/emqtt/src/emqtt_packet.erl +++ b/apps/emqtt/src/emqtt_packet.erl @@ -217,7 +217,7 @@ serialise_variable(#mqtt_packet_header { type = PubAck }, <<>> = _Payload) when PubAck =:= ?PUBACK; PubAck =:= ?PUBREC; PubAck =:= ?PUBREL; PubAck =:= ?PUBCOMP -> - {PacketIdBin = <>, <<>>}; + {<>, <<>>}; serialise_variable(#mqtt_packet_header { }, undefined, diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 2f2cd4e76..2bc27b39c 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -103,7 +103,7 @@ handle_packet(?CONNECT, #mqtt_packet { password = Password, proto_ver = ProtoVersion, clean_sess = CleanSess, - keep_alive = AlivePeriod, + keep_alive = KeepAlive, client_id = ClientId } = Var }, State0 = #proto_state{socket = Sock}) -> @@ -118,19 +118,18 @@ handle_packet(?CONNECT, #mqtt_packet { _ -> case emqtt_auth:check(Username, Password) of false -> - lager:error_MSG("MQTT login failed - no credentials"), + lager:error("MQTT login failed - no credentials"), {?CONNACK_CREDENTIALS, State}; true -> - lager:info("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), - %%TODO: - %%KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), + lager:info("connect from clientid: ~p, keepalive: ", [ClientId, KeepAlive]), + start_keepalive(KeepAlive), emqtt_cm:register(ClientId, self()), {?CONNACK_ACCEPT, State #proto_state{ will_msg = make_will_msg(Var), client_id = ClientId }} end end, - lager:info("recv conn...:~p", [ReturnCode]), + lager:info("[SENT] MQTT CONNACK: ~p", [ReturnCode]), send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?CONNACK }, variable = #mqtt_packet_connack{ return_code = ReturnCode }}), @@ -358,3 +357,7 @@ send_will_msg(#proto_state{will_msg = undefined}) -> send_will_msg(#proto_state{will_msg = WillMsg }) -> emqtt_router:route(WillMsg). +start_keepalive(0) -> ignore; +start_keepalive(Sec) when Sec > 0 -> + self() ! {keepalive, start, Sec * 1.5}. +