From af5bf52ddf23e180725e9f0b2a03050b78de6554 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 24 Jun 2022 17:35:36 +0800 Subject: [PATCH 1/5] feat: add exclusive subscription --- apps/emqx/i18n/emqx_schema_i18n.conf | 13 ++- apps/emqx/src/emqx_broker.erl | 3 +- apps/emqx/src/emqx_channel.erl | 4 +- apps/emqx/src/emqx_exclusive_subscription.erl | 106 ++++++++++++++++++ apps/emqx/src/emqx_mqtt_caps.erl | 34 ++++-- apps/emqx/src/emqx_schema.erl | 8 ++ apps/emqx/src/emqx_topic.erl | 7 ++ apps/emqx/test/emqx_mqtt_caps_SUITE.erl | 9 +- 8 files changed, 166 insertions(+), 18 deletions(-) create mode 100644 apps/emqx/src/emqx_exclusive_subscription.erl diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 24274930d..3e43a908f 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -754,6 +754,17 @@ mqtt 下所有的配置作为全局的默认值存在,它可以被 zone< } } + mqtt_exclusive_subscription { + desc { + en: """Whether to enable support for MQTT exclusive subscription.""" + zh: """是否启用对 MQTT 独占订阅的支持。""" + } + label: { + en: """Exclusive Subscription Available""" + zh: """独占订阅可用""" + } + } + mqtt_ignore_loop_deliver { desc { en: """Ignore loop delivery of messages for MQTT v3.1.1/v3.1.0, similar to No Local subscription option in MQTT 5.0""" @@ -2066,7 +2077,7 @@ Type of the rate limit. base_listener_enable_authn { desc { en: """ -Set true (default) to enable client authentication on this listener. +Set true (default) to enable client authentication on this listener. When set to false clients will be allowed to connect without authentication. """ zh: """ diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 4ea83673a..2e0a69dba 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -196,7 +196,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts), + emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> case maps:get(shard, SubOpts, 0) of diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index f6f3b4c4f..d6f2b87ea 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1865,8 +1865,8 @@ check_sub_authzs([], _Channel, Acc) -> %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). +check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> + emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Enrich SubId diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl new file mode 100644 index 000000000..b43511b41 --- /dev/null +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_subscription). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ + check_subscribe/2, + unsubscribe/2 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). +-define(EXCLUSIVE_SHARD, emqx_exclusive_shard). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = mria:create_table(?TAB, [ + {rlog_shard, ?EXCLUSIVE_SHARD}, + {type, set}, + {storage, ram_copies}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]), + ok = mria_rlog:wait_for_shards([?EXCLUSIVE_SHARD], infinity). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) -> + allow | deny. +check_subscribe(#{clientid := ClientId}, Topic) -> + Fun = fun() -> + try_subscribe(ClientId, Topic) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?SLOG(warning, #{ + msg => "Cannot check subscribe ~p due to ~p.", topic => Topic, reason => Reason + }), + deny + end. + +unsubscribe(Topic, #{is_exclusive := true}) -> + _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end), + ok; +unsubscribe(_Topic, _SubOpts) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +try_subscribe(ClientId, Topic) -> + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + allow; + [_] -> + deny + end. diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index e59bcfd0e..cde97394d 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -38,7 +38,8 @@ retain_available => boolean(), wildcard_subscription => boolean(), subscription_identifiers => boolean(), - shared_subscription => boolean() + shared_subscription => boolean(), + exclusive_subscription => boolean() }. -define(MAX_TOPIC_LEVELS, 65535). @@ -53,7 +54,8 @@ max_topic_levels, max_qos_allowed, wildcard_subscription, - shared_subscription + shared_subscription, + exclusive_subscription ]). -define(DEFAULT_CAPS, #{ @@ -65,7 +67,8 @@ retain_available => true, wildcard_subscription => true, subscription_identifiers => true, - shared_subscription => true + shared_subscription => true, + exclusive_subscription => true }). -spec check_pub( @@ -102,12 +105,12 @@ do_check_pub(_Flags, _Caps) -> ok. -spec check_sub( - emqx_types:zone(), + emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts() ) -> ok_or_error(emqx_types:reason_code()). -check_sub(Zone, Topic, SubOpts) -> +check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone)), Flags = lists:foldl( fun @@ -117,6 +120,8 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> Map#{is_shared => maps:is_key(share, SubOpts)}; + (exclusive_subscription, Map) -> + Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; %% Ignore (_Key, Map) -> Map @@ -124,17 +129,26 @@ check_sub(Zone, Topic, SubOpts) -> #{}, maps:keys(Caps) ), - do_check_sub(Flags, Caps). + do_check_sub(Flags, Caps, ClientInfo, Topic). -do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) when +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when Limit > 0, Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> +do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(_Flags, _Caps) -> +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> + case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of + deny -> + {error, ?RC_QUOTA_EXCEEDED}; + _ -> + ok + end; +do_check_sub(_Flags, _Caps, _, _) -> ok. get_caps(Zone) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index c38ef6111..d9ebc2faf 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -439,6 +439,14 @@ fields("mqtt") -> desc => ?DESC(mqtt_shared_subscription) } )}, + {"exclusive_subscription", + sc( + boolean(), + #{ + default => true, + desc => ?DESC(mqtt_exclusive_subscription) + } + )}, {"ignore_loop_deliver", sc( boolean(), diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 35e69bc3e..0f0a8bf1c 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -226,5 +226,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> _ -> error({invalid_topic_filter, TopicFilter}) end end; +parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> + case Topic of + <<>> -> + error({invalid_topic_filter, TopicFilter}); + _ -> + {Topic, Options#{is_exclusive => true}} + end; parse(TopicFilter, Options) -> {TopicFilter, Options}. diff --git a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl index b992e2041..efe215276 100644 --- a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl @@ -55,17 +55,18 @@ t_check_sub(_) -> emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false), emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false), timer:sleep(50), - ok = emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts), + ClientInfo = #{zone => default}, + ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), ?assertEqual( {error, ?RC_TOPIC_FILTER_INVALID}, - emqx_mqtt_caps:check_sub(default, <<"a/b/c/d">>, SubOpts) + emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts) ), ?assertEqual( {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts) + emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts) ), ?assertEqual( {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true}) + emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true}) ), emqx_config:put([zones], OldConf). From 85f6846f89bf26c349a3f0230cd3e4913d71af48 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 24 Jun 2022 17:45:09 +0800 Subject: [PATCH 2/5] chore: update emqx.appup.src --- apps/emqx/src/emqx.appup.src | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx.appup.src b/apps/emqx/src/emqx.appup.src index 0bfb23807..35dc3b088 100644 --- a/apps/emqx/src/emqx.appup.src +++ b/apps/emqx/src/emqx.appup.src @@ -5,17 +5,23 @@ [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_schema,brutal_purge,soft_purge,[]}, {load_module,emqx_release,brutal_purge,soft_purge,[]}, - {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_authentication,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {<<".*">>,[]}], [{"5.0.0", [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_schema,brutal_purge,soft_purge,[]}, {load_module,emqx_release,brutal_purge,soft_purge,[]}, - {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_authentication,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}, {load_module,emqx_relup}]}, {<<".*">>,[]}]}. From b1b1d405286dbddb31df3eb77e68706fc00de6a0 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 24 Jun 2022 19:41:20 +0800 Subject: [PATCH 3/5] fix(exclusive): fix xref error && change exclusive default value --- apps/emqx/src/emqx_exclusive_subscription.erl | 4 ++-- apps/emqx/src/emqx_mqtt_caps.erl | 2 +- apps/emqx/src/emqx_schema.erl | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl index b43511b41..2eae1f203 100644 --- a/apps/emqx/src/emqx_exclusive_subscription.erl +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -70,7 +70,7 @@ check_subscribe(#{clientid := ClientId}, Topic) -> Fun = fun() -> try_subscribe(ClientId, Topic) end, - case mnesia:transaction(Fun) of + case mria:transaction(?EXCLUSIVE_SHARD, Fun) of {atomic, Res} -> Res; {aborted, Reason} -> @@ -81,7 +81,7 @@ check_subscribe(#{clientid := ClientId}, Topic) -> end. unsubscribe(Topic, #{is_exclusive := true}) -> - _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end), + _ = mria:transaction(?EXCLUSIVE_SHARD, fun() -> mnesia:delete({?TAB, Topic}) end), ok; unsubscribe(_Topic, _SubOpts) -> ok. diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index cde97394d..fbe4684a7 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -68,7 +68,7 @@ wildcard_subscription => true, subscription_identifiers => true, shared_subscription => true, - exclusive_subscription => true + exclusive_subscription => false }). -spec check_pub( diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d9ebc2faf..5652b37f7 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -443,7 +443,7 @@ fields("mqtt") -> sc( boolean(), #{ - default => true, + default => false, desc => ?DESC(mqtt_exclusive_subscription) } )}, From 5981a23fe46af9adaa008a8e1607678eb913b328 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 30 Jun 2022 09:48:30 +0800 Subject: [PATCH 4/5] fix(exclusive): add upgarde API && fix description error --- apps/emqx/i18n/emqx_schema_i18n.conf | 6 +++--- apps/emqx/src/emqx_exclusive_subscription.erl | 13 +++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 3e43a908f..330c766d1 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -757,11 +757,11 @@ mqtt 下所有的配置作为全局的默认值存在,它可以被 zone< mqtt_exclusive_subscription { desc { en: """Whether to enable support for MQTT exclusive subscription.""" - zh: """是否启用对 MQTT 独占订阅的支持。""" + zh: """是否启用对 MQTT 排它订阅的支持。""" } label: { - en: """Exclusive Subscription Available""" - zh: """独占订阅可用""" + en: """Exclusive Subscription""" + zh: """排它订阅""" } } diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl index 2eae1f203..f419740d3 100644 --- a/apps/emqx/src/emqx_exclusive_subscription.erl +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -24,6 +24,9 @@ %% Mnesia bootstrap -export([mnesia/1]). +%% For upgrade +-export([on_add_module/0, on_delete_module/0]). + -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). @@ -61,6 +64,16 @@ mnesia(boot) -> ]), ok = mria_rlog:wait_for_shards([?EXCLUSIVE_SHARD], infinity). +%%-------------------------------------------------------------------- +%% Upgrade +%%-------------------------------------------------------------------- + +on_add_module() -> + mnesia(boot). + +on_delete_module() -> + mria:clear_table(?EXCLUSIVE_SHARD). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- From d205f10fab60b66d489038c3bebf434aa4d7a87b Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 1 Jul 2022 14:59:10 +0800 Subject: [PATCH 5/5] chore: add upgarde callback for emqx_exclusive_subscription --- apps/emqx/src/emqx.appup.src | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx/src/emqx.appup.src b/apps/emqx/src/emqx.appup.src index 35dc3b088..0cef3736e 100644 --- a/apps/emqx/src/emqx.appup.src +++ b/apps/emqx/src/emqx.appup.src @@ -8,6 +8,7 @@ {load_module,emqx_authentication,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {add_module,emqx_exclusive_subscription}, + {apply, {emqx_exclusive_subscription, on_add_module, []}}, {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, {load_module,emqx_topic,brutal_purge,soft_purge,[]}, @@ -22,6 +23,7 @@ {load_module,emqx_broker,brutal_purge,soft_purge,[]}, {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {apply, {emqx_exclusive_subscription, on_delete_module, []}}, {delete_module,emqx_exclusive_subscription}, {load_module,emqx_relup}]}, {<<".*">>,[]}]}.