From bfd027cf8b5cb46ad4500c151c07e237eba053f7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 7 Aug 2019 14:01:50 +0800 Subject: [PATCH] Rewrite the mqtt_caps module --- src/emqx_mqtt_caps.erl | 202 +++++++++++++++++++---------------------- 1 file changed, 95 insertions(+), 107 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 5e0d2a3fc..077e24154 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -17,147 +17,135 @@ %% @doc MQTTv5 Capabilities -module(emqx_mqtt_caps). --include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-include("types.hrl"). -export([ check_pub/2 - , check_sub/2 + , check_sub/3 , get_caps/1 , get_caps/2 ]). --export([default_caps/0]). +-export([default/0]). -export_type([caps/0]). --type(caps() :: #{max_packet_size => integer(), +-type(caps() :: #{max_packet_size => integer(), max_clientid_len => integer(), - max_topic_alias => integer(), + max_topic_alias => integer(), max_topic_levels => integer(), - max_qos_allowed => emqx_types:qos(), - mqtt_retain_available => boolean(), - mqtt_shared_subscription => boolean(), - mqtt_wildcard_subscription => boolean() + max_qos_allowed => emqx_types:qos(), + retain_available => boolean(), + wildcard_subscription => boolean(), + subscription_identifiers => boolean(), + shared_subscription => boolean() }). -define(UNLIMITED, 0). --define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE}, - {max_clientid_len, ?MAX_CLIENTID_LEN}, - {max_topic_alias, ?UNLIMITED}, - {max_topic_levels, ?UNLIMITED}, - {max_qos_allowed, ?QOS_2}, - {mqtt_retain_available, true}, - {mqtt_shared_subscription, true}, - {mqtt_wildcard_subscription, true} - ]). - --define(PUBCAP_KEYS, [max_qos_allowed, - mqtt_retain_available, - max_topic_alias +-define(PUBCAP_KEYS, [max_topic_alias, + max_qos_allowed, + retain_available ]). --define(SUBCAP_KEYS, [max_qos_allowed, - max_topic_levels, - mqtt_shared_subscription, - mqtt_wildcard_subscription +-define(SUBCAP_KEYS, [max_topic_levels, + wildcard_subscription, + shared_subscription ]). --spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_types:reason_code()}). -check_pub(Zone, Props) when is_map(Props) -> - do_check_pub(Props, maps:to_list(get_caps(Zone, publish))). +-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE, + max_clientid_len => ?MAX_CLIENTID_LEN, + max_topic_alias => ?UNLIMITED, + max_topic_levels => ?UNLIMITED, + max_qos_allowed => ?QOS_2, + retain_available => true, + wildcard_subscription => true, + subscription_identifiers => true, + shared_subscription => true + }). -do_check_pub(_Props, []) -> - ok; -do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) -> - case QoS > MaxQoS of - true -> {error, ?RC_QOS_NOT_SUPPORTED}; - false -> do_check_pub(Props, Caps) - end; -do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicAlias}| Caps]) -> - case TopicAlias =< MaxTopicAlias andalso TopicAlias > 0 of - false -> {error, ?RC_TOPIC_ALIAS_INVALID}; - true -> do_check_pub(Props, Caps) - end; -do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) -> +-spec(check_pub(emqx_types:zone(), + #{qos => emqx_types:qos(), + retain => boolean()}) + -> ok_or_error(emqx_types:reason_code())). +check_pub(Zone, Flags) when is_map(Flags) -> + do_check_pub(Flags, get_caps(Zone, publish)). + +do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS}) + when QoS > MaxQoS -> + {error, ?RC_QOS_NOT_SUPPORTED}; +do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; -do_check_pub(Props, [{max_topic_alias, _} | Caps]) -> - do_check_pub(Props, Caps); -do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) -> - do_check_pub(Props, Caps). +do_check_pub(#{topic_alias := TopicAlias}, + #{max_topic_alias := MaxTopicAlias}) + when 0 == TopicAlias; TopicAlias >= MaxTopicAlias -> + {error, ?RC_TOPIC_ALIAS_INVALID}; +do_check_pub(_Flags, _Caps) -> ok. --spec(check_sub(emqx_types:zone(), emqx_types:topic_filters()) - -> {ok | error, emqx_types:topic_filters()}). -check_sub(Zone, TopicFilters) -> - Caps = maps:to_list(get_caps(Zone, subscribe)), - lists:foldr(fun({Topic, Opts}, {Ok, Result}) -> - case check_sub(Topic, Opts, Caps) of - {ok, Opts1} -> - {Ok, [{Topic, Opts1}|Result]}; - {error, Opts1} -> - {error, [{Topic, Opts1}|Result]} - end - end, {ok, []}, TopicFilters). +-spec(check_sub(emqx_types:zone(), + emqx_types:topic(), + emqx_types:subopts()) + -> ok_or_error(emqx_types:reason_code())). +check_sub(Zone, Topic, SubOpts) -> + Caps = get_caps(Zone, subscribe), + Flags = lists:foldl( + fun(max_topic_levels, Map) -> + Map#{topic_levels => emqx_topic:levels(Topic)}; + (wildcard_subscription, Map) -> + Map#{is_wildcard => emqx_topic:wildcard(Topic)}; + (shared_subscription, Map) -> + Map#{is_shared => maps:is_key(share, SubOpts)} + end, #{}, maps:keys(Caps)), + do_check_sub(Flags, Caps). -check_sub(_Topic, Opts, []) -> - {ok, Opts}; -check_sub(Topic, Opts = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) -> - check_sub(Topic, Opts#{qos := min(QoS, MaxQoS)}, Caps); -check_sub(Topic, Opts, [{mqtt_shared_subscription, true}|Caps]) -> - check_sub(Topic, Opts, Caps); -check_sub(Topic, Opts, [{mqtt_shared_subscription, false}|Caps]) -> - case maps:is_key(share, Opts) of - true -> - {error, Opts#{rc := ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}; - false -> check_sub(Topic, Opts, Caps) - end; -check_sub(Topic, Opts, [{mqtt_wildcard_subscription, true}|Caps]) -> - check_sub(Topic, Opts, Caps); -check_sub(Topic, Opts, [{mqtt_wildcard_subscription, false}|Caps]) -> - case emqx_topic:wildcard(Topic) of - true -> - {error, Opts#{rc := ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}; - false -> check_sub(Topic, Opts, Caps) - end; -check_sub(Topic, Opts, [{max_topic_levels, ?UNLIMITED}|Caps]) -> - check_sub(Topic, Opts, Caps); -check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) -> - case emqx_topic:levels(Topic) of - Levels when Levels > Limit -> - {error, Opts#{rc := ?RC_TOPIC_FILTER_INVALID}}; - _ -> check_sub(Topic, Opts, Caps) - end. +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) + when Levels >= Limit -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> + {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; +do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> + {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; +do_check_sub(_Flags, _Caps) -> ok. -default_caps() -> - ?DEFAULT_CAPS. +-spec(get_caps(emqx_zone:zone()) -> caps()). +get_caps(Zone) -> + with_env(Zone, '$mqtt_caps', fun all_caps/1). +-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). get_caps(Zone, publish) -> - with_env(Zone, '$mqtt_pub_caps', - fun() -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)) - end); + with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1); get_caps(Zone, subscribe) -> - with_env(Zone, '$mqtt_sub_caps', - fun() -> - filter_caps(?SUBCAP_KEYS, get_caps(Zone)) - end). + with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1). -get_caps(Zone) -> - with_env(Zone, '$mqtt_caps', - fun() -> - maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)} - || {Cap, Def} <- ?DEFAULT_CAPS]) - end). +pub_caps(Zone) -> + filter_caps(?PUBCAP_KEYS, get_caps(Zone)). + +sub_caps(Zone) -> + filter_caps(?SUBCAP_KEYS, get_caps(Zone)). + +all_caps(Zone) -> + maps:map(fun(Cap, Def) -> + emqx_zone:get_env(Zone, Cap, Def) + end, ?DEFAULT_CAPS). filter_caps(Keys, Caps) -> - maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). + maps:filter(fun(Key, Val) -> + lists:member(Key, Keys) andalso cap_limited(Key, Val) + end, Caps). + +cap_limited(Key, Val) -> + Val =/= maps:get(Key, ?DEFAULT_CAPS). with_env(Zone, Key, InitFun) -> case emqx_zone:get_env(Zone, Key) of - undefined -> Caps = InitFun(), - ok = emqx_zone:set_env(Zone, Key, Caps), - Caps; - ZoneCaps -> ZoneCaps + undefined -> + Caps = InitFun(Zone), + ok = emqx_zone:set_env(Zone, Key, Caps), + Caps; + Caps -> Caps end. +-spec(default() -> caps()). +default() -> ?DEFAULT_CAPS. +