Merge pull request #6852 from zhongwencool/fix-keepalive-init

fix(keepalive): keepalive desc improvement.
This commit is contained in:
zhongwencool 2022-01-27 22:41:10 +08:00 committed by GitHub
commit c049e296d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 40 deletions

View File

@ -647,12 +647,11 @@ mqtt {
## Default: disabled ## Default: disabled
server_keepalive = disabled server_keepalive = disabled
## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## The backoff for MQTT keepalive timeout. The broker will close the connection
## until 'Keepalive * backoff * 2' timeout. ## after idling for 'Keepalive * backoff * 2'.
## ##
## @doc mqtt.keepalive_backoff ## @doc mqtt.keepalive_backoff
## ValueType: Float ## ValueType: Float
## Range: (0.5, 1]
## Default: 0.75 ## Default: 0.75
keepalive_backoff = 0.75 keepalive_backoff = 0.75

View File

@ -1670,7 +1670,8 @@ ensure_keepalive_timer(0, Channel) -> Channel;
ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(disabled, Channel) -> Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
Backoff = get_mqtt_conf(Zone, keepalive_backoff), Backoff = get_mqtt_conf(Zone, keepalive_backoff),
Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), RecvOct = emqx_pd:get_counter(incoming_bytes),
Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) -> clear_keepalive(Channel = #channel{timers = Timers}) ->

View File

@ -17,6 +17,7 @@
-module(emqx_keepalive). -module(emqx_keepalive).
-export([ init/1 -export([ init/1
, init/2
, info/1 , info/1
, info/2 , info/2
, check/2 , check/2
@ -37,9 +38,13 @@
%% @doc Init keepalive. %% @doc Init keepalive.
-spec(init(Interval :: non_neg_integer()) -> keepalive()). -spec(init(Interval :: non_neg_integer()) -> keepalive()).
init(Interval) when Interval > 0 -> init(Interval) -> init(0, Interval).
%% @doc Init keepalive.
-spec(init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive()).
init(StatVal, Interval) when Interval > 0 ->
#keepalive{interval = Interval, #keepalive{interval = Interval,
statval = 0, statval = StatVal,
repeat = 0}. repeat = 0}.
%% @doc Get Info of the keepalive. %% @doc Get Info of the keepalive.

View File

@ -155,7 +155,10 @@ roots(medium) ->
roots(low) -> roots(low) ->
[ {"force_gc", [ {"force_gc",
sc(ref("force_gc"), sc(ref("force_gc"),
#{})} #{ desc =>
"""Force the MQTT connection process garbage collection after
this number of messages or bytes have passed through."""
})}
, {"conn_congestion", , {"conn_congestion",
sc(ref("conn_congestion"), sc(ref("conn_congestion"),
#{})} #{})}
@ -288,131 +291,185 @@ fields("cache") ->
fields("mqtt") -> fields("mqtt") ->
[ {"idle_timeout", [ {"idle_timeout",
sc(hoconsc:union([infinity, duration()]), sc(hoconsc:union([infinity, duration()]),
#{ default => "15s" #{ default => "15s",
desc =>
"""Close TCP connections from the clients that have not sent MQTT CONNECT
message within this interval."""
})} })}
, {"max_packet_size", , {"max_packet_size",
sc(bytesize(), sc(bytesize(),
#{ default => "1MB" #{ default => "1MB",
desc => "Maximum MQTT packet size allowed."
})} })}
, {"max_clientid_len", , {"max_clientid_len",
sc(range(23, 65535), sc(range(23, 65535),
#{ default => 65535 #{ default => 65535,
desc => "Maximum allowed length of MQTT clientId."
})} })}
, {"max_topic_levels", , {"max_topic_levels",
sc(range(1, 65535), sc(range(1, 65535),
#{ default => 65535 #{ default => 65535,
desc => "Maximum topic levels allowed."
})} })}
, {"max_qos_allowed", , {"max_qos_allowed",
sc(range(0, 2), sc(range(0, 2),
#{ default => 2 #{ default => 2,
desc => "Maximum QoS allowed."
})} })}
, {"max_topic_alias", , {"max_topic_alias",
sc(range(0, 65535), sc(range(0, 65535),
#{ default => 65535 #{ default => 65535,
desc => "Maximum Topic Alias, 0 means no topic alias supported."
})} })}
, {"retain_available", , {"retain_available",
sc(boolean(), sc(boolean(),
#{ default => true #{ default => true,
desc => "Support MQTT retained messages."
})} })}
, {"wildcard_subscription", , {"wildcard_subscription",
sc(boolean(), sc(boolean(),
#{ default => true #{ default => true,
desc => "Support MQTT Wildcard Subscriptions."
})} })}
, {"shared_subscription", , {"shared_subscription",
sc(boolean(), sc(boolean(),
#{ default => true #{ default => true,
desc => "Support MQTT Shared Subscriptions."
})} })}
, {"ignore_loop_deliver", , {"ignore_loop_deliver",
sc(boolean(), sc(boolean(),
#{ default => false #{ default => false,
desc => "Ignore loop delivery of messages for mqtt v3.1.1."
})} })}
, {"strict_mode", , {"strict_mode",
sc(boolean(), sc(boolean(),
#{default => false #{default => false,
desc => "Parse the MQTT frame in strict mode."
}) })
} }
, {"response_information", , {"response_information",
sc(string(), sc(string(),
#{default => "" #{default => "",
desc =>
"""Specify the response information returned to the client
This feature is disabled if is set to \"\"."""
}) })
} }
, {"server_keepalive", , {"server_keepalive",
sc(hoconsc:union([integer(), disabled]), sc(hoconsc:union([integer(), disabled]),
#{ default => disabled #{ default => disabled,
desc =>
"""Server Keep Alive of MQTT 5.0.
If the Server returns a Server Keep Alive on the CONNACK packet,
the Client MUST use that value instead of the value it sent as the Keep Alive."""
}) })
} }
, {"keepalive_backoff", , {"keepalive_backoff",
sc(float(), sc(float(),
#{default => 0.75 #{default => 0.75,
desc =>
"""The backoff for MQTT keepalive timeout. The broker will close the connection
after idling for 'Keepalive * backoff * 2'."""
}) })
} }
, {"max_subscriptions", , {"max_subscriptions",
sc(hoconsc:union([range(1, inf), infinity]), sc(hoconsc:union([range(1, inf), infinity]),
#{ default => infinity #{ default => infinity,
desc => "Maximum number of subscriptions allowed."
}) })
} }
, {"upgrade_qos", , {"upgrade_qos",
sc(boolean(), sc(boolean(),
#{ default => false #{ default => false,
desc => "Force upgrade of QoS level according to subscription."
}) })
} }
, {"max_inflight", , {"max_inflight",
sc(range(1, 65535), sc(range(1, 65535),
#{ default => 32 #{ default => 32,
desc => "Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked."
}) })
} }
, {"retry_interval", , {"retry_interval",
sc(duration(), sc(duration(),
#{default => "30s" #{ default => "30s",
desc => "Retry interval for QoS1/2 message delivering."
}) })
} }
, {"max_awaiting_rel", , {"max_awaiting_rel",
sc(hoconsc:union([integer(), infinity]), sc(hoconsc:union([integer(), infinity]),
#{ default => 100 #{ default => 100,
desc => "Maximum QoS2 packets (Client -> Broker) awaiting PUBREL."
}) })
} }
, {"await_rel_timeout", , {"await_rel_timeout",
sc(duration(), sc(duration(),
#{ default => "300s" #{ default => "300s",
desc => "The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout."
}) })
} }
, {"session_expiry_interval", , {"session_expiry_interval",
sc(duration(), sc(duration(),
#{ default => "2h" #{ default => "2h",
desc => "Default session expiry interval for MQTT V3.1.1 connections."
}) })
} }
, {"max_mqueue_len", , {"max_mqueue_len",
sc(hoconsc:union([range(0, inf), infinity]), sc(hoconsc:union([range(0, inf), infinity]),
#{ default => 1000 #{ default => 1000,
desc =>
"""Maximum queue length. Enqueued messages when persistent client disconnected,
or inflight window is full."""
}) })
} }
, {"mqueue_priorities", , {"mqueue_priorities",
sc(hoconsc:union([map(), disabled]), sc(hoconsc:union([map(), disabled]),
#{ default => disabled #{ default => disabled,
desc =>
"""Topic priorities.<br>
There's no priority table by default, hence all messages are treated equal.<br>
Priority number [1-255]<br>
**NOTE**: Comma and equal signs are not allowed for priority topic names.<br>
**NOTE**: Messages for topics not in the priority table are treated as
either highest or lowest priority depending on the configured value for mqtt.mqueue_default_priority.
<br><br>
**Examples**:
To configure \"topic/1\" > \"topic/2\":
mqueue_priorities: {\"topic/1\": 10, \"topic/2\": 8}"""
}) })
} }
, {"mqueue_default_priority", , {"mqueue_default_priority",
sc(hoconsc:enum([highest, lowest]), sc(hoconsc:enum([highest, lowest]),
#{ default => lowest #{ default => lowest,
desc => "Default to highest priority for topics not matching priority table."
}) })
} }
, {"mqueue_store_qos0", , {"mqueue_store_qos0",
sc(boolean(), sc(boolean(),
#{ default => true #{ default => true,
desc => "Support enqueue QoS0 messages."
}) })
} }
, {"use_username_as_clientid", , {"use_username_as_clientid",
sc(boolean(), sc(boolean(),
#{ default => false #{ default => false,
desc => "Replace client id with the username."
}) })
} }
, {"peer_cert_as_username", , {"peer_cert_as_username",
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]), sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{ default => disabled #{ default => disabled,
desc =>
"""Use the CN, DN or CRT field from the client certificate as a username.
Only works for the TLS connection."""
})} })}
, {"peer_cert_as_clientid", , {"peer_cert_as_clientid",
sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]), sc(hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{ default => disabled #{ default => disabled,
desc =>
"""Use the CN, DN or CRT field from the client certificate as a clientid.
Only works for the TLS connection."""
})} })}
]; ];
@ -532,11 +589,13 @@ fields("force_gc") ->
})} })}
, {"count", , {"count",
sc(range(0, inf), sc(range(0, inf),
#{ default => 16000 #{ default => 16000,
desc => "GC the process after this many received messages."
})} })}
, {"bytes", , {"bytes",
sc(bytesize(), sc(bytesize(),
#{ default => "16MB" #{ default => "16MB",
desc => "GC the process after specified number of bytes have passed through."
})} })}
]; ];
@ -1014,7 +1073,7 @@ fields("alarm") ->
example => [log, publish], example => [log, publish],
desc => desc =>
"""The actions triggered when the alarm is activated.<\br> """The actions triggered when the alarm is activated.<\br>
Currently supports two actions, 'log' and 'publish'. Currently support two actions, 'log' and 'publish'.
'log' is to write the alarm to log (console or file). 'log' is to write the alarm to log (console or file).
'publish' is to publish the alarm as an MQTT message to the system topics: 'publish' is to publish the alarm as an MQTT message to the system topics:
<code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/activate</code> and <code>$SYS/brokers/emqx@xx.xx.xx.x/alarms/activate</code> and

View File

@ -39,4 +39,3 @@ t_check(_) ->
?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)), ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)), ?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)),
?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)). ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)).