Support to configure keepalive backoff

This commit is contained in:
Feng Lee 2017-08-07 18:27:16 +08:00
parent 925e35dcbd
commit 88f84a4a0c
3 changed files with 20 additions and 7 deletions

View File

@ -177,6 +177,9 @@ mqtt.max_packet_size = 64KB
## Check Websocket Protocol Header. Enum: on, off ## Check Websocket Protocol Header. Enum: on, off
mqtt.websocket_protocol_header = on mqtt.websocket_protocol_header = on
## The Keepalive timeout: Keepalive * backoff * 2
mqtt.keepalive_backoff = 1.25
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## MQTT Connection ## MQTT Connection
##-------------------------------------------------------------------- ##--------------------------------------------------------------------

View File

@ -474,9 +474,16 @@ end}.
{datatype, bytesize} {datatype, bytesize}
]}. ]}.
%% @doc Keepalive backoff
{mapping, "mqtt.keepalive_backoff", "emqttd.protocol", [
{default, 1.25},
{datatype, float}
]}.
{translation, "emqttd.protocol", fun(Conf) -> {translation, "emqttd.protocol", fun(Conf) ->
[{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
{max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
{keepalive_backoff, cuttlefish:conf_get("mqtt.keepalive_backoff", Conf)}]
end}. end}.
{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ {mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [

View File

@ -42,8 +42,9 @@
%% ws_initial_headers: Headers from first HTTP request for WebSocket Client. %% ws_initial_headers: Headers from first HTTP request for WebSocket Client.
-record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid,
clean_sess, proto_ver, proto_name, username, is_superuser, clean_sess, proto_ver, proto_name, username, is_superuser,
will_msg, keepalive, max_clientid_len, session, stats_data, will_msg, keepalive, keepalive_backoff, max_clientid_len,
mountpoint, ws_initial_headers, connected_at}). session, stats_data, mountpoint, ws_initial_headers,
connected_at}).
-type(proto_state() :: #proto_state{}). -type(proto_state() :: #proto_state{}).
@ -58,6 +59,7 @@
%% @doc Init protocol %% @doc Init protocol
init(Peername, SendFun, Opts) -> init(Peername, SendFun, Opts) ->
Backoff = get_value(keepalive_backoff, Opts, 1.25),
EnableStats = get_value(client_enable_stats, Opts, false), EnableStats = get_value(client_enable_stats, Opts, false),
MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
WsInitialHeaders = get_value(ws_initial_headers, Opts), WsInitialHeaders = get_value(ws_initial_headers, Opts),
@ -67,6 +69,7 @@ init(Peername, SendFun, Opts) ->
is_superuser = false, is_superuser = false,
client_pid = self(), client_pid = self(),
ws_initial_headers = WsInitialHeaders, ws_initial_headers = WsInitialHeaders,
keepalive_backoff = Backoff,
stats_data = #proto_stats{enable_stats = EnableStats}}. stats_data = #proto_stats{enable_stats = EnableStats}}.
init(Conn, Peername, SendFun, Opts) -> init(Conn, Peername, SendFun, Opts) ->
@ -202,7 +205,7 @@ process(?CONNECT_PACKET(Var), State0) ->
%% Register the client %% Register the client
emqttd_cm:reg(client(State2)), emqttd_cm:reg(client(State2)),
%% Start keepalive %% Start keepalive
start_keepalive(KeepAlive), start_keepalive(KeepAlive, State2),
%% Emit Stats %% Emit Stats
self() ! emit_stats, self() ! emit_stats,
%% ACCEPT %% ACCEPT
@ -411,10 +414,10 @@ send_willmsg(_Client, undefined) ->
send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}). emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}).
start_keepalive(0) -> ignore; start_keepalive(0, _State) -> ignore;
start_keepalive(Sec) when Sec > 0 -> start_keepalive(Sec, #proto_state{keepalive_backoff = Backoff}) when Sec > 0 ->
self() ! {keepalive, start, round(Sec * 1.25)}. self() ! {keepalive, start, round(Sec * Backoff)}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Validate Packets %% Validate Packets