diff --git a/priv/emqx.schema b/priv/emqx.schema index 1d308476d..210248ab3 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -634,19 +634,19 @@ end}. ]}. %% @doc Whether the server supports MQTT retained messages. -{mapping, "mqtt.retain_available", "emqx.mqtt_retain_available", [ +{mapping, "mqtt.retain_available", "emqx.retain_available", [ {default, true}, {datatype, {enum, [true, false]}} ]}. %% @doc Whether the Server supports MQTT Wildcard Subscriptions. -{mapping, "mqtt.wildcard_subscription", "emqx.mqtt_wildcard_subscription", [ +{mapping, "mqtt.wildcard_subscription", "emqx.wildcard_subscription", [ {default, true}, {datatype, {enum, [true, false]}} ]}. %% @doc Whether the Server supports MQTT Shared Subscriptions. -{mapping, "mqtt.shared_subscription", "emqx.mqtt_shared_subscription", [ +{mapping, "mqtt.shared_subscription", "emqx.shared_subscription", [ {default, true}, {datatype, {enum, [true, false]}} ]}. @@ -876,7 +876,7 @@ end}. {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> - {mqtt_retain_available, Val}; + {retain_available, Val}; ("flapping_threshold", Val) -> [Limit, Duration] = string:tokens(Val, ", "), FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of @@ -887,9 +887,9 @@ end}. end, {flapping_threshold, FlappingThreshold}; ("wildcard_subscription", Val) -> - {mqtt_wildcard_subscription, Val}; + {wildcard_subscription, Val}; ("shared_subscription", Val) -> - {mqtt_shared_subscription, Val}; + {shared_subscription, Val}; ("publish_limit", Val) -> [Limit, Duration] = string:tokens(Val, ", "), PubLimit = case cuttlefish_duration:parse(Duration, s) of diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 077e24154..c8da4e05d 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -22,7 +22,9 @@ -export([ check_pub/2 , check_sub/3 - , get_caps/1 + ]). + +-export([ get_caps/1 , get_caps/2 ]). @@ -49,6 +51,7 @@ ]). -define(SUBCAP_KEYS, [max_topic_levels, + max_qos_allowed, wildcard_subscription, shared_subscription ]). @@ -94,12 +97,13 @@ check_sub(Zone, Topic, SubOpts) -> (wildcard_subscription, Map) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> - Map#{is_shared => maps:is_key(share, SubOpts)} + Map#{is_shared => maps:is_key(share, SubOpts)}; + (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), do_check_sub(Flags, Caps). do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) - when Levels >= Limit -> + when Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; @@ -119,7 +123,7 @@ get_caps(Zone, subscribe) -> with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). pub_caps(Zone) -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)). + filter_caps(?PUBCAP_KEYS, get_caps(Zone)). sub_caps(Zone) -> filter_caps(?SUBCAP_KEYS, get_caps(Zone)). @@ -130,12 +134,7 @@ all_caps(Zone) -> end, ?DEFAULT_CAPS). filter_caps(Keys, Caps) -> - maps:filter(fun(Key, Val) -> - lists:member(Key, Keys) andalso cap_limited(Key, Val) - end, Caps). - -cap_limited(Key, Val) -> - Val =/= maps:get(Key, ?DEFAULT_CAPS). + maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). with_env(Zone, Key, InitFun) -> case emqx_zone:get_env(Zone, Key) of diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 218cf0fdc..15c3bf37c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -763,7 +763,7 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) -> do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, PState = #protocol{client = Client, session = Session}) -> - case check_subscribe(TopicFilter, PState) of + case check_subscribe(TopicFilter, SubOpts, PState) of ok -> TopicFilter1 = mount(Client, TopicFilter), SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState), case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of @@ -787,9 +787,9 @@ enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, is_bridge := IsBridge SubOpts#{rap => Rap, nl => Nl}. %% Check Sub -check_subscribe(TopicFilter, PState) -> +check_subscribe(TopicFilter, SubOpts, PState) -> case check_sub_acl(TopicFilter, PState) of - allow -> ok; %%TODO: check_sub_caps(TopicFilter, PState); + allow -> check_sub_caps(TopicFilter, SubOpts, PState); deny -> {error, ?RC_NOT_AUTHORIZED} end. @@ -802,8 +802,8 @@ check_sub_acl(TopicFilter, #protocol{client = Client}) -> end. %% Check Sub Caps -check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter). +check_sub_caps(TopicFilter, SubOpts, #protocol{client = #{zone := Zone}}) -> + emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Process unsubscribe request diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index c4721485b..bac32b849 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -30,6 +30,7 @@ -export([ get_env/2 , get_env/3 , set_env/3 + , unset_env/2 , force_reload/0 ]). @@ -81,7 +82,11 @@ get_env(Zone, Key, Def) -> -spec(set_env(zone(), atom(), term()) -> ok). set_env(Zone, Key, Val) -> - gen_server:cast(?SERVER, {set_env, Zone, Key, Val}). + persistent_term:put(?KEY(Zone, Key), Val). + +-spec(unset_env(zone(), atom()) -> boolean()). +unset_env(Zone, Key) -> + persistent_term:erase(?KEY(Zone, Key)). -spec(force_reload() -> ok). force_reload() -> @@ -107,10 +112,6 @@ handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({set_env, Zone, Key, Val}, State) -> - ok = persistent_term:put(?KEY(Zone, Key), Val), - {noreply, State}; - handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. @@ -130,6 +131,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- do_reload() -> - [ persistent_term:put(?KEY(Zone, Key), Val) - || {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts ]. + [persistent_term:put(?KEY(Zone, Key), Val) + || {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts]. diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl new file mode 100644 index 000000000..14b1e955f --- /dev/null +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt_caps_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +t_check_pub(_) -> + PubCaps = #{max_qos_allowed => ?QOS_1, + retain_available => false, + max_topic_alias => 4 + }, + ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), + ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, + retain => false, + topic_alias => 1 + }), + PubFlags1 = #{qos => ?QOS_2, retain => false}, + ?assertEqual({error, ?RC_QOS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_pub(zone, PubFlags1)), + PubFlags2 = #{qos => ?QOS_1, retain => true}, + ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, + emqx_mqtt_caps:check_pub(zone, PubFlags2)), + PubFlags3 = #{qos => ?QOS_1, retain => false, topic_alias => 5}, + ?assertEqual({error, ?RC_TOPIC_ALIAS_INVALID}, + emqx_mqtt_caps:check_pub(zone, PubFlags3)), + true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). + +t_check_sub(_) -> + SubOpts = #{rh => 0, + rap => 0, + nl => 0, + qos => ?QOS_2 + }, + SubCaps = #{max_topic_levels => 2, + max_qos_allowed => ?QOS_2, + shared_subscription => false, + wildcard_subscription => false + }, + ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), + ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), + ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, + emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), + ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), + ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, + emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), + true = emqx_zone:unset_env(zone, '$mqtt_sub_caps'). + +t_get_set_caps(_) -> + Caps = emqx_mqtt_caps:default(), + ?assertEqual(Caps, emqx_mqtt_caps:get_caps(zone)), + PubCaps = #{max_qos_allowed => ?QOS_2, + retain_available => true, + max_topic_alias => 0 + }, + ?assertEqual(PubCaps, emqx_mqtt_caps:get_caps(zone, publish)), + NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1, + retain_available => true + }, + emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps), + ?assertEqual(NewPubCaps, emqx_mqtt_caps:get_caps(zone, publish)), + + SubCaps = #{max_topic_levels => 0, + max_qos_allowed => ?QOS_2, + shared_subscription => true, + wildcard_subscription => true + }, + ?assertEqual(SubCaps, emqx_mqtt_caps:get_caps(zone, subscribe)), + true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). +