diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index c9421413e..da38d9d8d 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -55,8 +55,8 @@ packet_opts, keepalive}). -start_link(SockArgs, PktOpts) -> - {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}. +start_link(SockArgs, MqttEnv) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}. session(CPid) -> gen_server:call(CPid, session). @@ -70,14 +70,15 @@ kick(CPid) -> subscribe(CPid, TopicTable) -> gen_server:cast(CPid, {subscribe, TopicTable}). -init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> +init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> % Transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), {ok, Peername} = emqttd_net:peername(Sock), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), SendFun = fun(Data) -> Transport:send(NewSock, Data) end, - ProtoState = emqttd_protocol:init(Peername, SendFun, PacketOpts), + PktOpts = proplists:get_value(packet, MqttEnv), + ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), State = control_throttle(#state{transport = Transport, socket = NewSock, peername = Peername, @@ -85,10 +86,12 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> await_recv = false, conn_state = running, conserve = false, - packet_opts = PacketOpts, - parser = emqttd_parser:new(PacketOpts), + packet_opts = PktOpts, + parser = emqttd_parser:new(PktOpts), proto_state = ProtoState}), - gen_server:enter_loop(?MODULE, [], State, 10000). + ClientOpts = proplists:get_value(client, MqttEnv), + IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10), + gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)). handle_call(session, _From, State = #state{proto_state = ProtoState}) -> {reply, emqttd_protocol:session(ProtoState), State}; diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 62cb573a6..3db884d55 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -119,11 +119,8 @@ %% Awaiting timers for ack, rel. awaiting_ack :: map(), - %% Retries to resend the unacked messages - unack_retries = 3, - - %% 4, 8, 16 seconds if 3 retries:) - unack_timeout = 4, + %% Retry interval for redelivering QoS1/2 messages + retry_interval = 20, %% Awaiting for PUBCOMP awaiting_comp :: map(), @@ -237,8 +234,7 @@ init([CleanSess, ClientId, ClientPid]) -> awaiting_rel = #{}, awaiting_ack = #{}, awaiting_comp = #{}, - unack_retries = emqttd_opts:g(unack_retries, SessEnv), - unack_timeout = emqttd_opts:g(unack_timeout, SessEnv), + retry_interval = emqttd_opts:g(unack_retry_interval, SessEnv), await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, @@ -394,7 +390,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], %% Clear awaiting_ack timers - [cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)], + [cancel_timer(TRef) || TRef <- maps:values(AwaitingAck)], %% Clear awaiting_comp timers [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], @@ -408,7 +404,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> %% Redeliver inflight messages Session2 = lists:foldl(fun({_Id, Msg}, Sess) -> - redeliver(Msg#mqtt_message{dup = true}, Sess) + redeliver(Msg, Sess) end, Session1, lists:reverse(InflightQ)), %% Dequeue pending messages @@ -417,7 +413,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> %% PUBACK handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of - {ok, {_, TRef}} -> + {ok, TRef} -> cancel_timer(TRef), noreply(dequeue(acked(PktId, Session))); error -> @@ -426,12 +422,12 @@ handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_a end; %% PUBREC -handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, +handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, await_rel_timeout = Timeout}) -> case maps:find(PktId, AwaitingAck) of - {ok, {_, TRef}} -> + {ok, TRef} -> cancel_timer(TRef), TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp), @@ -497,22 +493,23 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde %% just remove awaiting noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, + inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of - {ok, {{0, _Timeout}, _TRef}} -> - Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), - awaiting_ack = maps:remove(PktId, AwaitingAck)}, - noreply(dequeue(Session1)); - {ok, {{Retries, Timeout}, _TRef}} -> - TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), - AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), - {noreply, Session#session{awaiting_ack = AwaitingAck1}, hibernate}; + {ok, _TRef} -> + case lists:keyfind(PktId, 1, InflightQ) of + {_, Msg} -> + noreply(redeliver(Msg, Session)); + false -> + lager:error([{client, ClientId}], "Session(~s):" + "Awaiting timeout but Cannot find PktId :~p", [ClientId, PktId]), + noreply(dequeue(Session)) + end; error -> - % TODO: too many logs when overloaded... - % lager:error([{client, ClientId}], "Session ~s " - % "Cannot find Awaiting Ack:~p", [ClientId, PktId]), - {noreply, Session, hibernate} + lager:error([{client, ClientId}], "Session(~s):" + "Cannot find Awaiting Ack:~p", [ClientId, PktId]), + noreply(Session) end; handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId, @@ -633,17 +630,16 @@ redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) -> redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = ClientPid}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - ClientPid ! {deliver, Msg}, + ClientPid ! {deliver, Msg#mqtt_message{dup = true}}, await(Msg, Session). %%------------------------------------------------------------------------------ %% Awaiting ack for qos1, qos2 message %%------------------------------------------------------------------------------ -await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting, - unack_retries = Retries, - unack_timeout = Timeout}) -> +await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting, + retry_interval = Timeout}) -> TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), - Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting), + Awaiting1 = maps:put(PktId, TRef, Awaiting), Session#session{awaiting_ack = Awaiting1}. acked(PktId, Session = #session{client_id = ClientId, @@ -653,7 +649,7 @@ acked(PktId, Session = #session{client_id = ClientId, {_, Msg} -> emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]); false -> - lager:error("Session(~s) cannot find acked message: ~p", [PktId]) + lager:error("Session(~s): Cannot find acked message: ~p", [PktId]) end, Session#session{awaiting_ack = maps:remove(PktId, Awaiting), inflight_queue = lists:keydelete(PktId, 1, InflightQ)}.