Rewrite the emqx_mqtt_caps module and add test cases

- Rename `mqtt_retain_available` to `retain_available`
- Rename `mqtt_wildcard_subscription` to `wildcard_subscription`
- Rename `mqtt_shared_subscription` to `shared_subscription`
- Add `emqx_zone:unset_env/2` API for unit test
This commit is contained in:
Feng Lee 2019-08-08 14:11:26 +08:00
parent bfd027cf8b
commit bd061415c9
5 changed files with 118 additions and 28 deletions

View File

@ -634,19 +634,19 @@ end}.
]}. ]}.
%% @doc Whether the server supports MQTT retained messages. %% @doc Whether the server supports MQTT retained messages.
{mapping, "mqtt.retain_available", "emqx.mqtt_retain_available", [ {mapping, "mqtt.retain_available", "emqx.retain_available", [
{default, true}, {default, true},
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.
%% @doc Whether the Server supports MQTT Wildcard Subscriptions. %% @doc Whether the Server supports MQTT Wildcard Subscriptions.
{mapping, "mqtt.wildcard_subscription", "emqx.mqtt_wildcard_subscription", [ {mapping, "mqtt.wildcard_subscription", "emqx.wildcard_subscription", [
{default, true}, {default, true},
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.
%% @doc Whether the Server supports MQTT Shared Subscriptions. %% @doc Whether the Server supports MQTT Shared Subscriptions.
{mapping, "mqtt.shared_subscription", "emqx.mqtt_shared_subscription", [ {mapping, "mqtt.shared_subscription", "emqx.shared_subscription", [
{default, true}, {default, true},
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.
@ -876,7 +876,7 @@ end}.
{translation, "emqx.zones", fun(Conf) -> {translation, "emqx.zones", fun(Conf) ->
Mapping = fun("retain_available", Val) -> Mapping = fun("retain_available", Val) ->
{mqtt_retain_available, Val}; {retain_available, Val};
("flapping_threshold", Val) -> ("flapping_threshold", Val) ->
[Limit, Duration] = string:tokens(Val, ", "), [Limit, Duration] = string:tokens(Val, ", "),
FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of
@ -887,9 +887,9 @@ end}.
end, end,
{flapping_threshold, FlappingThreshold}; {flapping_threshold, FlappingThreshold};
("wildcard_subscription", Val) -> ("wildcard_subscription", Val) ->
{mqtt_wildcard_subscription, Val}; {wildcard_subscription, Val};
("shared_subscription", Val) -> ("shared_subscription", Val) ->
{mqtt_shared_subscription, Val}; {shared_subscription, Val};
("publish_limit", Val) -> ("publish_limit", Val) ->
[Limit, Duration] = string:tokens(Val, ", "), [Limit, Duration] = string:tokens(Val, ", "),
PubLimit = case cuttlefish_duration:parse(Duration, s) of PubLimit = case cuttlefish_duration:parse(Duration, s) of

View File

@ -22,7 +22,9 @@
-export([ check_pub/2 -export([ check_pub/2
, check_sub/3 , check_sub/3
, get_caps/1 ]).
-export([ get_caps/1
, get_caps/2 , get_caps/2
]). ]).
@ -49,6 +51,7 @@
]). ]).
-define(SUBCAP_KEYS, [max_topic_levels, -define(SUBCAP_KEYS, [max_topic_levels,
max_qos_allowed,
wildcard_subscription, wildcard_subscription,
shared_subscription shared_subscription
]). ]).
@ -94,12 +97,13 @@ check_sub(Zone, Topic, SubOpts) ->
(wildcard_subscription, Map) -> (wildcard_subscription, Map) ->
Map#{is_wildcard => emqx_topic:wildcard(Topic)}; Map#{is_wildcard => emqx_topic:wildcard(Topic)};
(shared_subscription, Map) -> (shared_subscription, Map) ->
Map#{is_shared => maps:is_key(share, SubOpts)} Map#{is_shared => maps:is_key(share, SubOpts)};
(_Key, Map) -> Map %% Ignore
end, #{}, maps:keys(Caps)), end, #{}, maps:keys(Caps)),
do_check_sub(Flags, Caps). do_check_sub(Flags, Caps).
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
when Levels >= Limit -> when Levels > Limit ->
{error, ?RC_TOPIC_FILTER_INVALID}; {error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
@ -119,7 +123,7 @@ get_caps(Zone, subscribe) ->
with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1).
pub_caps(Zone) -> pub_caps(Zone) ->
filter_caps(?PUBCAP_KEYS, get_caps(Zone)). filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
sub_caps(Zone) -> sub_caps(Zone) ->
filter_caps(?SUBCAP_KEYS, get_caps(Zone)). filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
@ -130,12 +134,7 @@ all_caps(Zone) ->
end, ?DEFAULT_CAPS). end, ?DEFAULT_CAPS).
filter_caps(Keys, Caps) -> filter_caps(Keys, Caps) ->
maps:filter(fun(Key, Val) -> maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
lists:member(Key, Keys) andalso cap_limited(Key, Val)
end, Caps).
cap_limited(Key, Val) ->
Val =/= maps:get(Key, ?DEFAULT_CAPS).
with_env(Zone, Key, InitFun) -> with_env(Zone, Key, InitFun) ->
case emqx_zone:get_env(Zone, Key) of case emqx_zone:get_env(Zone, Key) of

View File

@ -763,7 +763,7 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
PState = #protocol{client = Client, session = Session}) -> PState = #protocol{client = Client, session = Session}) ->
case check_subscribe(TopicFilter, PState) of case check_subscribe(TopicFilter, SubOpts, PState) of
ok -> TopicFilter1 = mount(Client, TopicFilter), ok -> TopicFilter1 = mount(Client, TopicFilter),
SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState), SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState),
case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
@ -787,9 +787,9 @@ enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, is_bridge := IsBridge
SubOpts#{rap => Rap, nl => Nl}. SubOpts#{rap => Rap, nl => Nl}.
%% Check Sub %% Check Sub
check_subscribe(TopicFilter, PState) -> check_subscribe(TopicFilter, SubOpts, PState) ->
case check_sub_acl(TopicFilter, PState) of case check_sub_acl(TopicFilter, PState) of
allow -> ok; %%TODO: check_sub_caps(TopicFilter, PState); allow -> check_sub_caps(TopicFilter, SubOpts, PState);
deny -> {error, ?RC_NOT_AUTHORIZED} deny -> {error, ?RC_NOT_AUTHORIZED}
end. end.
@ -802,8 +802,8 @@ check_sub_acl(TopicFilter, #protocol{client = Client}) ->
end. end.
%% Check Sub Caps %% Check Sub Caps
check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) -> check_sub_caps(TopicFilter, SubOpts, #protocol{client = #{zone := Zone}}) ->
emqx_mqtt_caps:check_sub(Zone, TopicFilter). emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process unsubscribe request %% Process unsubscribe request

View File

@ -30,6 +30,7 @@
-export([ get_env/2 -export([ get_env/2
, get_env/3 , get_env/3
, set_env/3 , set_env/3
, unset_env/2
, force_reload/0 , force_reload/0
]). ]).
@ -81,7 +82,11 @@ get_env(Zone, Key, Def) ->
-spec(set_env(zone(), atom(), term()) -> ok). -spec(set_env(zone(), atom(), term()) -> ok).
set_env(Zone, Key, Val) -> set_env(Zone, Key, Val) ->
gen_server:cast(?SERVER, {set_env, Zone, Key, Val}). persistent_term:put(?KEY(Zone, Key), Val).
-spec(unset_env(zone(), atom()) -> boolean()).
unset_env(Zone, Key) ->
persistent_term:erase(?KEY(Zone, Key)).
-spec(force_reload() -> ok). -spec(force_reload() -> ok).
force_reload() -> force_reload() ->
@ -107,10 +112,6 @@ handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]), ?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
handle_cast({set_env, Zone, Key, Val}, State) ->
ok = persistent_term:put(?KEY(Zone, Key), Val),
{noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]), ?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}. {noreply, State}.
@ -130,6 +131,6 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
do_reload() -> do_reload() ->
[ persistent_term:put(?KEY(Zone, Key), Val) [persistent_term:put(?KEY(Zone, Key), Val)
|| {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts ]. || {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts].

View File

@ -0,0 +1,90 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt_caps_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
t_check_pub(_) ->
PubCaps = #{max_qos_allowed => ?QOS_1,
retain_available => false,
max_topic_alias => 4
},
ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
retain => false,
topic_alias => 1
}),
PubFlags1 = #{qos => ?QOS_2, retain => false},
?assertEqual({error, ?RC_QOS_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(zone, PubFlags1)),
PubFlags2 = #{qos => ?QOS_1, retain => true},
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(zone, PubFlags2)),
PubFlags3 = #{qos => ?QOS_1, retain => false, topic_alias => 5},
?assertEqual({error, ?RC_TOPIC_ALIAS_INVALID},
emqx_mqtt_caps:check_pub(zone, PubFlags3)),
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps').
t_check_sub(_) ->
SubOpts = #{rh => 0,
rap => 0,
nl => 0,
qos => ?QOS_2
},
SubCaps = #{max_topic_levels => 2,
max_qos_allowed => ?QOS_2,
shared_subscription => false,
wildcard_subscription => false
},
ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
true = emqx_zone:unset_env(zone, '$mqtt_sub_caps').
t_get_set_caps(_) ->
Caps = emqx_mqtt_caps:default(),
?assertEqual(Caps, emqx_mqtt_caps:get_caps(zone)),
PubCaps = #{max_qos_allowed => ?QOS_2,
retain_available => true,
max_topic_alias => 0
},
?assertEqual(PubCaps, emqx_mqtt_caps:get_caps(zone, publish)),
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1,
retain_available => true
},
emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps),
?assertEqual(NewPubCaps, emqx_mqtt_caps:get_caps(zone, publish)),
SubCaps = #{max_topic_levels => 0,
max_qos_allowed => ?QOS_2,
shared_subscription => true,
wildcard_subscription => true
},
?assertEqual(SubCaps, emqx_mqtt_caps:get_caps(zone, subscribe)),
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps').