Merge pull request #323 from emqtt/dev-feng

fix issue #264 - Qos1 message may be dropped under unstable mobile network
This commit is contained in:
Feng Lee 2015-09-29 12:25:50 +08:00
commit 0ac3ae4ba5
6 changed files with 54 additions and 300 deletions

View File

@ -1,244 +0,0 @@
% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ft=erlang ts=4 sw=4 et
[{kernel,
[{start_timer, true},
{start_pg2, true}
]},
{sasl, [
{sasl_error_logger, {file, "log/emqttd_sasl.log"}}
]},
{ssl, [
%{versions, ['tlsv1.2', 'tlsv1.1']}
]},
{lager, [
{colored, true},
{async_threshold, 1000},
{error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
{lager_console_backend, info},
{lager_file_backend, [
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
{file, "log/emqttd_info.log"},
{level, info},
{size, 104857600},
{date, "$D0"},
{count, 30}
]},
{lager_file_backend, [
{formatter_config, [time, " ", pid, " [",severity,"] ", message, "\n"]},
{file, "log/emqttd_error.log"},
{level, error},
{size, 104857600},
{date, "$D0"},
{count, 30}
]}
]}
]},
{esockd, [
{logger, {lager, info}}
]},
{emqttd, [
%% Authentication and Authorization
{access, [
%% Authetication. Anonymous Default
{auth, [
%% Authentication with username, password
%{username, []},
%% Authentication with clientid
%{clientid, [{password, no}, {file, "etc/clients.config"}]},
%% Authentication with LDAP
% {ldap, [
% {servers, ["localhost"]},
% {port, 389},
% {timeout, 30},
% {user_dn, "uid=$u,ou=People,dc=example,dc=com"},
% {ssl, fasle},
% {sslopts, [
% {"certfile", "ssl.crt"},
% {"keyfile", "ssl.key"}]}
% ]},
%% Allow all
{anonymous, []}
]},
%% ACL config
{acl, [
%% Internal ACL module
{internal, [{file, "etc/acl.config"}, {nomatch, allow}]}
]}
]},
%% MQTT Protocol Options
{mqtt, [
%% Packet
{packet, [
%% Max ClientId Length Allowed
{max_clientid_len, 1024},
%% Max Packet Size Allowed, 64K default
{max_packet_size, 65536}
]},
%% Client
{client, [
%TODO: Network ingoing limit
%{ingoing_rate_limit, '64KB/s'}
%TODO: Reconnet control
]},
%% Session
{session, [
%% Max number of QoS 1 and 2 messages that can be “in flight” at one time.
%% 0 means no limit
{max_inflight, 100},
%% Max retries for unack Qos1/2 messages
{unack_retries, 3},
%% Retry after 4, 8, 16 seconds
{unack_timeout, 4},
%% Awaiting PUBREL Timeout
{await_rel_timeout, 8},
%% Max Packets that Awaiting PUBREL, 0 means no limit
{max_awaiting_rel, 0},
%% Statistics Collection Interval(seconds)
{collect_interval, 10},
%% Expired after 2 days
{expired_after, 48}
]},
%% Session
{queue, [
%% Max queue length. enqueued messages when persistent client disconnected,
%% or inflight window is full.
{max_length, 100},
%% Low-water mark of queued messsages
{low_watermark, 0.2},
%% High-water mark of queued messsages
{high_watermark, 0.6},
%% Queue Qos0 messages?
{queue_qos0, true}
]}
]},
%% Broker Options
{broker, [
%% System interval of publishing broker $SYS messages
{sys_interval, 60},
%% Retained messages
{retained, [
%% Max number of retained messages
{max_message_num, 100000},
%% Max Payload Size of retained message
{max_playload_size, 65536}
]},
%% PubSub
{pubsub, [
%% default should be scheduler numbers
%% {pool_size, 8}
]},
%% Bridge
{bridge, [
%%TODO: bridge queue size
{max_queue_len, 10000},
%% Ping Interval of bridge node
{ping_down_interval, 1} %seconds
]}
]},
%% Modules
{modules, [
%% Client presence management module.
%% Publish messages when client connected or disconnected
{presence, [{qos, 0}]}
%% Subscribe topics automatically when client connected
%% {autosub, [{"$Q/client/$c", 0}]}
%% Rewrite rules
%% {rewrite, [{file, "etc/rewrite.config"}]}
]},
%% Plugins
{plugins, [
%% Plugin App Library Dir
{plugins_dir, "./plugins"},
%% File to store loaded plugin names.
{loaded_file, "./data/loaded_plugins"}
]},
%% Listeners
{listeners, [
{mqtt, 1883, [
%% Size of acceptor pool
{acceptors, 16},
%% Maximum number of concurrent clients
{max_clients, 512},
%% Socket Access Control
{access, [{allow, all}]},
%% Socket Options
{sockopts, [
{backlog, 512}
%Set buffer if hight thoughtput
%{recbuf, 4096},
%{sndbuf, 4096}
%{buffer, 4096},
]}
]},
{mqtts, 8883, [
%% Size of acceptor pool
{acceptors, 4},
%% Maximum number of concurrent clients
{max_clients, 512},
%% Socket Access Control
{access, [{allow, all}]},
%% SSL certificate and key files
{ssl, [{certfile, "etc/ssl/ssl.crt"},
{keyfile, "etc/ssl/ssl.key"}]},
%% Socket Options
{sockopts, [
{backlog, 1024}
%{buffer, 4096},
]}
]},
%% WebSocket over HTTPS Listener
%% {https, 8083, [
%% %% Size of acceptor pool
%% {acceptors, 4},
%% %% Maximum number of concurrent clients
%% {max_clients, 512},
%% %% Socket Access Control
%% {access, [{allow, all}]},
%% %% SSL certificate and key files
%% {ssl, [{certfile, "etc/ssl/ssl.crt"},
%% {keyfile, "etc/ssl/ssl.key"}]},
%% %% Socket Options
%% {sockopts, [
%% %{buffer, 4096},
%% {backlog, 1024}
%% ]}
%%]},
%% HTTP and WebSocket Listener
{http, 8083, [
%% Size of acceptor pool
{acceptors, 4},
%% Maximum number of concurrent clients
{max_clients, 64},
%% Socket Access Control
{access, [{allow, all}]},
%% Socket Options
{sockopts, [
{backlog, 1024}
%{buffer, 4096},
]}
]}
]}
]}
].

