Merge pull request #2766 from emqx/improve-mqtt-caps-module
Improve mqtt caps module
This commit is contained in:
commit
6513a32d37
|
@ -634,19 +634,19 @@ end}.
|
|||
]}.
|
||||
|
||||
%% @doc Whether the server supports MQTT retained messages.
|
||||
{mapping, "mqtt.retain_available", "emqx.mqtt_retain_available", [
|
||||
{mapping, "mqtt.retain_available", "emqx.retain_available", [
|
||||
{default, true},
|
||||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
%% @doc Whether the Server supports MQTT Wildcard Subscriptions.
|
||||
{mapping, "mqtt.wildcard_subscription", "emqx.mqtt_wildcard_subscription", [
|
||||
{mapping, "mqtt.wildcard_subscription", "emqx.wildcard_subscription", [
|
||||
{default, true},
|
||||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
%% @doc Whether the Server supports MQTT Shared Subscriptions.
|
||||
{mapping, "mqtt.shared_subscription", "emqx.mqtt_shared_subscription", [
|
||||
{mapping, "mqtt.shared_subscription", "emqx.shared_subscription", [
|
||||
{default, true},
|
||||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
@ -876,7 +876,7 @@ end}.
|
|||
|
||||
{translation, "emqx.zones", fun(Conf) ->
|
||||
Mapping = fun("retain_available", Val) ->
|
||||
{mqtt_retain_available, Val};
|
||||
{retain_available, Val};
|
||||
("flapping_threshold", Val) ->
|
||||
[Limit, Duration] = string:tokens(Val, ", "),
|
||||
FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of
|
||||
|
@ -887,9 +887,9 @@ end}.
|
|||
end,
|
||||
{flapping_threshold, FlappingThreshold};
|
||||
("wildcard_subscription", Val) ->
|
||||
{mqtt_wildcard_subscription, Val};
|
||||
{wildcard_subscription, Val};
|
||||
("shared_subscription", Val) ->
|
||||
{mqtt_shared_subscription, Val};
|
||||
{shared_subscription, Val};
|
||||
("publish_limit", Val) ->
|
||||
[Limit, Duration] = string:tokens(Val, ", "),
|
||||
PubLimit = case cuttlefish_duration:parse(Duration, s) of
|
||||
|
|
|
@ -17,147 +17,134 @@
|
|||
%% @doc MQTTv5 Capabilities
|
||||
-module(emqx_mqtt_caps).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("types.hrl").
|
||||
|
||||
-export([ check_pub/2
|
||||
, check_sub/2
|
||||
, get_caps/1
|
||||
, check_sub/3
|
||||
]).
|
||||
|
||||
-export([ get_caps/1
|
||||
, get_caps/2
|
||||
]).
|
||||
|
||||
-export([default_caps/0]).
|
||||
-export([default/0]).
|
||||
|
||||
-export_type([caps/0]).
|
||||
|
||||
-type(caps() :: #{max_packet_size => integer(),
|
||||
-type(caps() :: #{max_packet_size => integer(),
|
||||
max_clientid_len => integer(),
|
||||
max_topic_alias => integer(),
|
||||
max_topic_alias => integer(),
|
||||
max_topic_levels => integer(),
|
||||
max_qos_allowed => emqx_types:qos(),
|
||||
mqtt_retain_available => boolean(),
|
||||
mqtt_shared_subscription => boolean(),
|
||||
mqtt_wildcard_subscription => boolean()
|
||||
max_qos_allowed => emqx_types:qos(),
|
||||
retain_available => boolean(),
|
||||
wildcard_subscription => boolean(),
|
||||
subscription_identifiers => boolean(),
|
||||
shared_subscription => boolean()
|
||||
}).
|
||||
|
||||
-define(UNLIMITED, 0).
|
||||
|
||||
-define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE},
|
||||
{max_clientid_len, ?MAX_CLIENTID_LEN},
|
||||
{max_topic_alias, ?UNLIMITED},
|
||||
{max_topic_levels, ?UNLIMITED},
|
||||
{max_qos_allowed, ?QOS_2},
|
||||
{mqtt_retain_available, true},
|
||||
{mqtt_shared_subscription, true},
|
||||
{mqtt_wildcard_subscription, true}
|
||||
]).
|
||||
|
||||
-define(PUBCAP_KEYS, [max_qos_allowed,
|
||||
mqtt_retain_available,
|
||||
max_topic_alias
|
||||
-define(PUBCAP_KEYS, [max_topic_alias,
|
||||
max_qos_allowed,
|
||||
retain_available
|
||||
]).
|
||||
|
||||
-define(SUBCAP_KEYS, [max_qos_allowed,
|
||||
max_topic_levels,
|
||||
mqtt_shared_subscription,
|
||||
mqtt_wildcard_subscription
|
||||
-define(SUBCAP_KEYS, [max_topic_levels,
|
||||
max_qos_allowed,
|
||||
wildcard_subscription,
|
||||
shared_subscription
|
||||
]).
|
||||
|
||||
-spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_types:reason_code()}).
|
||||
check_pub(Zone, Props) when is_map(Props) ->
|
||||
do_check_pub(Props, maps:to_list(get_caps(Zone, publish))).
|
||||
-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||
max_clientid_len => ?MAX_CLIENTID_LEN,
|
||||
max_topic_alias => ?UNLIMITED,
|
||||
max_topic_levels => ?UNLIMITED,
|
||||
max_qos_allowed => ?QOS_2,
|
||||
retain_available => true,
|
||||
wildcard_subscription => true,
|
||||
subscription_identifiers => true,
|
||||
shared_subscription => true
|
||||
}).
|
||||
|
||||
do_check_pub(_Props, []) ->
|
||||
ok;
|
||||
do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
|
||||
case QoS > MaxQoS of
|
||||
true -> {error, ?RC_QOS_NOT_SUPPORTED};
|
||||
false -> do_check_pub(Props, Caps)
|
||||
end;
|
||||
do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicAlias}| Caps]) ->
|
||||
case TopicAlias =< MaxTopicAlias andalso TopicAlias > 0 of
|
||||
false -> {error, ?RC_TOPIC_ALIAS_INVALID};
|
||||
true -> do_check_pub(Props, Caps)
|
||||
end;
|
||||
do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) ->
|
||||
-spec(check_pub(emqx_types:zone(),
|
||||
#{qos => emqx_types:qos(),
|
||||
retain => boolean()})
|
||||
-> ok_or_error(emqx_types:reason_code())).
|
||||
check_pub(Zone, Flags) when is_map(Flags) ->
|
||||
do_check_pub(Flags, get_caps(Zone, publish)).
|
||||
|
||||
do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS})
|
||||
when QoS > MaxQoS ->
|
||||
{error, ?RC_QOS_NOT_SUPPORTED};
|
||||
do_check_pub(#{retain := true}, #{retain_available := false}) ->
|
||||
{error, ?RC_RETAIN_NOT_SUPPORTED};
|
||||
do_check_pub(Props, [{max_topic_alias, _} | Caps]) ->
|
||||
do_check_pub(Props, Caps);
|
||||
do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) ->
|
||||
do_check_pub(Props, Caps).
|
||||
do_check_pub(#{topic_alias := TopicAlias},
|
||||
#{max_topic_alias := MaxTopicAlias})
|
||||
when 0 == TopicAlias; TopicAlias >= MaxTopicAlias ->
|
||||
{error, ?RC_TOPIC_ALIAS_INVALID};
|
||||
do_check_pub(_Flags, _Caps) -> ok.
|
||||
|
||||
-spec(check_sub(emqx_types:zone(), emqx_types:topic_filters())
|
||||
-> {ok | error, emqx_types:topic_filters()}).
|
||||
check_sub(Zone, TopicFilters) ->
|
||||
Caps = maps:to_list(get_caps(Zone, subscribe)),
|
||||
lists:foldr(fun({Topic, Opts}, {Ok, Result}) ->
|
||||
case check_sub(Topic, Opts, Caps) of
|
||||
{ok, Opts1} ->
|
||||
{Ok, [{Topic, Opts1}|Result]};
|
||||
{error, Opts1} ->
|
||||
{error, [{Topic, Opts1}|Result]}
|
||||
end
|
||||
end, {ok, []}, TopicFilters).
|
||||
-spec(check_sub(emqx_types:zone(),
|
||||
emqx_types:topic(),
|
||||
emqx_types:subopts())
|
||||
-> ok_or_error(emqx_types:reason_code())).
|
||||
check_sub(Zone, Topic, SubOpts) ->
|
||||
Caps = get_caps(Zone, subscribe),
|
||||
Flags = lists:foldl(
|
||||
fun(max_topic_levels, Map) ->
|
||||
Map#{topic_levels => emqx_topic:levels(Topic)};
|
||||
(wildcard_subscription, Map) ->
|
||||
Map#{is_wildcard => emqx_topic:wildcard(Topic)};
|
||||
(shared_subscription, Map) ->
|
||||
Map#{is_shared => maps:is_key(share, SubOpts)};
|
||||
(_Key, Map) -> Map %% Ignore
|
||||
end, #{}, maps:keys(Caps)),
|
||||
do_check_sub(Flags, Caps).
|
||||
|
||||
check_sub(_Topic, Opts, []) ->
|
||||
{ok, Opts};
|
||||
check_sub(Topic, Opts = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
|
||||
check_sub(Topic, Opts#{qos := min(QoS, MaxQoS)}, Caps);
|
||||
check_sub(Topic, Opts, [{mqtt_shared_subscription, true}|Caps]) ->
|
||||
check_sub(Topic, Opts, Caps);
|
||||
check_sub(Topic, Opts, [{mqtt_shared_subscription, false}|Caps]) ->
|
||||
case maps:is_key(share, Opts) of
|
||||
true ->
|
||||
{error, Opts#{rc := ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}};
|
||||
false -> check_sub(Topic, Opts, Caps)
|
||||
end;
|
||||
check_sub(Topic, Opts, [{mqtt_wildcard_subscription, true}|Caps]) ->
|
||||
check_sub(Topic, Opts, Caps);
|
||||
check_sub(Topic, Opts, [{mqtt_wildcard_subscription, false}|Caps]) ->
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true ->
|
||||
{error, Opts#{rc := ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}};
|
||||
false -> check_sub(Topic, Opts, Caps)
|
||||
end;
|
||||
check_sub(Topic, Opts, [{max_topic_levels, ?UNLIMITED}|Caps]) ->
|
||||
check_sub(Topic, Opts, Caps);
|
||||
check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) ->
|
||||
case emqx_topic:levels(Topic) of
|
||||
Levels when Levels > Limit ->
|
||||
{error, Opts#{rc := ?RC_TOPIC_FILTER_INVALID}};
|
||||
_ -> check_sub(Topic, Opts, Caps)
|
||||
end.
|
||||
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
|
||||
when Levels > Limit ->
|
||||
{error, ?RC_TOPIC_FILTER_INVALID};
|
||||
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
|
||||
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
|
||||
do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
|
||||
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
|
||||
do_check_sub(_Flags, _Caps) -> ok.
|
||||
|
||||
default_caps() ->
|
||||
?DEFAULT_CAPS.
|
||||
-spec(get_caps(emqx_zone:zone()) -> caps()).
|
||||
get_caps(Zone) ->
|
||||
with_env(Zone, '$mqtt_caps', fun all_caps/1).
|
||||
|
||||
-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
|
||||
get_caps(Zone, publish) ->
|
||||
with_env(Zone, '$mqtt_pub_caps',
|
||||
fun() ->
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
|
||||
end);
|
||||
with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1);
|
||||
|
||||
get_caps(Zone, subscribe) ->
|
||||
with_env(Zone, '$mqtt_sub_caps',
|
||||
fun() ->
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
|
||||
end).
|
||||
with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1).
|
||||
|
||||
get_caps(Zone) ->
|
||||
with_env(Zone, '$mqtt_caps',
|
||||
fun() ->
|
||||
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)}
|
||||
|| {Cap, Def} <- ?DEFAULT_CAPS])
|
||||
end).
|
||||
pub_caps(Zone) ->
|
||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
sub_caps(Zone) ->
|
||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||
|
||||
all_caps(Zone) ->
|
||||
maps:map(fun(Cap, Def) ->
|
||||
emqx_zone:get_env(Zone, Cap, Def)
|
||||
end, ?DEFAULT_CAPS).
|
||||
|
||||
filter_caps(Keys, Caps) ->
|
||||
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
|
||||
|
||||
with_env(Zone, Key, InitFun) ->
|
||||
case emqx_zone:get_env(Zone, Key) of
|
||||
undefined -> Caps = InitFun(),
|
||||
ok = emqx_zone:set_env(Zone, Key, Caps),
|
||||
Caps;
|
||||
ZoneCaps -> ZoneCaps
|
||||
undefined ->
|
||||
Caps = InitFun(Zone),
|
||||
ok = emqx_zone:set_env(Zone, Key, Caps),
|
||||
Caps;
|
||||
Caps -> Caps
|
||||
end.
|
||||
|
||||
-spec(default() -> caps()).
|
||||
default() -> ?DEFAULT_CAPS.
|
||||
|
||||
|
|
|
@ -314,10 +314,10 @@ handle_out({connack, ?RC_SUCCESS, SP},
|
|||
ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(PState)]),
|
||||
#{max_packet_size := MaxPktSize,
|
||||
max_qos_allowed := MaxQoS,
|
||||
mqtt_retain_available := Retain,
|
||||
retain_available := Retain,
|
||||
max_topic_alias := MaxAlias,
|
||||
mqtt_shared_subscription := Shared,
|
||||
mqtt_wildcard_subscription := Wildcard
|
||||
shared_subscription := Shared,
|
||||
wildcard_subscription := Wildcard
|
||||
} = caps(PState),
|
||||
%% Response-Information is so far not set by broker.
|
||||
%% i.e. It's a Client-to-Client contract for the request-response topic naming scheme.
|
||||
|
@ -763,7 +763,7 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, PState) ->
|
|||
|
||||
do_subscribe(TopicFilter, SubOpts = #{qos := QoS},
|
||||
PState = #protocol{client = Client, session = Session}) ->
|
||||
case check_subscribe(TopicFilter, PState) of
|
||||
case check_subscribe(TopicFilter, SubOpts, PState) of
|
||||
ok -> TopicFilter1 = mount(Client, TopicFilter),
|
||||
SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), PState),
|
||||
case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of
|
||||
|
@ -787,9 +787,9 @@ enrich_subopts(SubOpts, #protocol{client = #{zone := Zone, is_bridge := IsBridge
|
|||
SubOpts#{rap => Rap, nl => Nl}.
|
||||
|
||||
%% Check Sub
|
||||
check_subscribe(TopicFilter, PState) ->
|
||||
check_subscribe(TopicFilter, SubOpts, PState) ->
|
||||
case check_sub_acl(TopicFilter, PState) of
|
||||
allow -> ok; %%TODO: check_sub_caps(TopicFilter, PState);
|
||||
allow -> check_sub_caps(TopicFilter, SubOpts, PState);
|
||||
deny -> {error, ?RC_NOT_AUTHORIZED}
|
||||
end.
|
||||
|
||||
|
@ -802,8 +802,8 @@ check_sub_acl(TopicFilter, #protocol{client = Client}) ->
|
|||
end.
|
||||
|
||||
%% Check Sub Caps
|
||||
check_sub_caps(TopicFilter, #protocol{client = #{zone := Zone}}) ->
|
||||
emqx_mqtt_caps:check_sub(Zone, TopicFilter).
|
||||
check_sub_caps(TopicFilter, SubOpts, #protocol{client = #{zone := Zone}}) ->
|
||||
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Process unsubscribe request
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
-export([ get_env/2
|
||||
, get_env/3
|
||||
, set_env/3
|
||||
, unset_env/2
|
||||
, force_reload/0
|
||||
]).
|
||||
|
||||
|
@ -81,7 +82,11 @@ get_env(Zone, Key, Def) ->
|
|||
|
||||
-spec(set_env(zone(), atom(), term()) -> ok).
|
||||
set_env(Zone, Key, Val) ->
|
||||
gen_server:cast(?SERVER, {set_env, Zone, Key, Val}).
|
||||
persistent_term:put(?KEY(Zone, Key), Val).
|
||||
|
||||
-spec(unset_env(zone(), atom()) -> boolean()).
|
||||
unset_env(Zone, Key) ->
|
||||
persistent_term:erase(?KEY(Zone, Key)).
|
||||
|
||||
-spec(force_reload() -> ok).
|
||||
force_reload() ->
|
||||
|
@ -107,10 +112,6 @@ handle_call(Req, _From, State) ->
|
|||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({set_env, Zone, Key, Val}, State) ->
|
||||
ok = persistent_term:put(?KEY(Zone, Key), Val),
|
||||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
@ -130,6 +131,6 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
do_reload() ->
|
||||
[ persistent_term:put(?KEY(Zone, Key), Val)
|
||||
|| {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts ].
|
||||
[persistent_term:put(?KEY(Zone, Key), Val)
|
||||
|| {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts].
|
||||
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_mqtt_caps_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
|
||||
t_check_pub(_) ->
|
||||
PubCaps = #{max_qos_allowed => ?QOS_1,
|
||||
retain_available => false,
|
||||
max_topic_alias => 4
|
||||
},
|
||||
ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
|
||||
ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
|
||||
retain => false,
|
||||
topic_alias => 1
|
||||
}),
|
||||
PubFlags1 = #{qos => ?QOS_2, retain => false},
|
||||
?assertEqual({error, ?RC_QOS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_pub(zone, PubFlags1)),
|
||||
PubFlags2 = #{qos => ?QOS_1, retain => true},
|
||||
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_pub(zone, PubFlags2)),
|
||||
PubFlags3 = #{qos => ?QOS_1, retain => false, topic_alias => 5},
|
||||
?assertEqual({error, ?RC_TOPIC_ALIAS_INVALID},
|
||||
emqx_mqtt_caps:check_pub(zone, PubFlags3)),
|
||||
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps').
|
||||
|
||||
t_check_sub(_) ->
|
||||
SubOpts = #{rh => 0,
|
||||
rap => 0,
|
||||
nl => 0,
|
||||
qos => ?QOS_2
|
||||
},
|
||||
SubCaps = #{max_topic_levels => 2,
|
||||
max_qos_allowed => ?QOS_2,
|
||||
shared_subscription => false,
|
||||
wildcard_subscription => false
|
||||
},
|
||||
ok = emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
|
||||
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
|
||||
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
|
||||
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
|
||||
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
|
||||
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
|
||||
true = emqx_zone:unset_env(zone, '$mqtt_sub_caps').
|
||||
|
||||
t_get_set_caps(_) ->
|
||||
Caps = emqx_mqtt_caps:default(),
|
||||
?assertEqual(Caps, emqx_mqtt_caps:get_caps(zone)),
|
||||
PubCaps = #{max_qos_allowed => ?QOS_2,
|
||||
retain_available => true,
|
||||
max_topic_alias => 0
|
||||
},
|
||||
?assertEqual(PubCaps, emqx_mqtt_caps:get_caps(zone, publish)),
|
||||
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1,
|
||||
retain_available => true
|
||||
},
|
||||
emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps),
|
||||
?assertEqual(NewPubCaps, emqx_mqtt_caps:get_caps(zone, publish)),
|
||||
|
||||
SubCaps = #{max_topic_levels => 0,
|
||||
max_qos_allowed => ?QOS_2,
|
||||
shared_subscription => true,
|
||||
wildcard_subscription => true
|
||||
},
|
||||
?assertEqual(SubCaps, emqx_mqtt_caps:get_caps(zone, subscribe)),
|
||||
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps').
|
||||
|
Loading…
Reference in New Issue