fix(config): make emqx_caps work with the new config struct

This commit is contained in:
Shawn 2021-07-13 16:42:32 +08:00
parent a0bddfc834
commit 01c4c655fb
1 changed files with 15 additions and 56 deletions

View File

@ -20,19 +20,13 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("types.hrl"). -include("types.hrl").
-export([ check_pub/2 -export([ check_pub/3
, check_sub/3 , check_sub/4
]). ]).
-export([ get_caps/1 -export([ get_caps/2
, get_caps/2
, get_caps/3
]). ]).
-export([default_caps/0]).
-export([default/0]).
-export_type([caps/0]). -export_type([caps/0]).
-type(caps() :: #{max_packet_size => integer(), -type(caps() :: #{max_packet_size => integer(),
@ -46,7 +40,7 @@
shared_subscription => boolean() shared_subscription => boolean()
}). }).
-define(UNLIMITED, 0). -define(MAX_TOPIC_LEVELS, 65535).
-define(PUBCAP_KEYS, [max_topic_levels, -define(PUBCAP_KEYS, [max_topic_levels,
max_qos_allowed, max_qos_allowed,
@ -62,7 +56,7 @@
-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE, -define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE,
max_clientid_len => ?MAX_CLIENTID_LEN, max_clientid_len => ?MAX_CLIENTID_LEN,
max_topic_alias => ?MAX_TOPIC_AlIAS, max_topic_alias => ?MAX_TOPIC_AlIAS,
max_topic_levels => ?UNLIMITED, max_topic_levels => ?MAX_TOPIC_LEVELS,
max_qos_allowed => ?QOS_2, max_qos_allowed => ?QOS_2,
retain_available => true, retain_available => true,
wildcard_subscription => true, wildcard_subscription => true,
@ -70,18 +64,18 @@
shared_subscription => true shared_subscription => true
}). }).
-spec(check_pub(emqx_types:zone(), -spec(check_pub(emqx_types:zone(), atom(),
#{qos := emqx_types:qos(), #{qos := emqx_types:qos(),
retain := boolean(), retain := boolean(),
topic := emqx_topic:topic()}) topic := emqx_topic:topic()})
-> ok_or_error(emqx_types:reason_code())). -> ok_or_error(emqx_types:reason_code())).
check_pub(Zone, Flags) when is_map(Flags) -> check_pub(Zone, Listener, Flags) when is_map(Flags) ->
do_check_pub(case maps:take(topic, Flags) of do_check_pub(case maps:take(topic, Flags) of
{Topic, Flags1} -> {Topic, Flags1} ->
Flags1#{topic_levels => emqx_topic:levels(Topic)}; Flags1#{topic_levels => emqx_topic:levels(Topic)};
error -> error ->
Flags Flags
end, get_caps(Zone, publish)). end, maps:with(?PUBCAP_KEYS, get_caps(Zone, Listener))).
do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) do_check_pub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
when Limit > 0, Levels > Limit -> when Limit > 0, Levels > Limit ->
@ -93,12 +87,12 @@ do_check_pub(#{retain := true}, #{retain_available := false}) ->
{error, ?RC_RETAIN_NOT_SUPPORTED}; {error, ?RC_RETAIN_NOT_SUPPORTED};
do_check_pub(_Flags, _Caps) -> ok. do_check_pub(_Flags, _Caps) -> ok.
-spec(check_sub(emqx_types:zone(), -spec(check_sub(emqx_types:zone(), atom(),
emqx_types:topic(), emqx_types:topic(),
emqx_types:subopts()) emqx_types:subopts())
-> ok_or_error(emqx_types:reason_code())). -> ok_or_error(emqx_types:reason_code())).
check_sub(Zone, Topic, SubOpts) -> check_sub(Zone, Listener, Topic, SubOpts) ->
Caps = get_caps(Zone, subscribe), Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone, Listener)),
Flags = lists:foldl( Flags = lists:foldl(
fun(max_topic_levels, Map) -> fun(max_topic_levels, Map) ->
Map#{topic_levels => emqx_topic:levels(Topic)}; Map#{topic_levels => emqx_topic:levels(Topic)};
@ -119,42 +113,7 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(_Flags, _Caps) -> ok. do_check_sub(_Flags, _Caps) -> ok.
default_caps() -> get_caps(Zone, Listener) ->
?DEFAULT_CAPS. lists:foldl(fun({K, V}, Acc) ->
Acc#{K => emqx_config:get_listener_conf(Zone, Listener, [mqtt, K], V)}
get_caps(Zone, Cap, Def) -> end, #{}, maps:to_list(?DEFAULT_CAPS)).
emqx_zone:get_env(Zone, Cap, Def).
get_caps(Zone, publish) ->
with_env(Zone, '$mqtt_pub_caps',
fun() ->
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
end);
get_caps(Zone, subscribe) ->
with_env(Zone, '$mqtt_sub_caps',
fun() ->
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
end).
get_caps(Zone) ->
with_env(Zone, '$mqtt_caps',
fun() ->
maps:map(fun(Cap, Def) ->
emqx_zone:get_env(Zone, Cap, Def)
end, ?DEFAULT_CAPS)
end).
filter_caps(Keys, Caps) ->
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
-spec(default() -> caps()).
default() -> ?DEFAULT_CAPS.
with_env(Zone, Key, InitFun) ->
case emqx_zone:get_env(Zone, Key) of
undefined -> Caps = InitFun(),
ok = emqx_zone:set_env(Zone, Key, Caps),
Caps;
ZoneCaps -> ZoneCaps
end.