From cae743803b8eab088696541ad47d00f4b73b0725 Mon Sep 17 00:00:00 2001 From: HeeeJianBo Date: Fri, 3 Nov 2017 17:45:04 +0800 Subject: [PATCH] Improve the process logic of DUP flag (#1319). --- src/emqttd_parser.erl | 8 +++++++- src/emqttd_protocol.erl | 8 ++------ src/emqttd_session.erl | 15 +++++++++++++-- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index dde9ae4dc..27be18a25 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -124,7 +124,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) _ -> <> = Rest1, {Id, R} end, - wrap(Header, #mqtt_packet_publish{topic_name = TopicName, + wrap(fixdup(Header), #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId}, Payload, Rest); {?PUBACK, <>} -> @@ -222,3 +222,9 @@ fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. +%% Fix Issue#1319 +fixdup(Header = #mqtt_packet_header{qos = ?QOS0, dup = true}) -> + Header#mqtt_packet_header{dup = false}; +fixdup(Header = #mqtt_packet_header{qos = ?QOS2, dup = true}) -> + Header#mqtt_packet_header{dup = false}; +fixdup(Header) -> Header. diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 4c4ec55c5..31354dd84 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -308,9 +308,7 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), username = Username, mountpoint = MountPoint, session = Session}) -> - Msg0 = emqttd_message:from_packet(Username, ClientId, Packet), - % MQTT 3.3.1-3: Need reset DUP flag when recv publish message - Msg = emqttd_message:unset_flag(dup, Msg0), + Msg = emqttd_message:from_packet(Username, ClientId, Packet), emqttd_session:publish(Session, mount(MountPoint, Msg)); publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> @@ -324,9 +322,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), username = Username, mountpoint = MountPoint, session = Session}) -> - Msg0 = emqttd_message:from_packet(Username, ClientId, Packet), - % MQTT 3.3.1-3: Need reset DUP flag when recv publish message - Msg = emqttd_message:unset_flag(dup, Msg0), + Msg = emqttd_message:from_packet(Username, ClientId, Packet), case emqttd_session:publish(Session, mount(MountPoint, Msg)) of ok -> send(?PUBACK_PACKET(Type, PacketId), State); diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 0a2893f0b..506743d03 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -535,11 +535,11 @@ handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) -> case IgnoreLoopDeliver of true -> {noreply, State, hibernate}; - false -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate} + false -> {noreply, handle_dispatch(Topic, Msg, State), hibernate} end; %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> - {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate}; + {noreply, handle_dispatch(Topic, Msg, State), hibernate}; %% Do nothing if the client has been disconnected. handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) -> @@ -687,6 +687,9 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) %% Dispatch Messages %%-------------------------------------------------------------------- +handle_dispatch(Topic, Msg, State) -> + gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)). + %% Enqueue message if the client has been disconnected dispatch(Msg, State = #state{client_pid = undefined}) -> enqueue_msg(Msg, State); @@ -801,6 +804,14 @@ tune_qos(Topic, Msg = #mqtt_message{qos = PubQoS}, Msg end. +%%-------------------------------------------------------------------- +%% Reset Dup +%%-------------------------------------------------------------------- + +reset_dup(Msg = #mqtt_message{dup = true}) -> + Msg#mqtt_message{dup = false}; +reset_dup(Msg) -> Msg. + %%-------------------------------------------------------------------- %% Next Msg Id %%--------------------------------------------------------------------