support keep alive

This commit is contained in:
erylee 2012-12-23 16:14:02 +08:00
parent a176312fa3
commit 7117d03a09
2 changed files with 86 additions and 10 deletions

View File

@ -26,10 +26,12 @@
client_id,
clean_sess,
will_msg,
awaiting_ack,
subscriptions
keep_alive,
awaiting_ack,
subscriptions
}).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
@ -97,7 +99,7 @@ handle_info({route, Msg}, #state{socket = Sock} = State) ->
handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate};
handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock }=State) ->
handle_info({inet_async, Sock, _Ref, {ok, Data}}, #state{ socket = Sock}=State) ->
process_received_bytes(
Data, control_throttle(State #state{ await_recv = false }));
@ -105,13 +107,24 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State);
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
error_logger:info_msg("sock error: ~p~n", [Reason]),
?ERROR("sock error: ~p~n", [Reason]),
{noreply, State};
handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
case emqtt_keep_alive:state(KeepAlive) of
idle ->
?INFO("keep alive timeout: ~p", [State#state.conn_name]),
{stop, normal, State};
active ->
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
{noreply, State#state{keep_alive=KeepAlive1}}
end;
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
terminate(_Reason, #state{keep_alive=KeepAlive}) ->
emqtt_keep_alive:cancel(KeepAlive),
ok.
code_change(_OldVsn, State, _Extra) ->
@ -162,8 +175,9 @@ process_received_bytes(Bytes,
end.
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
State ) ->
State=#state{keep_alive=KeepAlive}) ->
?INFO("~p", [Frame]),
emqtt_keep_alive:activate(KeepAlive),
process_request(Type, Frame, State).
process_request(?CONNECT,
@ -172,6 +186,7 @@ process_request(?CONNECT,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
keep_alive = AlivePeriod,
client_id = ClientId } = Var}, #state{socket = Sock} = State) ->
{ReturnCode, State1} =
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
@ -183,12 +198,14 @@ process_request(?CONNECT,
_ ->
case emqtt_auth:check(Username, Password) of
false ->
error_logger:error_msg("MQTT login failed - no credentials~n"),
?ERROR_MSG("MQTT login failed - no credentials"),
{?CONNACK_CREDENTIALS, State};
true ->
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
{?CONNACK_ACCEPT,
State #state{ will_msg = make_will_msg(Var),
client_id = ClientId }}
client_id = ClientId,
keep_alive = KeepAlive}}
end
end,
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
@ -265,9 +282,12 @@ process_request(?UNSUBSCRIBE,
{ok, State #state{ subscriptions = Subs0 }};
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock}=State) ->
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
%Keep alive timer
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
?INFO("~p", [KeepAlive1]),
send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
{ok, State};
{ok, State#state{keep_alive=KeepAlive1}};
process_request(?DISCONNECT, #mqtt_frame{}, State) ->
{stop, State}.
@ -337,3 +357,4 @@ valid_client_id(ClientId) ->
ClientIdLen = size(ClientId),
1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.

55
src/emqtt_keep_alive.erl Normal file
View File

@ -0,0 +1,55 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% Developer of the eMQTT Code is <ery.lee@gmail.com>
%% Copyright (c) 2012 Ery Lee. All rights reserved.
%%
-module(emqtt_keep_alive).
-export([new/2,
state/1,
activate/1,
reset/1,
cancel/1]).
-record(keep_alive, {state, period, timer, msg}).
new(undefined, _) ->
undefined;
new(0, _) ->
undefined;
new(Period, TimeoutMsg) when is_integer(Period) ->
Ref = erlang:send_after(Period, self(), TimeoutMsg),
#keep_alive{state=idle, period=Period, timer=Ref, msg=TimeoutMsg}.
state(undefined) ->
undefined;
state(#keep_alive{state=State}) ->
State.
activate(undefined) ->
undefined;
activate(KeepAlive) when is_record(KeepAlive, keep_alive) ->
KeepAlive#keep_alive{state=ative}.
reset(undefined) ->
undefined;
reset(KeepAlive=#keep_alive{period=Period, timer=Timer, msg=Msg}) ->
catch erlang:cancel_timer(Timer),
Ref = erlang:send_after(Period, self(), Msg),
KeepAlive#keep_alive{state=idle, timer = Ref}.
cancel(undefined) ->
undefined;
cancel(KeepAlive=#keep_alive{timer=Timer}) ->
catch erlang:cancel_timer(Timer),
KeepAlive#keep_alive{timer=undefined}.