Check topic level for publish packet and optimize the handling of rap
This commit is contained in:
parent
d004a5b68e
commit
7512d6cb03
|
@ -1064,11 +1064,11 @@ check_pub_alias(_Packet, _Channel) -> ok.
|
||||||
|
|
||||||
%% Check Pub Caps
|
%% Check Pub Caps
|
||||||
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
|
check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS,
|
||||||
retain = Retain
|
retain = Retain},
|
||||||
}
|
variable = #mqtt_packet_publish{topic_name = Topic}
|
||||||
},
|
},
|
||||||
#channel{clientinfo = #{zone := Zone}}) ->
|
#channel{clientinfo = #{zone := Zone}}) ->
|
||||||
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain}).
|
emqx_mqtt_caps:check_pub(Zone, #{qos => QoS, retain => Retain, topic => Topic}).
|
||||||
|
|
||||||
%% Check Sub
|
%% Check Sub
|
||||||
check_subscribe(TopicFilter, SubOpts, Channel) ->
|
check_subscribe(TopicFilter, SubOpts, Channel) ->
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
|
|
||||||
-define(UNLIMITED, 0).
|
-define(UNLIMITED, 0).
|
||||||
|
|
||||||
-define(PUBCAP_KEYS, [max_topic_alias,
|
-define(PUBCAP_KEYS, [max_topic_levels,
|
||||||
max_qos_allowed,
|
max_qos_allowed,
|
||||||
retain_available
|
retain_available
|
||||||
]).
|
]).
|
||||||
|
@ -73,8 +73,16 @@
|
||||||
retain => boolean()})
|
retain => boolean()})
|
||||||
-> ok_or_error(emqx_types:reason_code())).
|
-> ok_or_error(emqx_types:reason_code())).
|
||||||
check_pub(Zone, Flags) when is_map(Flags) ->
|
check_pub(Zone, Flags) when is_map(Flags) ->
|
||||||
do_check_pub(Flags, get_caps(Zone, publish)).
|
do_check_pub(case maps:take(topic, Flags) of
|
||||||
|
{Topic, Flags1} ->
|
||||||
|
Flags1#{topic_levels => emqx_topic:levels(Topic)};
|
||||||
|
error ->
|
||||||
|
Flags
|
||||||
|
end, get_caps(Zone, publish)).
|
||||||
|
|
||||||
|
do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
|
||||||
|
when Limit > 0, Levels > Limit ->
|
||||||
|
{error, ?RC_TOPIC_NAME_INVALID};
|
||||||
do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS})
|
do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS})
|
||||||
when QoS > MaxQoS ->
|
when QoS > MaxQoS ->
|
||||||
{error, ?RC_QOS_NOT_SUPPORTED};
|
{error, ?RC_QOS_NOT_SUPPORTED};
|
||||||
|
|
|
@ -539,12 +539,12 @@ enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
|
||||||
enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
|
enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
|
||||||
Session = #session{upgrade_qos= false}) ->
|
Session = #session{upgrade_qos= false}) ->
|
||||||
enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
|
enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
|
||||||
enrich_subopt([{rap, 1}|Opts], Msg, Session) ->
|
enrich_subopt([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
|
||||||
enrich_subopt(Opts, Msg, Session);
|
enrich_subopt(Opts, emqx_message:set_flag(retain, true, Msg), Session);
|
||||||
enrich_subopt([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
|
|
||||||
enrich_subopt(Opts, Msg, Session);
|
|
||||||
enrich_subopt([{rap, 0}|Opts], Msg, Session) ->
|
enrich_subopt([{rap, 0}|Opts], Msg, Session) ->
|
||||||
enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session);
|
enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session);
|
||||||
|
enrich_subopt([{rap, 1}|Opts], Msg, Session) ->
|
||||||
|
enrich_subopt(Opts, Msg, Session);
|
||||||
enrich_subopt([{subid, SubId}|Opts], Msg, Session) ->
|
enrich_subopt([{subid, SubId}|Opts], Msg, Session) ->
|
||||||
Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
|
Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
|
||||||
enrich_subopt(Opts, Msg1, Session).
|
enrich_subopt(Opts, Msg1, Session).
|
||||||
|
|
Loading…
Reference in New Issue