Rewrite the mqtt_caps module
This commit is contained in:
parent
3230c25b56
commit
bfd027cf8b
|
@ -17,16 +17,16 @@
|
||||||
%% @doc MQTTv5 Capabilities
|
%% @doc MQTTv5 Capabilities
|
||||||
-module(emqx_mqtt_caps).
|
-module(emqx_mqtt_caps).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
-include("types.hrl").
|
||||||
|
|
||||||
-export([ check_pub/2
|
-export([ check_pub/2
|
||||||
, check_sub/2
|
, check_sub/3
|
||||||
, get_caps/1
|
, get_caps/1
|
||||||
, get_caps/2
|
, get_caps/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([default_caps/0]).
|
-export([default/0]).
|
||||||
|
|
||||||
-export_type([caps/0]).
|
-export_type([caps/0]).
|
||||||
|
|
||||||
|
@ -35,129 +35,117 @@
|
||||||
max_topic_alias => integer(),
|
max_topic_alias => integer(),
|
||||||
max_topic_levels => integer(),
|
max_topic_levels => integer(),
|
||||||
max_qos_allowed => emqx_types:qos(),
|
max_qos_allowed => emqx_types:qos(),
|
||||||
mqtt_retain_available => boolean(),
|
retain_available => boolean(),
|
||||||
mqtt_shared_subscription => boolean(),
|
wildcard_subscription => boolean(),
|
||||||
mqtt_wildcard_subscription => boolean()
|
subscription_identifiers => boolean(),
|
||||||
|
shared_subscription => boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(UNLIMITED, 0).
|
-define(UNLIMITED, 0).
|
||||||
|
|
||||||
-define(DEFAULT_CAPS, [{max_packet_size, ?MAX_PACKET_SIZE},
|
-define(PUBCAP_KEYS, [max_topic_alias,
|
||||||
{max_clientid_len, ?MAX_CLIENTID_LEN},
|
max_qos_allowed,
|
||||||
{max_topic_alias, ?UNLIMITED},
|
retain_available
|
||||||
{max_topic_levels, ?UNLIMITED},
|
|
||||||
{max_qos_allowed, ?QOS_2},
|
|
||||||
{mqtt_retain_available, true},
|
|
||||||
{mqtt_shared_subscription, true},
|
|
||||||
{mqtt_wildcard_subscription, true}
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(PUBCAP_KEYS, [max_qos_allowed,
|
-define(SUBCAP_KEYS, [max_topic_levels,
|
||||||
mqtt_retain_available,
|
wildcard_subscription,
|
||||||
max_topic_alias
|
shared_subscription
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(SUBCAP_KEYS, [max_qos_allowed,
|
-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||||
max_topic_levels,
|
max_clientid_len => ?MAX_CLIENTID_LEN,
|
||||||
mqtt_shared_subscription,
|
max_topic_alias => ?UNLIMITED,
|
||||||
mqtt_wildcard_subscription
|
max_topic_levels => ?UNLIMITED,
|
||||||
]).
|
max_qos_allowed => ?QOS_2,
|
||||||
|
retain_available => true,
|
||||||
|
wildcard_subscription => true,
|
||||||
|
subscription_identifiers => true,
|
||||||
|
shared_subscription => true
|
||||||
|
}).
|
||||||
|
|
||||||
-spec(check_pub(emqx_types:zone(), map()) -> ok | {error, emqx_types:reason_code()}).
|
-spec(check_pub(emqx_types:zone(),
|
||||||
check_pub(Zone, Props) when is_map(Props) ->
|
#{qos => emqx_types:qos(),
|
||||||
do_check_pub(Props, maps:to_list(get_caps(Zone, publish))).
|
retain => boolean()})
|
||||||
|
-> ok_or_error(emqx_types:reason_code())).
|
||||||
|
check_pub(Zone, Flags) when is_map(Flags) ->
|
||||||
|
do_check_pub(Flags, get_caps(Zone, publish)).
|
||||||
|
|
||||||
do_check_pub(_Props, []) ->
|
do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS})
|
||||||
ok;
|
when QoS > MaxQoS ->
|
||||||
do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
|
{error, ?RC_QOS_NOT_SUPPORTED};
|
||||||
case QoS > MaxQoS of
|
do_check_pub(#{retain := true}, #{retain_available := false}) ->
|
||||||
true -> {error, ?RC_QOS_NOT_SUPPORTED};
|
|
||||||
false -> do_check_pub(Props, Caps)
|
|
||||||
end;
|
|
||||||
do_check_pub(Props = #{ topic_alias := TopicAlias}, [{max_topic_alias, MaxTopicAlias}| Caps]) ->
|
|
||||||
case TopicAlias =< MaxTopicAlias andalso TopicAlias > 0 of
|
|
||||||
false -> {error, ?RC_TOPIC_ALIAS_INVALID};
|
|
||||||
true -> do_check_pub(Props, Caps)
|
|
||||||
end;
|
|
||||||
do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) ->
|
|
||||||
{error, ?RC_RETAIN_NOT_SUPPORTED};
|
{error, ?RC_RETAIN_NOT_SUPPORTED};
|
||||||
do_check_pub(Props, [{max_topic_alias, _} | Caps]) ->
|
do_check_pub(#{topic_alias := TopicAlias},
|
||||||
do_check_pub(Props, Caps);
|
#{max_topic_alias := MaxTopicAlias})
|
||||||
do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) ->
|
when 0 == TopicAlias; TopicAlias >= MaxTopicAlias ->
|
||||||
do_check_pub(Props, Caps).
|
{error, ?RC_TOPIC_ALIAS_INVALID};
|
||||||
|
do_check_pub(_Flags, _Caps) -> ok.
|
||||||
|
|
||||||
-spec(check_sub(emqx_types:zone(), emqx_types:topic_filters())
|
-spec(check_sub(emqx_types:zone(),
|
||||||
-> {ok | error, emqx_types:topic_filters()}).
|
emqx_types:topic(),
|
||||||
check_sub(Zone, TopicFilters) ->
|
emqx_types:subopts())
|
||||||
Caps = maps:to_list(get_caps(Zone, subscribe)),
|
-> ok_or_error(emqx_types:reason_code())).
|
||||||
lists:foldr(fun({Topic, Opts}, {Ok, Result}) ->
|
check_sub(Zone, Topic, SubOpts) ->
|
||||||
case check_sub(Topic, Opts, Caps) of
|
Caps = get_caps(Zone, subscribe),
|
||||||
{ok, Opts1} ->
|
Flags = lists:foldl(
|
||||||
{Ok, [{Topic, Opts1}|Result]};
|
fun(max_topic_levels, Map) ->
|
||||||
{error, Opts1} ->
|
Map#{topic_levels => emqx_topic:levels(Topic)};
|
||||||
{error, [{Topic, Opts1}|Result]}
|
(wildcard_subscription, Map) ->
|
||||||
end
|
Map#{is_wildcard => emqx_topic:wildcard(Topic)};
|
||||||
end, {ok, []}, TopicFilters).
|
(shared_subscription, Map) ->
|
||||||
|
Map#{is_shared => maps:is_key(share, SubOpts)}
|
||||||
|
end, #{}, maps:keys(Caps)),
|
||||||
|
do_check_sub(Flags, Caps).
|
||||||
|
|
||||||
check_sub(_Topic, Opts, []) ->
|
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
|
||||||
{ok, Opts};
|
when Levels >= Limit ->
|
||||||
check_sub(Topic, Opts = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
|
{error, ?RC_TOPIC_FILTER_INVALID};
|
||||||
check_sub(Topic, Opts#{qos := min(QoS, MaxQoS)}, Caps);
|
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
|
||||||
check_sub(Topic, Opts, [{mqtt_shared_subscription, true}|Caps]) ->
|
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
|
||||||
check_sub(Topic, Opts, Caps);
|
do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
|
||||||
check_sub(Topic, Opts, [{mqtt_shared_subscription, false}|Caps]) ->
|
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
|
||||||
case maps:is_key(share, Opts) of
|
do_check_sub(_Flags, _Caps) -> ok.
|
||||||
true ->
|
|
||||||
{error, Opts#{rc := ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}};
|
|
||||||
false -> check_sub(Topic, Opts, Caps)
|
|
||||||
end;
|
|
||||||
check_sub(Topic, Opts, [{mqtt_wildcard_subscription, true}|Caps]) ->
|
|
||||||
check_sub(Topic, Opts, Caps);
|
|
||||||
check_sub(Topic, Opts, [{mqtt_wildcard_subscription, false}|Caps]) ->
|
|
||||||
case emqx_topic:wildcard(Topic) of
|
|
||||||
true ->
|
|
||||||
{error, Opts#{rc := ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}};
|
|
||||||
false -> check_sub(Topic, Opts, Caps)
|
|
||||||
end;
|
|
||||||
check_sub(Topic, Opts, [{max_topic_levels, ?UNLIMITED}|Caps]) ->
|
|
||||||
check_sub(Topic, Opts, Caps);
|
|
||||||
check_sub(Topic, Opts, [{max_topic_levels, Limit}|Caps]) ->
|
|
||||||
case emqx_topic:levels(Topic) of
|
|
||||||
Levels when Levels > Limit ->
|
|
||||||
{error, Opts#{rc := ?RC_TOPIC_FILTER_INVALID}};
|
|
||||||
_ -> check_sub(Topic, Opts, Caps)
|
|
||||||
end.
|
|
||||||
|
|
||||||
default_caps() ->
|
-spec(get_caps(emqx_zone:zone()) -> caps()).
|
||||||
?DEFAULT_CAPS.
|
get_caps(Zone) ->
|
||||||
|
with_env(Zone, '$mqtt_caps', fun all_caps/1).
|
||||||
|
|
||||||
|
-spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()).
|
||||||
get_caps(Zone, publish) ->
|
get_caps(Zone, publish) ->
|
||||||
with_env(Zone, '$mqtt_pub_caps',
|
with_env(Zone, '$mqtt_pub_caps', fun pub_caps/1);
|
||||||
fun() ->
|
|
||||||
filter_caps(?PUBCAP_KEYS, get_caps(Zone))
|
|
||||||
end);
|
|
||||||
|
|
||||||
get_caps(Zone, subscribe) ->
|
get_caps(Zone, subscribe) ->
|
||||||
with_env(Zone, '$mqtt_sub_caps',
|
with_env(Zone, '$mqtt_sub_caps', fun sub_caps/1).
|
||||||
fun() ->
|
|
||||||
filter_caps(?SUBCAP_KEYS, get_caps(Zone))
|
|
||||||
end).
|
|
||||||
|
|
||||||
get_caps(Zone) ->
|
pub_caps(Zone) ->
|
||||||
with_env(Zone, '$mqtt_caps',
|
filter_caps(?PUBCAP_KEYS, get_caps(Zone)).
|
||||||
fun() ->
|
|
||||||
maps:from_list([{Cap, emqx_zone:get_env(Zone, Cap, Def)}
|
sub_caps(Zone) ->
|
||||||
|| {Cap, Def} <- ?DEFAULT_CAPS])
|
filter_caps(?SUBCAP_KEYS, get_caps(Zone)).
|
||||||
end).
|
|
||||||
|
all_caps(Zone) ->
|
||||||
|
maps:map(fun(Cap, Def) ->
|
||||||
|
emqx_zone:get_env(Zone, Cap, Def)
|
||||||
|
end, ?DEFAULT_CAPS).
|
||||||
|
|
||||||
filter_caps(Keys, Caps) ->
|
filter_caps(Keys, Caps) ->
|
||||||
maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps).
|
maps:filter(fun(Key, Val) ->
|
||||||
|
lists:member(Key, Keys) andalso cap_limited(Key, Val)
|
||||||
|
end, Caps).
|
||||||
|
|
||||||
|
cap_limited(Key, Val) ->
|
||||||
|
Val =/= maps:get(Key, ?DEFAULT_CAPS).
|
||||||
|
|
||||||
with_env(Zone, Key, InitFun) ->
|
with_env(Zone, Key, InitFun) ->
|
||||||
case emqx_zone:get_env(Zone, Key) of
|
case emqx_zone:get_env(Zone, Key) of
|
||||||
undefined -> Caps = InitFun(),
|
undefined ->
|
||||||
|
Caps = InitFun(Zone),
|
||||||
ok = emqx_zone:set_env(Zone, Key, Caps),
|
ok = emqx_zone:set_env(Zone, Key, Caps),
|
||||||
Caps;
|
Caps;
|
||||||
ZoneCaps -> ZoneCaps
|
Caps -> Caps
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec(default() -> caps()).
|
||||||
|
default() -> ?DEFAULT_CAPS.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue