Calculate the 1.5 keep alive time exactly
This commit is contained in:
parent
1256ce1500
commit
073bf481c9
|
@ -591,10 +591,10 @@ zone.external.enable_stats = on
|
|||
## zone.external.server_keepalive = 0
|
||||
|
||||
## The backoff for MQTT keepalive timeout. The broker will kick a connection out
|
||||
## until 'Keepalive * backoff * 2' timeout.
|
||||
## until 'Keepalive * backoff' timeout.
|
||||
##
|
||||
## Value: Float > 0.5
|
||||
zone.external.keepalive_backoff = 0.75
|
||||
zone.external.keepalive_backoff = 1.5
|
||||
|
||||
## Maximum number of subscriptions allowed, 0 means no limit.
|
||||
##
|
||||
|
|
|
@ -748,7 +748,7 @@ end}.
|
|||
|
||||
%% @doc Keepalive backoff
|
||||
{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [
|
||||
{default, 0.75},
|
||||
{default, 1.5},
|
||||
{datatype, float}
|
||||
]}.
|
||||
|
||||
|
|
|
@ -45,7 +45,8 @@
|
|||
rate_limit,
|
||||
publish_limit,
|
||||
limit_timer,
|
||||
idle_timeout
|
||||
idle_timeout,
|
||||
last_packet_ts = 0
|
||||
}).
|
||||
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
|
||||
|
@ -243,23 +244,17 @@ handle_info({inet_reply, _Sock, ok}, State) ->
|
|||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
shutdown(Reason, State);
|
||||
|
||||
handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) ->
|
||||
handle_info({keepalive, start, Interval}, State) ->
|
||||
?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
||||
StatFun = fun() ->
|
||||
case Transport:getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
Error -> Error
|
||||
end
|
||||
end,
|
||||
case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of
|
||||
case emqx_keepalive:start(Interval, {keepalive, check}) of
|
||||
{ok, KeepAlive} ->
|
||||
{noreply, State#state{keepalive = KeepAlive}};
|
||||
{error, Error} ->
|
||||
shutdown(Error, State)
|
||||
end;
|
||||
|
||||
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
||||
case emqx_keepalive:check(KeepAlive) of
|
||||
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) ->
|
||||
case emqx_keepalive:check(KeepAlive, LastPacketTs) of
|
||||
{ok, KeepAlive1} ->
|
||||
{noreply, State#state{keepalive = KeepAlive1}};
|
||||
{error, timeout} ->
|
||||
|
@ -296,14 +291,14 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
%% Receive and parse data
|
||||
handle_packet(<<>>, State) ->
|
||||
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))};
|
||||
{noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State#state{last_packet_ts = erlang:system_time(millisecond)})))};
|
||||
|
||||
handle_packet(Data, State = #state{proto_state = ProtoState,
|
||||
parser_state = ParserState,
|
||||
idle_timeout = IdleTimeout}) ->
|
||||
case catch emqx_frame:parse(Data, ParserState) of
|
||||
{more, NewParserState} ->
|
||||
{noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout};
|
||||
{noreply, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}, IdleTimeout};
|
||||
{ok, Packet = ?PACKET(Type), Rest} ->
|
||||
emqx_metrics:received(Packet),
|
||||
case emqx_protocol:received(Packet, ProtoState) of
|
||||
|
|
|
@ -14,51 +14,37 @@
|
|||
|
||||
-module(emqx_keepalive).
|
||||
|
||||
-export([start/3, check/1, cancel/1]).
|
||||
-export([start/2, check/2, cancel/1]).
|
||||
|
||||
-record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}).
|
||||
-record(keepalive, {tmsec, tmsg, tref}).
|
||||
|
||||
-type(keepalive() :: #keepalive{}).
|
||||
|
||||
-export_type([keepalive/0]).
|
||||
|
||||
-define(SWEET_SPOT, 50). % 50ms
|
||||
|
||||
%% @doc Start a keepalive
|
||||
-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}).
|
||||
start(_, 0, _) ->
|
||||
-spec(start(integer(), any()) -> {ok, keepalive()}).
|
||||
start(0, _) ->
|
||||
{ok, #keepalive{}};
|
||||
start(StatFun, TimeoutSec, TimeoutMsg) ->
|
||||
case catch StatFun() of
|
||||
{ok, StatVal} ->
|
||||
{ok, #keepalive{statfun = StatFun, statval = StatVal,
|
||||
tsec = TimeoutSec, tmsg = TimeoutMsg,
|
||||
tref = timer(TimeoutSec, TimeoutMsg)}};
|
||||
{error, Error} ->
|
||||
{error, Error};
|
||||
{'EXIT', Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
start(TimeoutSec, TimeoutMsg) ->
|
||||
{ok, #keepalive{tmsec = TimeoutSec * 1000, tmsg = TimeoutMsg, tref = timer(TimeoutSec * 1000 + ?SWEET_SPOT, TimeoutMsg)}}.
|
||||
|
||||
%% @doc Check keepalive, called when timeout...
|
||||
-spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}).
|
||||
check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
|
||||
case catch StatFun() of
|
||||
{ok, NewVal} ->
|
||||
if NewVal =/= LastVal ->
|
||||
{ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})};
|
||||
Repeat < 1 ->
|
||||
{ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})};
|
||||
-spec(check(keepalive(), integer()) -> {ok, keepalive()} | {error, term()}).
|
||||
check(KeepAlive = #keepalive{tmsec = TimeoutMs}, LastPacketTs) ->
|
||||
TimeDiff = erlang:system_time(millisecond) - LastPacketTs,
|
||||
case TimeDiff >= TimeoutMs of
|
||||
true ->
|
||||
{error, timeout}
|
||||
end;
|
||||
{error, Error} ->
|
||||
{error, Error};
|
||||
{'EXIT', Reason} ->
|
||||
{error, Reason}
|
||||
{error, timeout};
|
||||
false ->
|
||||
{ok, resume(KeepAlive, TimeoutMs + ?SWEET_SPOT - TimeDiff)}
|
||||
end.
|
||||
|
||||
-spec(resume(keepalive()) -> keepalive()).
|
||||
resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
|
||||
KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}.
|
||||
-spec(resume(keepalive(), integer()) -> keepalive()).
|
||||
resume(KeepAlive = #keepalive{tmsg = TimeoutMsg}, TimeoutMs) ->
|
||||
KeepAlive#keepalive{tref = timer(TimeoutMs, TimeoutMsg)}.
|
||||
|
||||
%% @doc Cancel Keepalive
|
||||
-spec(cancel(keepalive()) -> ok).
|
||||
|
@ -67,6 +53,6 @@ cancel(#keepalive{tref = TRef}) when is_reference(TRef) ->
|
|||
cancel(_) ->
|
||||
ok.
|
||||
|
||||
timer(Secs, Msg) ->
|
||||
erlang:send_after(timer:seconds(Secs), self(), Msg).
|
||||
timer(Millisecond, Msg) ->
|
||||
erlang:send_after(Millisecond, self(), Msg).
|
||||
|
||||
|
|
|
@ -607,7 +607,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
|
|||
proto_name = Name}, _PState) ->
|
||||
case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
|
||||
true -> ok;
|
||||
false -> {error, ?RC_PROTOCOL_ERROR}
|
||||
false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
|
||||
end.
|
||||
|
||||
%% MQTT3.1 does not allow null clientId
|
||||
|
|
|
@ -40,7 +40,8 @@
|
|||
keepalive,
|
||||
enable_stats,
|
||||
stats_timer,
|
||||
shutdown
|
||||
shutdown,
|
||||
last_packet_ts = 0
|
||||
}).
|
||||
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||
|
@ -152,13 +153,10 @@ send_fun(WsPid) ->
|
|||
WsPid ! {binary, iolist_to_binary(Data)}
|
||||
end.
|
||||
|
||||
stat_fun() ->
|
||||
fun() -> {ok, get(recv_oct)} end.
|
||||
|
||||
websocket_handle({binary, <<>>}, State) ->
|
||||
{ok, ensure_stats_timer(State)};
|
||||
{ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})};
|
||||
websocket_handle({binary, [<<>>]}, State) ->
|
||||
{ok, ensure_stats_timer(State)};
|
||||
{ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})};
|
||||
websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
||||
proto_state = ProtoState}) ->
|
||||
BinSize = iolist_size(Data),
|
||||
|
@ -167,7 +165,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
|
|||
emqx_metrics:inc('bytes/received', BinSize),
|
||||
case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of
|
||||
{more, NewParserState} ->
|
||||
{ok, State#state{parser_state = NewParserState}};
|
||||
{ok, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}};
|
||||
{ok, Packet, Rest} ->
|
||||
emqx_metrics:received(Packet),
|
||||
put(recv_cnt, get(recv_cnt) + 1),
|
||||
|
@ -225,7 +223,7 @@ websocket_info({timeout, Timer, emit_stats},
|
|||
|
||||
websocket_info({keepalive, start, Interval}, State) ->
|
||||
?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
||||
case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of
|
||||
case emqx_keepalive:start(Interval, {keepalive, check}) of
|
||||
{ok, KeepAlive} ->
|
||||
{ok, State#state{keepalive = KeepAlive}};
|
||||
{error, Error} ->
|
||||
|
@ -233,8 +231,8 @@ websocket_info({keepalive, start, Interval}, State) ->
|
|||
shutdown(Error, State)
|
||||
end;
|
||||
|
||||
websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
||||
case emqx_keepalive:check(KeepAlive) of
|
||||
websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) ->
|
||||
case emqx_keepalive:check(KeepAlive, LastPacketTs) of
|
||||
{ok, KeepAlive1} ->
|
||||
{ok, State#state{keepalive = KeepAlive1}};
|
||||
{error, timeout} ->
|
||||
|
|
Loading…
Reference in New Issue