From e31068787b4ba5810f78f082f119122b9301a656 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Fri, 16 Jan 2015 01:13:02 +0800 Subject: [PATCH] fix issue#42: Redelivery on reconnect --- apps/emqtt/src/emqtt_client.erl | 6 ++++++ apps/emqtt/src/emqtt_protocol.erl | 8 +++++++- apps/emqtt/src/emqtt_session.erl | 26 ++++++++++++++++++++------ 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index d0b4774f2..751f29d17 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -37,6 +37,8 @@ -include("emqtt.hrl"). +-include("emqtt_packet.hrl"). + %%Client State... -record(state, { socket, @@ -103,6 +105,10 @@ handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = Stat {ok, ProtoState1} = emqtt_protocol:send_message({From, Message}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; +handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqtt_protocol:redeliver({?PUBREL, PacketId}, ProtoState), + {noreply, State#state{proto_state = ProtoState1}}; + handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 6481a2e81..d0857c554 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -32,7 +32,7 @@ -export([initial_state/2, client_id/1]). --export([handle_packet/2, send_message/2, send_packet/2, shutdown/2]). +-export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]). -export([info/1]). @@ -253,6 +253,12 @@ send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, cl erlang:port_command(Sock, Data), {ok, State}. +%% +%% @doc redeliver PUBREL PacketId +%% +redeliver({?PUBREL, PacketId}, State) -> + send_packet( make_packet(?PUBREL, PacketId), State). + shutdown(Error, #proto_state{peer_name = PeerName, client_id = ClientId, will_msg = WillMsg}) -> send_willmsg(WillMsg), try_unregister(ClientId, self()), diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index c4abd8e3f..e4d1e04c7 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -228,14 +228,28 @@ handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. handle_cast({resume, ClientId, ClientPid}, State = #session_state { - client_id = ClientId, - client_pid = undefined, - msg_queue = Queue, - expire_timer = ETimer}) -> - lager:info("Session: client ~s resumed by ~p", [ClientId, ClientPid]), + client_id = ClientId, + client_pid = undefined, + msg_queue = Queue, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + expire_timer = ETimer}) -> + lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]), erlang:cancel_timer(ETimer), + + %% redelivery PUBREL + [ ClientPid ! {redeliver, {?PUBREL, PacketId}} || PacketId <- maps:keys(AwaitingComp) ], + + %% redelivery messages that awaiting PUBACK or PUBREC + Dup = fun(Msg) -> Msg#mqtt_message{ dup = true } end, + [ ClientPid ! {dispatch, {self(), Dup(Message)}} || Message <- maps:values(AwaitingAck) ], + + %% send offline messages [ClientPid ! {dispatch, {self(), Message}} || Message <- emqtt_queue:all(Queue)], - NewState = State#session_state{ client_pid = ClientPid, msg_queue = emqtt_queue:clear(Queue), expire_timer = undefined}, + + NewState = State#session_state{ client_pid = ClientPid, + msg_queue = emqtt_queue:clear(Queue), + expire_timer = undefined}, {noreply, NewState, hibernate}; handle_cast({publish, ?QOS_2, Message}, State) ->