diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 801464694..f615a0e68 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -26,10 +26,12 @@ client_id, clean_sess, will_msg, - awaiting_ack, - subscriptions + keep_alive, + awaiting_ack, + subscriptions }). + -define(FRAME_TYPE(Frame, Type), Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). @@ -97,7 +99,7 @@ handle_info({route, Msg}, #state{socket = Sock} = State) -> handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; -handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock }=State) -> +handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) -> process_received_bytes( Data, control_throttle(State #state{ await_recv = false })); @@ -105,13 +107,24 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); handle_info({inet_reply, _Sock, {error, Reason}}, State) -> - error_logger:info_msg("sock error: ~p~n", [Reason]), + ?ERROR("sock error: ~p~n", [Reason]), {noreply, State}; +handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> + case emqtt_keep_alive:state(KeepAlive) of + idle -> + ?INFO("keep alive timeout: ~p", [State#state.conn_name]), + {stop, normal, State}; + active -> + KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), + {noreply, State#state{keep_alive=KeepAlive1}} + end; + handle_info(Info, State) -> {stop, {badinfo, Info}, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{keep_alive=KeepAlive}) -> + emqtt_keep_alive:cancel(KeepAlive), ok. code_change(_OldVsn, State, _Extra) -> @@ -162,8 +175,9 @@ process_received_bytes(Bytes, end. process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, - State ) -> + State=#state{keep_alive=KeepAlive}) -> ?INFO("~p", [Frame]), + emqtt_keep_alive:activate(KeepAlive), process_request(Type, Frame, State). process_request(?CONNECT, @@ -172,6 +186,7 @@ process_request(?CONNECT, password = Password, proto_ver = ProtoVersion, clean_sess = CleanSess, + keep_alive = AlivePeriod, client_id = ClientId } = Var}, #state{socket = Sock} = State) -> {ReturnCode, State1} = case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, @@ -183,12 +198,14 @@ process_request(?CONNECT, _ -> case emqtt_auth:check(Username, Password) of false -> - error_logger:error_msg("MQTT login failed - no credentials~n"), + ?ERROR_MSG("MQTT login failed - no credentials"), {?CONNACK_CREDENTIALS, State}; true -> + KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), {?CONNACK_ACCEPT, State #state{ will_msg = make_will_msg(Var), - client_id = ClientId }} + client_id = ClientId, + keep_alive = KeepAlive}} end end, send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, @@ -265,9 +282,12 @@ process_request(?UNSUBSCRIBE, {ok, State #state{ subscriptions = Subs0 }}; -process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock}=State) -> +process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> + %Keep alive timer + KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), + ?INFO("~p", [KeepAlive1]), send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), - {ok, State}; + {ok, State#state{keep_alive=KeepAlive1}}; process_request(?DISCONNECT, #mqtt_frame{}, State) -> {stop, State}. @@ -337,3 +357,4 @@ valid_client_id(ClientId) -> ClientIdLen = size(ClientId), 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. + diff --git a/src/emqtt_keep_alive.erl b/src/emqtt_keep_alive.erl new file mode 100644 index 000000000..2885180d5 --- /dev/null +++ b/src/emqtt_keep_alive.erl @@ -0,0 +1,55 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% Developer of the eMQTT Code is +%% Copyright (c) 2012 Ery Lee. All rights reserved. +%% +-module(emqtt_keep_alive). + +-export([new/2, + state/1, + activate/1, + reset/1, + cancel/1]). + +-record(keep_alive, {state, period, timer, msg}). + +new(undefined, _) -> + undefined; +new(0, _) -> + undefined; +new(Period, TimeoutMsg) when is_integer(Period) -> + Ref = erlang:send_after(Period, self(), TimeoutMsg), + #keep_alive{state=idle, period=Period, timer=Ref, msg=TimeoutMsg}. + +state(undefined) -> + undefined; +state(#keep_alive{state=State}) -> + State. + +activate(undefined) -> + undefined; +activate(KeepAlive) when is_record(KeepAlive, keep_alive) -> + KeepAlive#keep_alive{state=ative}. + +reset(undefined) -> + undefined; +reset(KeepAlive=#keep_alive{period=Period, timer=Timer, msg=Msg}) -> + catch erlang:cancel_timer(Timer), + Ref = erlang:send_after(Period, self(), Msg), + KeepAlive#keep_alive{state=idle, timer = Ref}. + +cancel(undefined) -> + undefined; +cancel(KeepAlive=#keep_alive{timer=Timer}) -> + catch erlang:cancel_timer(Timer), + KeepAlive#keep_alive{timer=undefined}. + +