diff --git a/etc/emq.conf b/etc/emq.conf index 82209dde7..1c0859921 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -177,6 +177,9 @@ mqtt.max_packet_size = 64KB ## Check Websocket Protocol Header. Enum: on, off mqtt.websocket_protocol_header = on +## The Keepalive timeout: Keepalive * backoff * 2 +mqtt.keepalive_backoff = 1.25 + ##-------------------------------------------------------------------- ## MQTT Connection ##-------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index f4c369b9c..bd533d05e 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -474,9 +474,16 @@ end}. {datatype, bytesize} ]}. +%% @doc Keepalive backoff +{mapping, "mqtt.keepalive_backoff", "emqttd.protocol", [ + {default, 1.25}, + {datatype, float} +]}. + {translation, "emqttd.protocol", fun(Conf) -> [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, - {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] + {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}, + {keepalive_backoff, cuttlefish:conf_get("mqtt.keepalive_backoff", Conf)}] end}. {mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index b5be7c066..0129faedd 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -42,8 +42,9 @@ %% ws_initial_headers: Headers from first HTTP request for WebSocket Client. -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, proto_ver, proto_name, username, is_superuser, - will_msg, keepalive, max_clientid_len, session, stats_data, - mountpoint, ws_initial_headers, connected_at}). + will_msg, keepalive, keepalive_backoff, max_clientid_len, + session, stats_data, mountpoint, ws_initial_headers, + connected_at}). -type(proto_state() :: #proto_state{}). @@ -58,6 +59,7 @@ %% @doc Init protocol init(Peername, SendFun, Opts) -> + Backoff = get_value(keepalive_backoff, Opts, 1.25), EnableStats = get_value(client_enable_stats, Opts, false), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), WsInitialHeaders = get_value(ws_initial_headers, Opts), @@ -67,6 +69,7 @@ init(Peername, SendFun, Opts) -> is_superuser = false, client_pid = self(), ws_initial_headers = WsInitialHeaders, + keepalive_backoff = Backoff, stats_data = #proto_stats{enable_stats = EnableStats}}. init(Conn, Peername, SendFun, Opts) -> @@ -202,7 +205,7 @@ process(?CONNECT_PACKET(Var), State0) -> %% Register the client emqttd_cm:reg(client(State2)), %% Start keepalive - start_keepalive(KeepAlive), + start_keepalive(KeepAlive, State2), %% Emit Stats self() ! emit_stats, %% ACCEPT @@ -411,10 +414,10 @@ send_willmsg(_Client, undefined) -> send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}). -start_keepalive(0) -> ignore; +start_keepalive(0, _State) -> ignore; -start_keepalive(Sec) when Sec > 0 -> - self() ! {keepalive, start, round(Sec * 1.25)}. +start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 -> + self() ! {keepalive, start, round(Sec * Backoff)}. %%-------------------------------------------------------------------- %% Validate Packets