diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index a616f462d..07463b0c5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1670,7 +1670,8 @@ ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> Backoff = get_mqtt_conf(Zone, keepalive_backoff), - Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), + RecvOct = emqx_pd:get_counter(incoming_bytes), + Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). clear_keepalive(Channel = #channel{timers = Timers}) -> diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 0f7e340cb..b19cd9277 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -17,6 +17,7 @@ -module(emqx_keepalive). -export([ init/1 + , init/2 , info/1 , info/2 , check/2 @@ -37,9 +38,13 @@ %% @doc Init keepalive. -spec(init(Interval :: non_neg_integer()) -> keepalive()). -init(Interval) when Interval > 0 -> +init(Interval) -> init(0, Interval). + +%% @doc Init keepalive. +-spec(init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive()). +init(StatVal, Interval) when Interval > 0 -> #keepalive{interval = Interval, - statval = emqx_pd:get_counter(incoming_bytes), + statval = StatVal, repeat = 0}. %% @doc Get Info of the keepalive. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 04525a805..df6d93948 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -157,7 +157,7 @@ roots(low) -> sc(ref("force_gc"), #{ desc => """Force the MQTT connection process garbage collection after -this number of messages or bytes passed through.""" +this number of messages or bytes have passed through.""" })} , {"conn_congestion", sc(ref("conn_congestion"), @@ -334,17 +334,17 @@ message within this interval.""" , {"shared_subscription", sc(boolean(), #{ default => true, - desc => "Support MQTT Shared Subscriptions" + desc => "Support MQTT Shared Subscriptions." })} , {"ignore_loop_deliver", sc(boolean(), #{ default => false, - desc => "Ignore loop delivery of messages for mqtt v3.1.1" + desc => "Ignore loop delivery of messages for mqtt v3.1.1." })} , {"strict_mode", sc(boolean(), #{default => false, - desc => "Parse the MQTT frame in strict mode" + desc => "Parse the MQTT frame in strict mode." }) } , {"response_information", @@ -358,7 +358,10 @@ This feature is disabled if is set to \"\".""" , {"server_keepalive", sc(hoconsc:union([integer(), disabled]), #{ default => disabled, - desc => "Server Keep Alive of MQTT 5.0" + desc => +"""Server Keep Alive of MQTT 5.0. +If the Server returns a Server Keep Alive on the CONNACK packet, +the Client MUST use that value instead of the value it sent as the Keep Alive.""" }) } , {"keepalive_backoff", @@ -378,7 +381,7 @@ after idling for 'Keepalive * backoff * 2'.""" , {"upgrade_qos", sc(boolean(), #{ default => false, - desc => "Force to upgrade QoS according to subscription." + desc => "Force upgrade of QoS level according to subscription." }) } , {"max_inflight", @@ -427,9 +430,9 @@ or inflight window is full.""" There's no priority table by default, hence all messages are treated equal.
Priority number [1-255]
-**NOTE**: Comma and equal signs are not allowed for priority topic names
+**NOTE**: Comma and equal signs are not allowed for priority topic names.
**NOTE**: Messages for topics not in the priority table are treated as -either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority +either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority.

**Examples**: To configure \"topic/1\" > \"topic/2\": @@ -439,7 +442,7 @@ mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}""" , {"mqueue_default_priority", sc(hoconsc:enum([highest, lowest]), #{ default => lowest, - desc => "Default to highest priority for topics not matching priority table" + desc => "Default to highest priority for topics not matching priority table." }) } , {"mqueue_store_qos0", @@ -451,7 +454,7 @@ mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}""" , {"use_username_as_clientid", sc(boolean(), #{ default => false, - desc => "Replace client id with the username" + desc => "Replace client id with the username." }) } , {"peer_cert_as_username", @@ -587,12 +590,12 @@ fields("force_gc") -> , {"count", sc(range(0, inf), #{ default => 16000, - desc => "GC the process after this many received messages" + desc => "GC the process after this many received messages." })} , {"bytes", sc(bytesize(), #{ default => "16MB", - desc => "GC the process after how much bytes passed through" + desc => "GC the process after specified number of bytes have passed through." })} ]; diff --git a/apps/emqx/test/emqx_keepalive_SUITE.erl b/apps/emqx/test/emqx_keepalive_SUITE.erl index 8baf52528..7f725e61b 100644 --- a/apps/emqx/test/emqx_keepalive_SUITE.erl +++ b/apps/emqx/test/emqx_keepalive_SUITE.erl @@ -39,4 +39,3 @@ t_check(_) -> ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)), ?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)), ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)). -