diff --git a/Makefile b/Makefile index 302aad42a..54b13727a 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ EUNIT_OPTS = verbose CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ - emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_pqueue emqx_router emqx_sm \ + emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ emqx_mountpoint emqx_listeners emqx_protocol emqx_pool diff --git a/etc/emqx.conf b/etc/emqx.conf index c3aa177cc..635ccf68c 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -597,10 +597,10 @@ zone.external.force_gc_policy = 1000|1MB ## zone.external.server_keepalive = 0 ## The backoff for MQTT keepalive timeout. The broker will kick a connection out -## until 'Keepalive * backoff' timeout. +## until 'Keepalive * backoff * 2' timeout. ## ## Value: Float > 0.5 -zone.external.keepalive_backoff = 1.5 +zone.external.keepalive_backoff = 0.75 ## Maximum number of subscriptions allowed, 0 means no limit. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 32e006ffa..379a3939e 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -748,7 +748,7 @@ end}. %% @doc Keepalive backoff {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ - {default, 1.5}, + {default, 0.75}, {datatype, float} ]}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index dc8439cca..3176ad443 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -249,7 +249,6 @@ handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = iolist_size(Data), emqx_metrics:inc('bytes/received', Size), Incoming = #{bytes => Size, packets => 0}, - put(last_packet_ts, erlang:system_time(millisecond)), handle_packet(Data, State#state{await_recv = false, incoming = Incoming}); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> @@ -261,9 +260,15 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State) -> +handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(Interval, {keepalive, check}) of + StatFun = fun() -> + case Transport:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + Error -> Error + end + end, + case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of {ok, KeepAlive} -> {noreply, State#state{keepalive = KeepAlive}}; {error, Error} -> @@ -271,7 +276,7 @@ handle_info({keepalive, start, Interval}, State) -> end; handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of + case emqx_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> {noreply, State#state{keepalive = KeepAlive1}}; {error, timeout} -> diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index 59dbe73b9..25740b099 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -14,37 +14,51 @@ -module(emqx_keepalive). --export([start/2, check/2, cancel/1]). +-export([start/3, check/1, cancel/1]). --record(keepalive, {tmsec, tmsg, tref}). +-record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). -type(keepalive() :: #keepalive{}). -export_type([keepalive/0]). --define(SWEET_SPOT, 50). % 50ms - %% @doc Start a keepalive --spec(start(integer(), any()) -> {ok, keepalive()}). -start(0, _) -> +-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). +start(_, 0, _) -> {ok, #keepalive{}}; -start(TimeoutSec, TimeoutMsg) -> - {ok, #keepalive{tmsec = TimeoutSec * 1000, tmsg = TimeoutMsg, tref = timer(TimeoutSec * 1000 + ?SWEET_SPOT, TimeoutMsg)}}. - -%% @doc Check keepalive, called when timeout... --spec(check(keepalive(), integer()) -> {ok, keepalive()} | {error, term()}). -check(KeepAlive = #keepalive{tmsec = TimeoutMs}, LastPacketTs) -> - TimeDiff = erlang:system_time(millisecond) - LastPacketTs, - case TimeDiff >= TimeoutMs of - true -> - {error, timeout}; - false -> - {ok, resume(KeepAlive, TimeoutMs + ?SWEET_SPOT - TimeDiff)} +start(StatFun, TimeoutSec, TimeoutMsg) -> + case catch StatFun() of + {ok, StatVal} -> + {ok, #keepalive{statfun = StatFun, statval = StatVal, + tsec = TimeoutSec, tmsg = TimeoutMsg, + tref = timer(TimeoutSec, TimeoutMsg)}}; + {error, Error} -> + {error, Error}; + {'EXIT', Reason} -> + {error, Reason} end. --spec(resume(keepalive(), integer()) -> keepalive()). -resume(KeepAlive = #keepalive{tmsg = TimeoutMsg}, TimeoutMs) -> - KeepAlive#keepalive{tref = timer(TimeoutMs, TimeoutMsg)}. +%% @doc Check keepalive, called when timeout... +-spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}). +check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> + case catch StatFun() of + {ok, NewVal} -> + if NewVal =/= LastVal -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; + Repeat < 1 -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; + true -> + {error, timeout} + end; + {error, Error} -> + {error, Error}; + {'EXIT', Reason} -> + {error, Reason} + end. + +-spec(resume(keepalive()) -> keepalive()). +resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> + KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. %% @doc Cancel Keepalive -spec(cancel(keepalive()) -> ok). @@ -53,6 +67,6 @@ cancel(#keepalive{tref = TRef}) when is_reference(TRef) -> cancel(_) -> ok. -timer(Millisecond, Msg) -> - erlang:send_after(Millisecond, self(), Msg). +timer(Secs, Msg) -> + erlang:send_after(timer:seconds(Secs), self(), Msg). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9d548f066..8301cf014 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -607,7 +607,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of true -> ok; - false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} + false -> {error, ?RC_PROTOCOL_ERROR} end. %% MQTT3.1 does not allow null clientId diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 2526392ee..fa08fa1bb 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -152,11 +152,12 @@ send_fun(WsPid) -> WsPid ! {binary, iolist_to_binary(Data)} end. +stat_fun() -> + fun() -> {ok, get(recv_oct)} end. + websocket_handle({binary, <<>>}, State) -> - put(last_packet_ts, erlang:system_time(millisecond)), {ok, ensure_stats_timer(State)}; websocket_handle({binary, [<<>>]}, State) -> - put(last_packet_ts, erlang:system_time(millisecond)), {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> @@ -166,7 +167,6 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, emqx_metrics:inc('bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> - put(last_packet_ts, erlang:system_time(millisecond)), {ok, State#state{parser_state = NewParserState}}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), @@ -225,7 +225,7 @@ websocket_info({timeout, Timer, emit_stats}, websocket_info({keepalive, start, Interval}, State) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(Interval, {keepalive, check}) of + case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> @@ -234,7 +234,7 @@ websocket_info({keepalive, start, Interval}, State) -> end; websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of + case emqx_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> diff --git a/test/emqx_keepalive_SUITE.erl b/test/emqx_keepalive_SUITE.erl index 333d66f45..c4dbd80f2 100644 --- a/test/emqx_keepalive_SUITE.erl +++ b/test/emqx_keepalive_SUITE.erl @@ -26,18 +26,17 @@ groups() -> [{keepalive, [], [t_keepalive]}]. %%-------------------------------------------------------------------- t_keepalive(_) -> - {ok, KA} = emqx_keepalive:start(1, {keepalive, timeout}), - resumed = keepalive_recv(KA, 100), - timeout = keepalive_recv(KA, 2000). + {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), + [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). -keepalive_recv(KA, MockInterval) -> +keepalive_recv(KA, Acc) -> receive {keepalive, timeout} -> - case emqx_keepalive:check(KA, erlang:system_time(millisecond) - MockInterval) of - {ok, _} -> resumed; - {error, timeout} -> timeout + case emqx_keepalive:check(KA) of + {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]); + {error, timeout} -> [timeout | Acc] end after 4000 -> - error + Acc end. diff --git a/test/emqx_net_SUITE.erl b/test/emqx_net_SUITE.erl new file mode 100644 index 000000000..50a830d10 --- /dev/null +++ b/test/emqx_net_SUITE.erl @@ -0,0 +1,43 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_net_SUITE). + +%% CT +-compile(export_all). +-compile(nowarn_export_all). + +all() -> [{group, keepalive}]. + +groups() -> [{keepalive, [], [t_keepalive]}]. + +%%-------------------------------------------------------------------- +%% Keepalive +%%-------------------------------------------------------------------- + +t_keepalive(_) -> + {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), + [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). + +keepalive_recv(KA, Acc) -> + receive + {keepalive, timeout} -> + case emqx_keepalive:check(KA) of + {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]); + {error, timeout} -> [timeout | Acc] + end + after 4000 -> + Acc + end. +