Merge pull request #1004 from emqtt/develop

Fix crash caused by duplicated PUBREC packet
This commit is contained in:
Feng Lee 2017-04-20 10:15:30 +08:00 committed by GitHub
commit 3c9cde59c7
2 changed files with 17 additions and 7 deletions

View File

@ -1,6 +1,6 @@
PROJECT = emqttd PROJECT = emqttd
PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_DESCRIPTION = Erlang MQTT Broker
PROJECT_VERSION = 2.1.1 PROJECT_VERSION = 2.1.2
DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog

View File

@ -726,16 +726,26 @@ await(Msg = #mqtt_message{pktid = PacketId},
acked(puback, PacketId, State = #state{client_id = ClientId, acked(puback, PacketId, State = #state{client_id = ClientId,
username = Username, username = Username,
inflight = Inflight}) -> inflight = Inflight}) ->
{publish, Msg, _Ts} = Inflight:lookup(PacketId), case Inflight:lookup(PacketId) of
{publish, Msg, _Ts} ->
emqttd_hooks:run('message.acked', [ClientId, Username], Msg), emqttd_hooks:run('message.acked', [ClientId, Username], Msg),
State#state{inflight = Inflight:delete(PacketId)}; State#state{inflight = Inflight:delete(PacketId)};
_ ->
?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State),
State
end;
acked(pubrec, PacketId, State = #state{client_id = ClientId, acked(pubrec, PacketId, State = #state{client_id = ClientId,
username = Username, username = Username,
inflight = Inflight}) -> inflight = Inflight}) ->
{publish, Msg, _Ts} = Inflight:lookup(PacketId), case Inflight:lookup(PacketId) of
{publish, Msg, _Ts} ->
emqttd_hooks:run('message.acked', [ClientId, Username], Msg), emqttd_hooks:run('message.acked', [ClientId, Username], Msg),
State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})};
{pubrel, PacketId, _Ts} ->
?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State),
State
end;
acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> acked(pubcomp, PacketId, State = #state{inflight = Inflight}) ->
State#state{inflight = Inflight:delete(PacketId)}. State#state{inflight = Inflight:delete(PacketId)}.