View File

@ -81,6 +81,8 @@
]},
%% Client
{client, [
%% Socket is connected, but no 'CONNECT' packet received
{idle_timeout, 10} %% seconds
%TODO: Network ingoing limit
%{ingoing_rate_limit, '64KB/s'}
%TODO: Reconnet control
@ -91,20 +93,17 @@
%% 0 means no limit
{max_inflight, 100},
%% Max retries for unack Qos1/2 messages
{unack_retries, 3},
%% Retry after 4, 8, 16 seconds
{unack_timeout, 4},
%% Retry interval for redelivering QoS1/2 messages.
{unack_retry_interval, 20},
%% Awaiting PUBREL Timeout
{await_rel_timeout, 8},
{await_rel_timeout, 20},
%% Max Packets that Awaiting PUBREL, 0 means no limit
{max_awaiting_rel, 0},
%% Statistics Collection Interval(seconds)
{collect_interval, 10},
{collect_interval, 20},
%% Expired after 2 days
{expired_after, 48}

View File

@ -73,6 +73,8 @@
]},
%% Client
{client, [
%% Socket is connected, but no 'CONNECT' packet received
{idle_timeout, 10} %% seconds
%TODO: Network ingoing limit
%{ingoing_rate_limit, '64KB/s'}
%TODO: Reconnet control
@ -83,14 +85,11 @@
%% 0 means no limit
{max_inflight, 100},
%% Max retries for unack Qos1/2 messages
{unack_retries, 3},
%% Retry after 8, 16, 32 seconds
{unack_timeout, 8},
%% Retry interval for redelivering QoS1/2 messages.
{unack_retry_interval, 30},
%% Awaiting PUBREL Timeout
{await_rel_timeout, 8},
{await_rel_timeout, 20},
%% Max Packets that Awaiting PUBREL, 0 means no limit
{max_awaiting_rel, 0},
@ -133,7 +132,7 @@
%% PubSub
{pubsub, [
%% default should be scheduler numbers
{pool_size, 8}
%% {pool_size, 8}
]},
%% Bridge
{bridge, [

View File

@ -90,7 +90,7 @@ open_listener({https, Port, Options}) ->
mochiweb:start_http(Port, Options, MFArgs).
open_listener(Protocol, Port, Options) ->
MFArgs = {emqttd_client, start_link, [env(mqtt, packet)]},
MFArgs = {emqttd_client, start_link, [env(mqtt)]},
esockd:open(Protocol, Port, merge_sockopts(Options) , MFArgs).
merge_sockopts(Options) ->

View File

@ -55,8 +55,8 @@
packet_opts,
keepalive}).
start_link(SockArgs, PktOpts) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}.
start_link(SockArgs, MqttEnv) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}.
session(CPid) ->
gen_server:call(CPid, session).
@ -70,14 +70,15 @@ kick(CPid) ->
subscribe(CPid, TopicTable) ->
gen_server:cast(CPid, {subscribe, TopicTable}).
init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
% Transform if ssl.
{ok, NewSock} = esockd_connection:accept(SockArgs),
{ok, Peername} = emqttd_net:peername(Sock),
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
lager:info("Connect from ~s", [ConnStr]),
SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts),
PktOpts = proplists:get_value(packet, MqttEnv),
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
State = control_throttle(#state{transport = Transport,
socket = NewSock,
peername = Peername,
@ -85,10 +86,12 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
await_recv = false,
conn_state = running,
conserve = false,
packet_opts = PacketOpts,
parser = emqttd_parser:new(PacketOpts),
packet_opts = PktOpts,
parser = emqttd_parser:new(PktOpts),
proto_state = ProtoState}),
gen_server:enter_loop(?MODULE, [], State, 10000).
ClientOpts = proplists:get_value(client, MqttEnv),
IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10),
gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)).
handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State};

