diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 01e32410d..36a1a7a40 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1064,11 +1064,11 @@ check_pub_alias(_Packet, _Channel) -> ok. %% Check Pub Caps check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, - retain = Retain - } + retain = Retain}, + variable = #mqtt_packet_publish{topic_name = Topic} }, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}). + emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}). %% Check Sub check_subscribe(TopicFilter, SubOpts, Channel) -> diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 25d2ee5e4..a1bdefcb0 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -46,7 +46,7 @@ -define(UNLIMITED, 0). --define(PUBCAP_KEYS, [max_topic_alias, +-define(PUBCAP_KEYS, [max_topic_levels, max_qos_allowed, retain_available ]). @@ -73,8 +73,16 @@ retain => boolean()}) -> ok_or_error(emqx_types:reason_code())). check_pub(Zone, Flags) when is_map(Flags) -> - do_check_pub(Flags, get_caps(Zone, publish)). + do_check_pub(case maps:take(topic, Flags) of + {Topic, Flags1} -> + Flags1#{topic_levels => emqx_topic:levels(Topic)}; + error -> + Flags + end, get_caps(Zone, publish)). +do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) + when Limit > 0, Levels > Limit -> + {error, ?RC_TOPIC_NAME_INVALID}; do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS}) when QoS > MaxQoS -> {error, ?RC_QOS_NOT_SUPPORTED}; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index a945e23f7..a66118752 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -539,12 +539,12 @@ enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= false}) -> enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); -enrich_subopt([{rap, 1}|Opts], Msg, Session) -> - enrich_subopt(Opts, Msg, Session); -enrich_subopt([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> - enrich_subopt(Opts, Msg, Session); +enrich_subopt([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> + enrich_subopt(Opts, emqx_message:set_flag(retain, true, Msg), Session); enrich_subopt([{rap, 0}|Opts], Msg, Session) -> enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session); +enrich_subopt([{rap, 1}|Opts], Msg, Session) -> + enrich_subopt(Opts, Msg, Session); enrich_subopt([{subid, SubId}|Opts], Msg, Session) -> Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), enrich_subopt(Opts, Msg1, Session).