fix issue#42: Redelivery on reconnect

This commit is contained in:
Ery Lee 2015-01-16 01:13:02 +08:00
parent 46d1749120
commit e31068787b
3 changed files with 33 additions and 7 deletions

View File

@ -37,6 +37,8 @@
-include("emqtt.hrl").
-include("emqtt_packet.hrl").
%%Client State...
-record(state, {
socket,
@ -103,6 +105,10 @@ handle_info({dispatch, {From, Message}}, #state{proto_state = ProtoState} = Stat
{ok, ProtoState1} = emqtt_protocol:send_message({From, Message}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqtt_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate};

View File

@ -32,7 +32,7 @@
-export([initial_state/2, client_id/1]).
-export([handle_packet/2, send_message/2, send_packet/2, shutdown/2]).
-export([handle_packet/2, send_message/2, send_packet/2, redeliver/2, shutdown/2]).
-export([info/1]).
@ -253,6 +253,12 @@ send_packet(Packet, State = #proto_state{socket = Sock, peer_name = PeerName, cl
erlang:port_command(Sock, Data),
{ok, State}.
%%
%% @doc redeliver PUBREL PacketId
%%
redeliver({?PUBREL, PacketId}, State) ->
send_packet( make_packet(?PUBREL, PacketId), State).
shutdown(Error, #proto_state{peer_name = PeerName, client_id = ClientId, will_msg = WillMsg}) ->
send_willmsg(WillMsg),
try_unregister(ClientId, self()),

View File

@ -228,14 +228,28 @@ handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({resume, ClientId, ClientPid}, State = #session_state {
client_id = ClientId,
client_pid = undefined,
msg_queue = Queue,
expire_timer = ETimer}) ->
lager:info("Session: client ~s resumed by ~p", [ClientId, ClientPid]),
client_id = ClientId,
client_pid = undefined,
msg_queue = Queue,
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp,
expire_timer = ETimer}) ->
lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]),
erlang:cancel_timer(ETimer),
%% redelivery PUBREL
[ ClientPid ! {redeliver, {?PUBREL, PacketId}} || PacketId <- maps:keys(AwaitingComp) ],
%% redelivery messages that awaiting PUBACK or PUBREC
Dup = fun(Msg) -> Msg#mqtt_message{ dup = true } end,
[ ClientPid ! {dispatch, {self(), Dup(Message)}} || Message <- maps:values(AwaitingAck) ],
%% send offline messages
[ClientPid ! {dispatch, {self(), Message}} || Message <- emqtt_queue:all(Queue)],
NewState = State#session_state{ client_pid = ClientPid, msg_queue = emqtt_queue:clear(Queue), expire_timer = undefined},
NewState = State#session_state{ client_pid = ClientPid,
msg_queue = emqtt_queue:clear(Queue),
expire_timer = undefined},
{noreply, NewState, hibernate};
handle_cast({publish, ?QOS_2, Message}, State) ->