From 78288e80886466ac2f96c6bfef06e47b747c2de1 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 4 Oct 2015 19:43:58 +0800 Subject: [PATCH] improve keepalie --- src/emqttd_keepalive.erl | 75 ++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/src/emqttd_keepalive.erl b/src/emqttd_keepalive.erl index f5c7f2ac7..e06382207 100644 --- a/src/emqttd_keepalive.erl +++ b/src/emqttd_keepalive.erl @@ -23,62 +23,61 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_keepalive). -author("Feng Lee "). --export([new/3, resume/1, cancel/1]). +-export([start/3, check/1, cancel/1]). --record(keepalive, {transport, - socket, - recv_oct, - timeout_sec, - timeout_msg, - timer_ref}). +-record(keepalive, {statfun, statval, + tsec, tmsg, tref, + repeat = 0}). %%------------------------------------------------------------------------------ -%% @doc Create a keepalive +%% @doc Start a keepalive %% @end %%------------------------------------------------------------------------------ -new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> - {ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), - Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref}. +start(_, 0, _) -> + undefined; +start(StatFun, TimeoutSec, TimeoutMsg) -> + {ok, StatVal} = StatFun(), + #keepalive{statfun = StatFun, statval = StatVal, + tsec = TimeoutSec, tmsg = TimeoutMsg, + tref = timer(TimeoutSec, TimeoutMsg)}. %%------------------------------------------------------------------------------ -%% @doc Try to resume keepalive, called when timeout +%% @doc Check keepalive, called when timeout. %% @end %%------------------------------------------------------------------------------ -resume(KeepAlive = #keepalive {transport = Transport, - socket = Socket, - recv_oct = RecvOct, - timeout_sec = TimeoutSec, - timeout_msg = TimeoutMsg, - timer_ref = Ref }) -> - {ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), - if - NewRecvOct =:= RecvOct -> - timeout; - true -> - %need? - cancel(Ref), - NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), - {resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}} +check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> + case StatFun() of + {ok, NewVal} -> + if NewVal =/= LastVal -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; + Repeat < 1 -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; + true -> + {error, timeout} + end; + {error, Error} -> + {error, Error} end. +resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> + KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. + %%------------------------------------------------------------------------------ %% @doc Cancel Keepalive %% @end %%------------------------------------------------------------------------------ -cancel(#keepalive{timer_ref = Ref}) -> - cancel(Ref); +cancel(#keepalive{tref = TRef}) -> + cancel(TRef); cancel(undefined) -> - undefined; -cancel(Ref) -> - catch erlang:cancel_timer(Ref). + ok; +cancel(TRef) -> + catch erlang:cancel_timer(TRef). + +timer(Sec, Msg) -> + erlang:send_after(timer:seconds(Sec), self(), Msg).