This commit is contained in:
Feng 2015-09-29 10:10:30 +08:00
parent dfc3194d53
commit 86a1f7f7d4
2 changed files with 39 additions and 40 deletions

View File

@ -55,8 +55,8 @@
packet_opts, packet_opts,
keepalive}). keepalive}).
start_link(SockArgs, PktOpts) -> start_link(SockArgs, MqttEnv) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}. {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}.
session(CPid) -> session(CPid) ->
gen_server:call(CPid, session). gen_server:call(CPid, session).
@ -70,14 +70,15 @@ kick(CPid) ->
subscribe(CPid, TopicTable) -> subscribe(CPid, TopicTable) ->
gen_server:cast(CPid, {subscribe, TopicTable}). gen_server:cast(CPid, {subscribe, TopicTable}).
init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
% Transform if ssl. % Transform if ssl.
{ok, NewSock} = esockd_connection:accept(SockArgs), {ok, NewSock} = esockd_connection:accept(SockArgs),
{ok, Peername} = emqttd_net:peername(Sock), {ok, Peername} = emqttd_net:peername(Sock),
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
lager:info("Connect from ~s", [ConnStr]), lager:info("Connect from ~s", [ConnStr]),
SendFun = fun(Data) -> Transport:send(NewSock, Data) end, SendFun = fun(Data) -> Transport:send(NewSock, Data) end,
ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts), PktOpts = proplists:get_value(packet, MqttEnv),
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
State = control_throttle(#state{transport = Transport, State = control_throttle(#state{transport = Transport,
socket = NewSock, socket = NewSock,
peername = Peername, peername = Peername,
@ -85,10 +86,12 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
await_recv = false, await_recv = false,
conn_state = running, conn_state = running,
conserve = false, conserve = false,
packet_opts = PacketOpts, packet_opts = PktOpts,
parser = emqttd_parser:new(PacketOpts), parser = emqttd_parser:new(PktOpts),
proto_state = ProtoState}), proto_state = ProtoState}),
gen_server:enter_loop(?MODULE, [], State, 10000). ClientOpts = proplists:get_value(client, MqttEnv),
IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10),
gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)).
handle_call(session, _From, State = #state{proto_state = ProtoState}) -> handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State}; {reply, emqttd_protocol:session(ProtoState), State};

View File

@ -119,11 +119,8 @@
%% Awaiting timers for ack, rel. %% Awaiting timers for ack, rel.
awaiting_ack :: map(), awaiting_ack :: map(),
%% Retries to resend the unacked messages %% Retry interval for redelivering QoS1/2 messages
unack_retries = 3, retry_interval = 20,
%% 4, 8, 16 seconds if 3 retries:)
unack_timeout = 4,
%% Awaiting for PUBCOMP %% Awaiting for PUBCOMP
awaiting_comp :: map(), awaiting_comp :: map(),
@ -237,8 +234,7 @@ init([CleanSess, ClientId, ClientPid]) ->
awaiting_rel = #{}, awaiting_rel = #{},
awaiting_ack = #{}, awaiting_ack = #{},
awaiting_comp = #{}, awaiting_comp = #{},
unack_retries = emqttd_opts:g(unack_retries, SessEnv), retry_interval = emqttd_opts:g(unack_retry_interval, SessEnv),
unack_timeout = emqttd_opts:g(unack_timeout, SessEnv),
await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv),
expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600,
@ -394,7 +390,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
[ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
%% Clear awaiting_ack timers %% Clear awaiting_ack timers
[cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)], [cancel_timer(TRef) || TRef <- maps:values(AwaitingAck)],
%% Clear awaiting_comp timers %% Clear awaiting_comp timers
[cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
@ -408,7 +404,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
%% Redeliver inflight messages %% Redeliver inflight messages
Session2 = Session2 =
lists:foldl(fun({_Id, Msg}, Sess) -> lists:foldl(fun({_Id, Msg}, Sess) ->
redeliver(Msg#mqtt_message{dup = true}, Sess) redeliver(Msg, Sess)
end, Session1, lists:reverse(InflightQ)), end, Session1, lists:reverse(InflightQ)),
%% Dequeue pending messages %% Dequeue pending messages
@ -417,7 +413,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
%% PUBACK %% PUBACK
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) -> handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) ->
case maps:find(PktId, AwaitingAck) of case maps:find(PktId, AwaitingAck) of
{ok, {_, TRef}} -> {ok, TRef} ->
cancel_timer(TRef), cancel_timer(TRef),
noreply(dequeue(acked(PktId, Session))); noreply(dequeue(acked(PktId, Session)));
error -> error ->
@ -431,7 +427,7 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
awaiting_comp = AwaitingComp, awaiting_comp = AwaitingComp,
await_rel_timeout = Timeout}) -> await_rel_timeout = Timeout}) ->
case maps:find(PktId, AwaitingAck) of case maps:find(PktId, AwaitingAck) of
{ok, {_, TRef}} -> {ok, TRef} ->
cancel_timer(TRef), cancel_timer(TRef),
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp),
@ -497,22 +493,23 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde
%% just remove awaiting %% just remove awaiting
noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
inflight_queue = InflightQ,
awaiting_ack = AwaitingAck}) -> awaiting_ack = AwaitingAck}) ->
case maps:find(PktId, AwaitingAck) of case maps:find(PktId, AwaitingAck) of
{ok, {{0, _Timeout}, _TRef}} -> {ok, _TRef} ->
Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), case lists:keyfind(PktId, 1, InflightQ) of
awaiting_ack = maps:remove(PktId, AwaitingAck)}, {_, Msg} ->
noreply(dequeue(Session1)); noreply(redeliver(Msg, Session));
{ok, {{Retries, Timeout}, _TRef}} -> false ->
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), lager:error([{client, ClientId}], "Session(~s):"
AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), "Awaiting timeout but Cannot find PktId :~p", [ClientId, PktId]),
{noreply, Session#session{awaiting_ack = AwaitingAck1}, hibernate}; noreply(dequeue(Session))
end;
error -> error ->
% TODO: too many logs when overloaded... lager:error([{client, ClientId}], "Session(~s):"
% lager:error([{client, ClientId}], "Session ~s " "Cannot find Awaiting Ack:~p", [ClientId, PktId]),
% "Cannot find Awaiting Ack:~p", [ClientId, PktId]), noreply(Session)
{noreply, Session, hibernate}
end; end;
handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId, handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId,
@ -633,17 +630,16 @@ redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid}) redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
ClientPid ! {deliver, Msg}, ClientPid ! {deliver, Msg#mqtt_message{dup = true}},
await(Msg, Session). await(Msg, Session).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Awaiting ack for qos1, qos2 message %% Awaiting ack for qos1, qos2 message
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting, await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting,
unack_retries = Retries, retry_interval = Timeout}) ->
unack_timeout = Timeout}) ->
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting), Awaiting1 = maps:put(PktId, TRef, Awaiting),
Session#session{awaiting_ack = Awaiting1}. Session#session{awaiting_ack = Awaiting1}.
acked(PktId, Session = #session{client_id = ClientId, acked(PktId, Session = #session{client_id = ClientId,
@ -653,7 +649,7 @@ acked(PktId, Session = #session{client_id = ClientId,
{_, Msg} -> {_, Msg} ->
emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]); emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]);
false -> false ->
lager:error("Session(~s) cannot find acked message: ~p", [PktId]) lager:error("Session(~s): Cannot find acked message: ~p", [PktId])
end, end,
Session#session{awaiting_ack = maps:remove(PktId, Awaiting), Session#session{awaiting_ack = maps:remove(PktId, Awaiting),
inflight_queue = lists:keydelete(PktId, 1, InflightQ)}. inflight_queue = lists:keydelete(PktId, 1, InflightQ)}.