From bf942e4bec923b587d4cc3e138f88a7bee0ea8d6 Mon Sep 17 00:00:00 2001 From: Mousse Date: Thu, 22 Aug 2019 16:21:27 +0800 Subject: [PATCH] Handle the retained flag correctly (#2811) Handle the retained flag correctly --- src/emqx_channel.erl | 4 ++-- src/emqx_packet.erl | 5 +++-- src/emqx_session.erl | 10 ++++++---- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 57ff91a16..8c4412f42 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -407,10 +407,10 @@ process_connect(ConnPkt, Channel) -> %% Process Publish process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), - Channel = #channel{client = Client}) -> + Channel = #channel{client = Client, proto_ver = ProtoVer}) -> Msg = emqx_packet:to_message(Client, Packet), %%TODO: Improve later. - Msg1 = emqx_message:set_flag(dup, false, Msg), + Msg1 = emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, ProtoVer, Msg)), process_publish(PacketId, mount(Client, Msg1), Channel). process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) -> diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index d8205b98a..ea5657f41 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -165,10 +165,11 @@ will_msg(#mqtt_packet_connect{client_id = ClientId, will_qos = QoS, will_topic = Topic, will_props = Properties, - will_payload = Payload}) -> + will_payload = Payload, + proto_ver = ProtoVer}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => false, retain => Retain}, - headers = merge_props(#{username => Username}, Properties)}. + headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}. merge_props(Headers, undefined) -> Headers; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1cb449290..72a683b2a 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -567,12 +567,14 @@ enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{up enrich(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session); enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= false}) -> enrich(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); -enrich([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, Session = #session{}) -> - enrich(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, Session); -enrich([{rap, 0}|Opts], Msg = #message{flags = Flags}, Session) -> +enrich([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) -> enrich(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); -enrich([{rap, _}|Opts], Msg, Session) -> +enrich([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) -> enrich(Opts, Msg, Session); +enrich([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) -> + enrich(Opts, Msg, Session); +enrich([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) -> + enrich(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); enrich([{subid, SubId}|Opts], Msg, Session) -> enrich(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), Session).