diff --git a/Makefile b/Makefile index 54b13727a..302aad42a 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ EUNIT_OPTS = verbose CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ - emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ + emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_pqueue emqx_router emqx_sm \ emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \ emqx_mountpoint emqx_listeners emqx_protocol emqx_pool diff --git a/etc/emqx.conf b/etc/emqx.conf index 635ccf68c..c3aa177cc 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -597,10 +597,10 @@ zone.external.force_gc_policy = 1000|1MB ## zone.external.server_keepalive = 0 ## The backoff for MQTT keepalive timeout. The broker will kick a connection out -## until 'Keepalive * backoff * 2' timeout. +## until 'Keepalive * backoff' timeout. ## ## Value: Float > 0.5 -zone.external.keepalive_backoff = 0.75 +zone.external.keepalive_backoff = 1.5 ## Maximum number of subscriptions allowed, 0 means no limit. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 47ad97293..7ad20ea15 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -748,7 +748,7 @@ end}. %% @doc Keepalive backoff {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ - {default, 0.75}, + {default, 1.5}, {datatype, float} ]}. @@ -848,6 +848,7 @@ end}. #{bytes => Bytes1, count => list_to_integer(Count)} end, {force_gc_policy, GcPolicy}; + (Opt, Val) -> {list_to_atom(Opt), Val} end, diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index d9d67f08d..e2c8fdeed 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -239,6 +239,7 @@ handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = iolist_size(Data), emqx_metrics:inc('bytes/received', Size), Incoming = #{bytes => Size, packets => 0}, + put(last_packet_ts, erlang:system_time(millisecond)), handle_packet(Data, State#state{await_recv = false, incoming = Incoming}); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> @@ -250,15 +251,9 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) -> +handle_info({keepalive, start, Interval}, State) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), - StatFun = fun() -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - Error -> Error - end - end, - case emqx_keepalive:start(StatFun, Interval, {keepalive, check}) of + case emqx_keepalive:start(Interval, {keepalive, check}) of {ok, KeepAlive} -> {noreply, State#state{keepalive = KeepAlive}}; {error, Error} -> @@ -266,7 +261,7 @@ handle_info({keepalive, start, Interval}, State = #state{transport = Transport, end; handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive) of + case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of {ok, KeepAlive1} -> {noreply, State#state{keepalive = KeepAlive1}}; {error, timeout} -> diff --git a/src/emqx_keepalive.erl b/src/emqx_keepalive.erl index 25740b099..59dbe73b9 100644 --- a/src/emqx_keepalive.erl +++ b/src/emqx_keepalive.erl @@ -14,51 +14,37 @@ -module(emqx_keepalive). --export([start/3, check/1, cancel/1]). +-export([start/2, check/2, cancel/1]). --record(keepalive, {statfun, statval, tsec, tmsg, tref, repeat = 0}). +-record(keepalive, {tmsec, tmsg, tref}). -type(keepalive() :: #keepalive{}). -export_type([keepalive/0]). +-define(SWEET_SPOT, 50). % 50ms + %% @doc Start a keepalive --spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). -start(_, 0, _) -> +-spec(start(integer(), any()) -> {ok, keepalive()}). +start(0, _) -> {ok, #keepalive{}}; -start(StatFun, TimeoutSec, TimeoutMsg) -> - case catch StatFun() of - {ok, StatVal} -> - {ok, #keepalive{statfun = StatFun, statval = StatVal, - tsec = TimeoutSec, tmsg = TimeoutMsg, - tref = timer(TimeoutSec, TimeoutMsg)}}; - {error, Error} -> - {error, Error}; - {'EXIT', Reason} -> - {error, Reason} - end. +start(TimeoutSec, TimeoutMsg) -> + {ok, #keepalive{tmsec = TimeoutSec * 1000, tmsg = TimeoutMsg, tref = timer(TimeoutSec * 1000 + ?SWEET_SPOT, TimeoutMsg)}}. %% @doc Check keepalive, called when timeout... --spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}). -check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) -> - case catch StatFun() of - {ok, NewVal} -> - if NewVal =/= LastVal -> - {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; - Repeat < 1 -> - {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; - true -> - {error, timeout} - end; - {error, Error} -> - {error, Error}; - {'EXIT', Reason} -> - {error, Reason} +-spec(check(keepalive(), integer()) -> {ok, keepalive()} | {error, term()}). +check(KeepAlive = #keepalive{tmsec = TimeoutMs}, LastPacketTs) -> + TimeDiff = erlang:system_time(millisecond) - LastPacketTs, + case TimeDiff >= TimeoutMs of + true -> + {error, timeout}; + false -> + {ok, resume(KeepAlive, TimeoutMs + ?SWEET_SPOT - TimeDiff)} end. --spec(resume(keepalive()) -> keepalive()). -resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> - KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. +-spec(resume(keepalive(), integer()) -> keepalive()). +resume(KeepAlive = #keepalive{tmsg = TimeoutMsg}, TimeoutMs) -> + KeepAlive#keepalive{tref = timer(TimeoutMs, TimeoutMsg)}. %% @doc Cancel Keepalive -spec(cancel(keepalive()) -> ok). @@ -67,6 +53,6 @@ cancel(#keepalive{tref = TRef}) when is_reference(TRef) -> cancel(_) -> ok. -timer(Secs, Msg) -> - erlang:send_after(timer:seconds(Secs), self(), Msg). +timer(Millisecond, Msg) -> + erlang:send_after(Millisecond, self(), Msg). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8301cf014..9d548f066 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -607,7 +607,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}, _PState) -> case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of true -> ok; - false -> {error, ?RC_PROTOCOL_ERROR} + false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} end. %% MQTT3.1 does not allow null clientId diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 54dcfb1fe..eab657eb6 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -782,7 +782,7 @@ redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) -> deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = local}) -> ConnPid ! {deliver, {publish, PacketId, Msg}}; deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) -> - emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, PacketId, Msg}]). + emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, {publish, PacketId, Msg}}]). %%------------------------------------------------------------------------------ %% Awaiting ACK for QoS1/QoS2 Messages diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..2526392ee 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -152,12 +152,11 @@ send_fun(WsPid) -> WsPid ! {binary, iolist_to_binary(Data)} end. -stat_fun() -> - fun() -> {ok, get(recv_oct)} end. - websocket_handle({binary, <<>>}, State) -> + put(last_packet_ts, erlang:system_time(millisecond)), {ok, ensure_stats_timer(State)}; websocket_handle({binary, [<<>>]}, State) -> + put(last_packet_ts, erlang:system_time(millisecond)), {ok, ensure_stats_timer(State)}; websocket_handle({binary, Data}, State = #state{parser_state = ParserState, proto_state = ProtoState}) -> @@ -167,6 +166,7 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, emqx_metrics:inc('bytes/received', BinSize), case catch emqx_frame:parse(iolist_to_binary(Data), ParserState) of {more, NewParserState} -> + put(last_packet_ts, erlang:system_time(millisecond)), {ok, State#state{parser_state = NewParserState}}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), @@ -225,7 +225,7 @@ websocket_info({timeout, Timer, emit_stats}, websocket_info({keepalive, start, Interval}, State) -> ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State), - case emqx_keepalive:start(stat_fun(), Interval, {keepalive, check}) of + case emqx_keepalive:start(Interval, {keepalive, check}) of {ok, KeepAlive} -> {ok, State#state{keepalive = KeepAlive}}; {error, Error} -> @@ -234,7 +234,7 @@ websocket_info({keepalive, start, Interval}, State) -> end; websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> - case emqx_keepalive:check(KeepAlive) of + case emqx_keepalive:check(KeepAlive, get(last_packet_ts)) of {ok, KeepAlive1} -> {ok, State#state{keepalive = KeepAlive1}}; {error, timeout} -> diff --git a/test/emqx_keepalive_SUITE.erl b/test/emqx_keepalive_SUITE.erl index c4dbd80f2..333d66f45 100644 --- a/test/emqx_keepalive_SUITE.erl +++ b/test/emqx_keepalive_SUITE.erl @@ -26,17 +26,18 @@ groups() -> [{keepalive, [], [t_keepalive]}]. %%-------------------------------------------------------------------- t_keepalive(_) -> - {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), - [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). + {ok, KA} = emqx_keepalive:start(1, {keepalive, timeout}), + resumed = keepalive_recv(KA, 100), + timeout = keepalive_recv(KA, 2000). -keepalive_recv(KA, Acc) -> +keepalive_recv(KA, MockInterval) -> receive {keepalive, timeout} -> - case emqx_keepalive:check(KA) of - {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]); - {error, timeout} -> [timeout | Acc] + case emqx_keepalive:check(KA, erlang:system_time(millisecond) - MockInterval) of + {ok, _} -> resumed; + {error, timeout} -> timeout end after 4000 -> - Acc + error end. diff --git a/test/emqx_net_SUITE.erl b/test/emqx_net_SUITE.erl deleted file mode 100644 index 50a830d10..000000000 --- a/test/emqx_net_SUITE.erl +++ /dev/null @@ -1,43 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_net_SUITE). - -%% CT --compile(export_all). --compile(nowarn_export_all). - -all() -> [{group, keepalive}]. - -groups() -> [{keepalive, [], [t_keepalive]}]. - -%%-------------------------------------------------------------------- -%% Keepalive -%%-------------------------------------------------------------------- - -t_keepalive(_) -> - {ok, KA} = emqx_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}), - [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])). - -keepalive_recv(KA, Acc) -> - receive - {keepalive, timeout} -> - case emqx_keepalive:check(KA) of - {ok, KA1} -> keepalive_recv(KA1, [resumed | Acc]); - {error, timeout} -> [timeout | Acc] - end - after 4000 -> - Acc - end. -