Fix will retain checking (#2820)
Fix will retain checking and handle the retained flag correctly
This commit is contained in:
parent
68f6a43492
commit
b4bbfad415
|
@ -162,10 +162,11 @@ will_msg(#mqtt_packet_connect{client_id = ClientId,
|
||||||
will_qos = QoS,
|
will_qos = QoS,
|
||||||
will_topic = Topic,
|
will_topic = Topic,
|
||||||
will_props = Properties,
|
will_props = Properties,
|
||||||
will_payload = Payload}) ->
|
will_payload = Payload,
|
||||||
|
proto_ver = ProtoVer}) ->
|
||||||
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
|
||||||
Msg#message{flags = #{dup => false, retain => Retain},
|
Msg#message{flags = #{dup => false, retain => Retain},
|
||||||
headers = merge_props(#{username => Username}, Properties)}.
|
headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}.
|
||||||
|
|
||||||
merge_props(Headers, undefined) ->
|
merge_props(Headers, undefined) ->
|
||||||
Headers;
|
Headers;
|
||||||
|
|
|
@ -562,10 +562,10 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Creden
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
|
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
|
||||||
PState = #pstate{session = SPid, credentials = Credentials}) ->
|
PState = #pstate{session = SPid, credentials = Credentials, proto_ver = ProtoVer}) ->
|
||||||
Msg = emqx_mountpoint:mount(mountpoint(Credentials),
|
Msg = emqx_mountpoint:mount(mountpoint(Credentials),
|
||||||
emqx_packet:to_message(Credentials, Packet)),
|
emqx_packet:to_message(Credentials, Packet)),
|
||||||
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
|
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, ProtoVer, Msg))), PState).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Puback -> Client
|
%% Puback -> Client
|
||||||
|
@ -834,8 +834,8 @@ check_will_retain(#mqtt_packet_connect{will_retain = false, proto_ver = ?MQTT_PR
|
||||||
ok;
|
ok;
|
||||||
check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) ->
|
check_will_retain(#mqtt_packet_connect{will_retain = true, proto_ver = ?MQTT_PROTO_V5}, #pstate{zone = Zone}) ->
|
||||||
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
|
case emqx_zone:get_env(Zone, mqtt_retain_available, true) of
|
||||||
true -> {error, ?RC_RETAIN_NOT_SUPPORTED};
|
true -> ok;
|
||||||
false -> ok
|
false -> {error, ?RC_RETAIN_NOT_SUPPORTED}
|
||||||
end;
|
end;
|
||||||
check_will_retain(_Packet, _PState) ->
|
check_will_retain(_Packet, _PState) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -877,12 +877,14 @@ process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #sta
|
||||||
true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
|
true -> process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State);
|
||||||
false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
|
false -> process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State)
|
||||||
end;
|
end;
|
||||||
process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) ->
|
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
|
||||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State);
|
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
|
||||||
process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) ->
|
process_subopts([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
|
||||||
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, State);
|
process_subopts(Opts, Msg, Session);
|
||||||
process_subopts([{rap, _}|Opts], Msg, State) ->
|
process_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) ->
|
||||||
process_subopts(Opts, Msg, State);
|
process_subopts(Opts, Msg, Session);
|
||||||
|
process_subopts([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) ->
|
||||||
|
process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
|
||||||
process_subopts([{subid, SubId}|Opts], Msg, State) ->
|
process_subopts([{subid, SubId}|Opts], Msg, State) ->
|
||||||
process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
|
process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue