improve keepalie

This commit is contained in:
Feng 2015-10-04 19:43:58 +08:00
parent 273149f633
commit 78288e8088
1 changed files with 37 additions and 38 deletions

View File

@ -23,62 +23,61 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_keepalive). -module(emqttd_keepalive).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-export([new/3, resume/1, cancel/1]). -export([start/3, check/1, cancel/1]).
-record(keepalive, {transport, -record(keepalive, {statfun, statval,
socket, tsec, tmsg, tref,
recv_oct, repeat = 0}).
timeout_sec,
timeout_msg,
timer_ref}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Create a keepalive %% @doc Start a keepalive
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
new({Transport, Socket}, TimeoutSec, TimeoutMsg) when TimeoutSec > 0 -> start(_, 0, _) ->
{ok, [{recv_oct, RecvOct}]} = Transport:getstat(Socket, [recv_oct]), undefined;
Ref = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg), start(StatFun, TimeoutSec, TimeoutMsg) ->
#keepalive {transport = Transport, {ok, StatVal} = StatFun(),
socket = Socket, #keepalive{statfun = StatFun, statval = StatVal,
recv_oct = RecvOct, tsec = TimeoutSec, tmsg = TimeoutMsg,
timeout_sec = TimeoutSec, tref = timer(TimeoutSec, TimeoutMsg)}.
timeout_msg = TimeoutMsg,
timer_ref = Ref}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Try to resume keepalive, called when timeout %% @doc Check keepalive, called when timeout.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
resume(KeepAlive = #keepalive {transport = Transport, check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
socket = Socket, case StatFun() of
recv_oct = RecvOct, {ok, NewVal} ->
timeout_sec = TimeoutSec, if NewVal =/= LastVal ->
timeout_msg = TimeoutMsg, {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
timer_ref = Ref }) -> Repeat < 1 ->
{ok, [{recv_oct, NewRecvOct}]} = Transport:getstat(Socket, [recv_oct]), {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
if true ->
NewRecvOct =:= RecvOct -> {error, timeout}
timeout; end;
true -> {error, Error} ->
%need? {error, Error}
cancel(Ref),
NewRef = erlang:send_after(timer:seconds(TimeoutSec), self(), TimeoutMsg),
{resumed, KeepAlive#keepalive{recv_oct = NewRecvOct, timer_ref = NewRef}}
end. end.
resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Cancel Keepalive %% @doc Cancel Keepalive
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
cancel(#keepalive{timer_ref = Ref}) -> cancel(#keepalive{tref = TRef}) ->
cancel(Ref); cancel(TRef);
cancel(undefined) -> cancel(undefined) ->
undefined; ok;
cancel(Ref) -> cancel(TRef) ->
catch erlang:cancel_timer(Ref). catch erlang:cancel_timer(TRef).
timer(Sec, Msg) ->
erlang:send_after(timer:seconds(Sec), self(), Msg).