View File

@ -119,11 +119,8 @@
%% Awaiting timers for ack, rel.
awaiting_ack :: map(),
%% Retries to resend the unacked messages
unack_retries = 3,
%% 4, 8, 16 seconds if 3 retries:)
unack_timeout = 4,
%% Retry interval for redelivering QoS1/2 messages
retry_interval = 20,
%% Awaiting for PUBCOMP
awaiting_comp :: map(),
@ -223,7 +220,7 @@ unsubscribe(SessPid, Topics) ->
%%%=============================================================================
init([CleanSess, ClientId, ClientPid]) ->
%process_flag(trap_exit, true),
%% process_flag(trap_exit, true),
QEnv = emqttd:env(mqtt, queue),
SessEnv = emqttd:env(mqtt, session),
Session = #session{
@ -237,8 +234,7 @@ init([CleanSess, ClientId, ClientPid]) ->
awaiting_rel = #{},
awaiting_ack = #{},
awaiting_comp = #{},
unack_retries = emqttd_opts:g(unack_retries, SessEnv),
unack_timeout = emqttd_opts:g(unack_timeout, SessEnv),
retry_interval = emqttd_opts:g(unack_retry_interval, SessEnv),
await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv),
expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600,
@ -394,7 +390,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
[ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
%% Clear awaiting_ack timers
[cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)],
[cancel_timer(TRef) || TRef <- maps:values(AwaitingAck)],
%% Clear awaiting_comp timers
[cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
@ -408,7 +404,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
%% Redeliver inflight messages
Session2 =
lists:foldl(fun({_Id, Msg}, Sess) ->
redeliver(Msg#mqtt_message{dup = true}, Sess)
redeliver(Msg, Sess)
end, Session1, lists:reverse(InflightQ)),
%% Dequeue pending messages
@ -417,7 +413,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
%% PUBACK
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) ->
case maps:find(PktId, AwaitingAck) of
{ok, {_, TRef}} ->
{ok, TRef} ->
cancel_timer(TRef),
noreply(dequeue(acked(PktId, Session)));
error ->
@ -431,7 +427,7 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
awaiting_comp = AwaitingComp,
await_rel_timeout = Timeout}) ->
case maps:find(PktId, AwaitingAck) of
{ok, {_, TRef}} ->
{ok, TRef} ->
cancel_timer(TRef),
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp),
@ -497,22 +493,24 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde
%% just remove awaiting
noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ,
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
inflight_queue = InflightQ,
awaiting_ack = AwaitingAck}) ->
lager:info("Awaiting Ack Timeout: ~p:", [PktId]),
case maps:find(PktId, AwaitingAck) of
{ok, {{0, _Timeout}, _TRef}} ->
Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
awaiting_ack = maps:remove(PktId, AwaitingAck)},
noreply(dequeue(Session1));
{ok, {{Retries, Timeout}, _TRef}} ->
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
{noreply, Session#session{awaiting_ack = AwaitingAck1}, hibernate};
{ok, _TRef} ->
case lists:keyfind(PktId, 1, InflightQ) of
{_, Msg} ->
noreply(redeliver(Msg, Session));
false ->
lager:error([{client, ClientId}], "Session(~s):"
"Awaiting timeout but Cannot find PktId :~p", [ClientId, PktId]),
noreply(dequeue(Session))
end;
error ->
% TODO: too many logs when overloaded...
% lager:error([{client, ClientId}], "Session ~s "
% "Cannot find Awaiting Ack:~p", [ClientId, PktId]),
{noreply, Session, hibernate}
lager:error([{client, ClientId}], "Session(~s):"
"Cannot find Awaiting Ack:~p", [ClientId, PktId]),
noreply(Session)
end;
handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId,
@ -633,17 +631,16 @@ redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
ClientPid ! {deliver, Msg},
ClientPid ! {deliver, Msg#mqtt_message{dup = true}},
await(Msg, Session).
%%------------------------------------------------------------------------------
%% Awaiting ack for qos1, qos2 message
%%------------------------------------------------------------------------------
await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting,
unack_retries = Retries,
unack_timeout = Timeout}) ->
retry_interval = Timeout}) ->
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting),
Awaiting1 = maps:put(PktId, TRef, Awaiting),
Session#session{awaiting_ack = Awaiting1}.
acked(PktId, Session = #session{client_id = ClientId,
@ -653,7 +650,7 @@ acked(PktId, Session = #session{client_id = ClientId,
{_, Msg} ->
emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]);
false ->
lager:error("Session(~s) cannot find acked message: ~p", [PktId])
lager:error("Session(~s): Cannot find acked message: ~p", [PktId])
end,
Session#session{awaiting_ack = maps:remove(PktId, Awaiting),
inflight_queue = lists:keydelete(PktId, 1, InflightQ)}.