diff --git a/etc/emqx.conf b/etc/emqx.conf index 3604f2efd..ea90b8d28 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -591,10 +591,10 @@ zone.external.enable_stats = on ## zone.external.server_keepalive = 0 ## The backoff for MQTT keepalive timeout. The broker will kick a connection out -## until 'Keepalive * backoff * 2' timeout. +## until 'Keepalive * backoff' timeout. ## ## Value: Float > 0.5 -zone.external.keepalive_backoff = 0.75 +zone.external.keepalive_backoff = 1.5 ## Maximum number of subscriptions allowed, 0 means no limit. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index a8ed316c2..77167b440 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -748,7 +748,7 @@ end}. %% @doc Keepalive backoff {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ - {default, 0.75}, + {default, 1.5}, {datatype, float} ]}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index a7f71d9aa..139011891 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -45,7 +45,8 @@ rate_limit, publish_limit, limit_timer, - idle_timeout + idle_timeout, + last_packet_ts = 0 }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -243,23 +244,17 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> +handle_info({keepalive, start, Interval}, State) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), - StatFun = fun() -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - Error -> Error - end - end, - case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of + case emqx_keepalive:start(Interval, {keepalive, check}) of {ok, KeepAlive} -> {noreply, State#state{keepalive = KeepAlive}}; {error, Error} -> shutdown(Error, State) end; -handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive) of +handle_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> + case emqx_keepalive:check(KeepAlive, LastPacketTs) of {ok, KeepAlive1} -> {noreply, State#state{keepalive = KeepAlive1}}; {error, timeout} -> @@ -296,14 +291,14 @@ code_change(_OldVsn, State, _Extra) -> %% Receive and parse data handle_packet(<<>>, State) -> - {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State)))}; + {noreply, maybe_gc(ensure_stats_timer(ensure_rate_limit(State#state{last_packet_ts = erlang:system_time(millisecond)})))}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, idle_timeout = IdleTimeout}) -> case catch emqx_frame:parse(Data, ParserState) of {more, NewParserState} -> - {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout}; + {noreply, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}, IdleTimeout}; {ok, Packet = ?PACKET(Type), Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index 25740b099..59dbe73b9 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -14,51 +14,37 @@ -module(emqx_keepalive). --export([start/3, check/1, cancel/1]). +-export([start/2, check/2, cancel/1]). --record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). +-record(keepalive, {tmsec, tmsg, tref}). -type(keepalive() :: #keepalive{}). -export_type([keepalive/0]). +-define(SWEET_SPOT, 50). % 50ms + %% @doc Start a keepalive --spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). -start(_, 0, _) -> +-spec(start(integer(), any()) -> {ok, keepalive()}). +start(0, _) -> {ok, #keepalive{}}; -start(StatFun, TimeoutSec, TimeoutMsg) -> - case catch StatFun() of - {ok, StatVal} -> - {ok, #keepalive{statfun = StatFun, statval = StatVal, - tsec = TimeoutSec, tmsg = TimeoutMsg, - tref = timer(TimeoutSec, TimeoutMsg)}}; - {error, Error} -> - {error, Error}; - {'EXIT', Reason} -> - {error, Reason} - end. +start(TimeoutSec, TimeoutMsg) -> + {ok, #keepalive{tmsec = TimeoutSec * 1000, tmsg = TimeoutMsg, tref = timer(TimeoutSec * 1000 + ?SWEET_SPOT, TimeoutMsg)}}. %% @doc Check keepalive, called when timeout... --spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}). -check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> - case catch StatFun() of - {ok, NewVal} -> - if NewVal =/= LastVal -> - {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; - Repeat < 1 -> - {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; - true -> - {error, timeout} - end; - {error, Error} -> - {error, Error}; - {'EXIT', Reason} -> - {error, Reason} +-spec(check(keepalive(), integer()) -> {ok, keepalive()} | {error, term()}). +check(KeepAlive = #keepalive{tmsec = TimeoutMs}, LastPacketTs) -> + TimeDiff = erlang:system_time(millisecond) - LastPacketTs, + case TimeDiff >= TimeoutMs of + true -> + {error, timeout}; + false -> + {ok, resume(KeepAlive, TimeoutMs + ?SWEET_SPOT - TimeDiff)} end. --spec(resume(keepalive()) -> keepalive()). -resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> - KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. +-spec(resume(keepalive(), integer()) -> keepalive()). +resume(KeepAlive = #keepalive{tmsg = TimeoutMsg}, TimeoutMs) -> + KeepAlive#keepalive{tref = timer(TimeoutMs, TimeoutMsg)}. %% @doc Cancel Keepalive -spec(cancel(keepalive()) -> ok). @@ -67,6 +53,6 @@ cancel(#keepalive{tref = TRef}) when is_reference(TRef) -> cancel(_) -> ok. -timer(Secs, Msg) -> - erlang:send_after(timer:seconds(Secs), self(), Msg). +timer(Millisecond, Msg) -> + erlang:send_after(Millisecond, self(), Msg). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8301cf014..9d548f066 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -607,7 +607,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of true -> ok; - false -> {error, ?RC_PROTOCOL_ERROR} + false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} end. %% MQTT3.1 does not allow null clientId diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..3a2d4fdae 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -40,7 +40,8 @@ keepalive, enable_stats, stats_timer, - shutdown + shutdown, + last_packet_ts = 0 }). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). @@ -152,13 +153,10 @@ send_fun(WsPid) -> WsPid ! {binary, iolist_to_binary(Data)} end. -stat_fun() -> - fun() -> {ok, get(recv_oct)} end. - websocket_handle({binary, <<>>}, State) -> - {ok, ensure_stats_timer(State)}; + {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; websocket_handle({binary, [<<>>]}, State) -> - {ok, ensure_stats_timer(State)}; + {ok, ensure_stats_timer(State#state{last_packet_ts = erlang:system_time(millisecond)})}; websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> BinSize = iolist_size(Data), @@ -167,7 +165,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, emqx_metrics:inc('bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> - {ok, State#state{parser_state = NewParserState}}; + {ok, State#state{parser_state = NewParserState, last_packet_ts = erlang:system_time(millisecond)}}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), put(recv_cnt, get(recv_cnt) + 1), @@ -225,7 +223,7 @@ websocket_info({timeout, Timer, emit_stats}, websocket_info({keepalive, start, Interval}, State) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of + case emqx_keepalive:start(Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> @@ -233,8 +231,8 @@ websocket_info({keepalive, start, Interval}, State) -> shutdown(Error, State) end; -websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive) of +websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive, last_packet_ts = LastPacketTs}) -> + case emqx_keepalive:check(KeepAlive, LastPacketTs) of {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} ->