From af5bf52ddf23e180725e9f0b2a03050b78de6554 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 24 Jun 2022 17:35:36 +0800 Subject: [PATCH] feat: add exclusive subscription --- apps/emqx/i18n/emqx_schema_i18n.conf | 13 ++- apps/emqx/src/emqx_broker.erl | 3 +- apps/emqx/src/emqx_channel.erl | 4 +- apps/emqx/src/emqx_exclusive_subscription.erl | 106 ++++++++++++++++++ apps/emqx/src/emqx_mqtt_caps.erl | 34 ++++-- apps/emqx/src/emqx_schema.erl | 8 ++ apps/emqx/src/emqx_topic.erl | 7 ++ apps/emqx/test/emqx_mqtt_caps_SUITE.erl | 9 +- 8 files changed, 166 insertions(+), 18 deletions(-) create mode 100644 apps/emqx/src/emqx_exclusive_subscription.erl diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 24274930d..3e43a908f 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -754,6 +754,17 @@ mqtt 下所有的配置作为全局的默认值存在,它可以被 zone< } } + mqtt_exclusive_subscription { + desc { + en: """Whether to enable support for MQTT exclusive subscription.""" + zh: """是否启用对 MQTT 独占订阅的支持。""" + } + label: { + en: """Exclusive Subscription Available""" + zh: """独占订阅可用""" + } + } + mqtt_ignore_loop_deliver { desc { en: """Ignore loop delivery of messages for MQTT v3.1.1/v3.1.0, similar to No Local subscription option in MQTT 5.0""" @@ -2066,7 +2077,7 @@ Type of the rate limit. base_listener_enable_authn { desc { en: """ -Set true (default) to enable client authentication on this listener. +Set true (default) to enable client authentication on this listener. When set to false clients will be allowed to connect without authentication. """ zh: """ diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 4ea83673a..2e0a69dba 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -196,7 +196,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts), + emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> case maps:get(shard, SubOpts, 0) of diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index f6f3b4c4f..d6f2b87ea 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1865,8 +1865,8 @@ check_sub_authzs([], _Channel, Acc) -> %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). +check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> + emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Enrich SubId diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl new file mode 100644 index 000000000..b43511b41 --- /dev/null +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_subscription). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ + check_subscribe/2, + unsubscribe/2 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). +-define(EXCLUSIVE_SHARD, emqx_exclusive_shard). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = mria:create_table(?TAB, [ + {rlog_shard, ?EXCLUSIVE_SHARD}, + {type, set}, + {storage, ram_copies}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]), + ok = mria_rlog:wait_for_shards([?EXCLUSIVE_SHARD], infinity). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) -> + allow | deny. +check_subscribe(#{clientid := ClientId}, Topic) -> + Fun = fun() -> + try_subscribe(ClientId, Topic) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?SLOG(warning, #{ + msg => "Cannot check subscribe ~p due to ~p.", topic => Topic, reason => Reason + }), + deny + end. + +unsubscribe(Topic, #{is_exclusive := true}) -> + _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end), + ok; +unsubscribe(_Topic, _SubOpts) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +try_subscribe(ClientId, Topic) -> + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + allow; + [_] -> + deny + end. diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index e59bcfd0e..cde97394d 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -38,7 +38,8 @@ retain_available => boolean(), wildcard_subscription => boolean(), subscription_identifiers => boolean(), - shared_subscription => boolean() + shared_subscription => boolean(), + exclusive_subscription => boolean() }. -define(MAX_TOPIC_LEVELS, 65535). @@ -53,7 +54,8 @@ max_topic_levels, max_qos_allowed, wildcard_subscription, - shared_subscription + shared_subscription, + exclusive_subscription ]). -define(DEFAULT_CAPS, #{ @@ -65,7 +67,8 @@ retain_available => true, wildcard_subscription => true, subscription_identifiers => true, - shared_subscription => true + shared_subscription => true, + exclusive_subscription => true }). -spec check_pub( @@ -102,12 +105,12 @@ do_check_pub(_Flags, _Caps) -> ok. -spec check_sub( - emqx_types:zone(), + emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts() ) -> ok_or_error(emqx_types:reason_code()). -check_sub(Zone, Topic, SubOpts) -> +check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone)), Flags = lists:foldl( fun @@ -117,6 +120,8 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> Map#{is_shared => maps:is_key(share, SubOpts)}; + (exclusive_subscription, Map) -> + Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; %% Ignore (_Key, Map) -> Map @@ -124,17 +129,26 @@ check_sub(Zone, Topic, SubOpts) -> #{}, maps:keys(Caps) ), - do_check_sub(Flags, Caps). + do_check_sub(Flags, Caps, ClientInfo, Topic). -do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) when +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when Limit > 0, Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> +do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(_Flags, _Caps) -> +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> + case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of + deny -> + {error, ?RC_QUOTA_EXCEEDED}; + _ -> + ok + end; +do_check_sub(_Flags, _Caps, _, _) -> ok. get_caps(Zone) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index c38ef6111..d9ebc2faf 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -439,6 +439,14 @@ fields("mqtt") -> desc => ?DESC(mqtt_shared_subscription) } )}, + {"exclusive_subscription", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(mqtt_exclusive_subscription) + } + )}, {"ignore_loop_deliver", sc( boolean(), diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 35e69bc3e..0f0a8bf1c 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -226,5 +226,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> _ -> error({invalid_topic_filter, TopicFilter}) end end; +parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> + case Topic of + <<>> -> + error({invalid_topic_filter, TopicFilter}); + _ -> + {Topic, Options#{is_exclusive => true}} + end; parse(TopicFilter, Options) -> {TopicFilter, Options}. diff --git a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl index b992e2041..efe215276 100644 --- a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl @@ -55,17 +55,18 @@ t_check_sub(_) -> emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false), emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false), timer:sleep(50), - ok = emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts), + ClientInfo = #{zone => default}, + ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), ?assertEqual( {error, ?RC_TOPIC_FILTER_INVALID}, - emqx_mqtt_caps:check_sub(default, <<"a/b/c/d">>, SubOpts) + emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts) ), ?assertEqual( {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts) + emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts) ), ?assertEqual( {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true}) + emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true}) ), emqx_config:put([zones], OldConf).