From 6599b71b5e5bdfabe7545c31de0cd76f309c98f7 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 22 Jun 2022 15:01:04 +0800 Subject: [PATCH 1/5] feat: add exclusive subscription --- .../emqx_modules/src/emqx_mod_exclusive.erl | 183 ++++++++++++++++++ priv/emqx.schema | 11 ++ src/emqx_mqtt_caps.erl | 11 +- 3 files changed, 203 insertions(+), 2 deletions(-) create mode 100644 lib-ce/emqx_modules/src/emqx_mod_exclusive.erl diff --git a/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl b/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl new file mode 100644 index 000000000..a7f36bdd6 --- /dev/null +++ b/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl @@ -0,0 +1,183 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_exclusive). + +-behaviour(emqx_gen_mod). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% emqx_gen_mod callbacks +-export([ + load/1, + unload/1, + description/0 +]). + +-export([ + exclusive_subscribe/3, + exclusive_unsubscribe/3 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = ekka_mnesia:create_table(?TAB, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TAB, ram_copies). + +%%-------------------------------------------------------------------- +%% Load/Unload +%%-------------------------------------------------------------------- + +-spec load(list()) -> ok. +load(_Env) -> + emqx_hooks:put('client.subscribe', {?MODULE, exclusive_subscribe, []}), + emqx_hooks:put('client.unsubscribe', {?MODULE, exclusive_unsubscribe, []}). + +-spec unload(list()) -> ok. +unload(_Env) -> + emqx_hooks:del('client.subscribe', {?MODULE, exclusive_subscribe}), + emqx_hooks:del('client.unsubscribe', {?MODULE, exclusive_unsubscribe}). + +description() -> + "EMQ X Exclusive Subscription Module". +%%-------------------------------------------------------------------- +%% Hooks +%%-------------------------------------------------------------------- +exclusive_subscribe(ClientInfo, _Prop, TopicFilters) -> + {ok, check_is_enabled(ClientInfo, TopicFilters, fun on_subscribe/2)}. + +exclusive_unsubscribe(ClientInfo, _Prop, TopicFilters) -> + check_is_enabled(ClientInfo, TopicFilters, fun on_unsubscribe/2). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +check_is_enabled(#{zone := Zone} = ClientInfo, TopicFilters, Cont) -> + case emqx_zone:get_env(Zone, exclusive_subscription) of + false -> + TopicFilters; + _ -> + case lists:any(fun is_exclusive_subscribe/1, TopicFilters) of + false -> + TopicFilters; + _ -> + Cont(ClientInfo, TopicFilters) + end + end. + +on_subscribe(#{clientid := ClientId}, TopicFilters) -> + Fun = fun() -> + try_subscribe(ClientId, TopicFilters) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [TopicFilters, Reason]), + lists:map( + fun({Filter, SubOpts} = TopicFilter) -> + case is_exclusive_subscribe(Filter) of + false -> + TopicFilter; + _ -> + {Filter, SubOpts#{is_exclusive => true}} + end + end, + TopicFilters + ) + end. + +try_subscribe(ClientId, TopicFilters) -> + try_subscribe(TopicFilters, ClientId, []). + +try_subscribe([{<<"$exclusive/", _/binary>> = Topic, SubOpts} = TopicFilters | T], ClientId, Acc) -> + try_subscribe( + T, + ClientId, + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + [TopicFilters | Acc]; + [_] -> + [{Topic, SubOpts#{is_exclusive => true}} | Acc] + end + ); +try_subscribe([H | T], ClientId, Acc) -> + try_subscribe(T, ClientId, [H | Acc]); +try_subscribe([], _ClientId, Acc) -> + lists:reverse(Acc). + +on_unsubscribe(#{clientid := ClientId}, TopicFilters) -> + _ = mnesia:transaction(fun() -> try_unsubscribe(TopicFilters, ClientId) end). + +try_unsubscribe([{<<"$exclusive/", _/binary>> = Topic, _} | T], ClientId) -> + case mnesia:wread({?TAB, Topic}) of + [#exclusive_subscription{clientid = ClientId}] -> + mnesia:delete({?TAB, Topic}); + _ -> + ok + end, + try_unsubscribe(T, ClientId); +try_unsubscribe([H | T], ClientId) -> + try_unsubscribe(T, ClientId); +try_unsubscribe([], _) -> + ok. + +is_exclusive_subscribe({<<"$exclusive/", Rest/binary>>, _SubOpt}) when Rest =/= <<>> -> + true; +is_exclusive_subscribe(<<"$exclusive/", Rest/binary>>) when Rest =/= <<>> -> + true; +is_exclusive_subscribe(_) -> + false. diff --git a/priv/emqx.schema b/priv/emqx.schema index 576177c5d..b210b9be7 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -945,6 +945,12 @@ end}. {datatype, string} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "mqtt.shared_subscription", "emqx.exclusive_subscription", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -1199,6 +1205,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "zone.$name.exclusive_subscription", "emqx.zones", [ + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx.zones", fun(Conf) -> Ratelimit = fun(Val) -> [L, D] = string:tokens(Val, ", "), diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 8e94d25a7..196227d3c 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -44,6 +44,7 @@ wildcard_subscription => boolean(), subscription_identifiers => boolean(), shared_subscription => boolean() + exclusive_subscription => boolean() }). -define(UNLIMITED, 0). @@ -56,7 +57,8 @@ -define(SUBCAP_KEYS, [max_topic_levels, max_qos_allowed, wildcard_subscription, - shared_subscription + shared_subscription, + exclusive_subscription ]). -define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE, @@ -67,7 +69,8 @@ retain_available => true, wildcard_subscription => true, subscription_identifiers => true, - shared_subscription => true + shared_subscription => true, + exclusive_subscription => false }). -spec(check_pub(emqx_types:zone(), @@ -106,6 +109,8 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> Map#{is_shared => maps:is_key(share, SubOpts)}; + (exclusive_subscription, Map) -> + Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), do_check_sub(Flags, Caps). @@ -117,6 +122,8 @@ do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}) -> + {error, ?RC_QUOTA_EXCEEDED}; do_check_sub(_Flags, _Caps) -> ok. default_caps() -> From 7084e510d6f2e3a656939ab0a08ea21ba9a57366 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 23 Jun 2022 14:16:48 +0800 Subject: [PATCH 2/5] feat(exclusive): refactor the implementation --- .../emqx_modules/src/emqx_mod_exclusive.erl | 183 ------------------ src/emqx_broker.erl | 4 +- src/emqx_channel.erl | 4 +- src/emqx_exclusive_subscription.erl | 103 ++++++++++ src/emqx_mqtt_caps.erl | 27 ++- src/emqx_topic.erl | 8 +- 6 files changed, 131 insertions(+), 198 deletions(-) delete mode 100644 lib-ce/emqx_modules/src/emqx_mod_exclusive.erl create mode 100644 src/emqx_exclusive_subscription.erl diff --git a/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl b/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl deleted file mode 100644 index a7f36bdd6..000000000 --- a/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl +++ /dev/null @@ -1,183 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_exclusive). - --behaviour(emqx_gen_mod). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). - --logger_header("[exclusive]"). - -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -%% emqx_gen_mod callbacks --export([ - load/1, - unload/1, - description/0 -]). - --export([ - exclusive_subscribe/3, - exclusive_unsubscribe/3 -]). - --record(exclusive_subscription, { - topic :: emqx_types:topic(), - clientid :: emqx_types:clientid() -}). - --define(TAB, emqx_exclusive_subscription). - -%%-------------------------------------------------------------------- -%% Mnesia bootstrap -%%-------------------------------------------------------------------- - -mnesia(boot) -> - StoreProps = [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ], - ok = ekka_mnesia:create_table(?TAB, [ - {type, set}, - {ram_copies, [node()]}, - {record_name, exclusive_subscription}, - {attributes, record_info(fields, exclusive_subscription)}, - {storage_properties, StoreProps} - ]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, ram_copies). - -%%-------------------------------------------------------------------- -%% Load/Unload -%%-------------------------------------------------------------------- - --spec load(list()) -> ok. -load(_Env) -> - emqx_hooks:put('client.subscribe', {?MODULE, exclusive_subscribe, []}), - emqx_hooks:put('client.unsubscribe', {?MODULE, exclusive_unsubscribe, []}). - --spec unload(list()) -> ok. -unload(_Env) -> - emqx_hooks:del('client.subscribe', {?MODULE, exclusive_subscribe}), - emqx_hooks:del('client.unsubscribe', {?MODULE, exclusive_unsubscribe}). - -description() -> - "EMQ X Exclusive Subscription Module". -%%-------------------------------------------------------------------- -%% Hooks -%%-------------------------------------------------------------------- -exclusive_subscribe(ClientInfo, _Prop, TopicFilters) -> - {ok, check_is_enabled(ClientInfo, TopicFilters, fun on_subscribe/2)}. - -exclusive_unsubscribe(ClientInfo, _Prop, TopicFilters) -> - check_is_enabled(ClientInfo, TopicFilters, fun on_unsubscribe/2). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- -check_is_enabled(#{zone := Zone} = ClientInfo, TopicFilters, Cont) -> - case emqx_zone:get_env(Zone, exclusive_subscription) of - false -> - TopicFilters; - _ -> - case lists:any(fun is_exclusive_subscribe/1, TopicFilters) of - false -> - TopicFilters; - _ -> - Cont(ClientInfo, TopicFilters) - end - end. - -on_subscribe(#{clientid := ClientId}, TopicFilters) -> - Fun = fun() -> - try_subscribe(ClientId, TopicFilters) - end, - case mnesia:transaction(Fun) of - {atomic, Res} -> - Res; - {aborted, Reason} -> - ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [TopicFilters, Reason]), - lists:map( - fun({Filter, SubOpts} = TopicFilter) -> - case is_exclusive_subscribe(Filter) of - false -> - TopicFilter; - _ -> - {Filter, SubOpts#{is_exclusive => true}} - end - end, - TopicFilters - ) - end. - -try_subscribe(ClientId, TopicFilters) -> - try_subscribe(TopicFilters, ClientId, []). - -try_subscribe([{<<"$exclusive/", _/binary>> = Topic, SubOpts} = TopicFilters | T], ClientId, Acc) -> - try_subscribe( - T, - ClientId, - case mnesia:wread({?TAB, Topic}) of - [] -> - mnesia:write( - ?TAB, - #exclusive_subscription{ - clientid = ClientId, - topic = Topic - }, - write - ), - [TopicFilters | Acc]; - [_] -> - [{Topic, SubOpts#{is_exclusive => true}} | Acc] - end - ); -try_subscribe([H | T], ClientId, Acc) -> - try_subscribe(T, ClientId, [H | Acc]); -try_subscribe([], _ClientId, Acc) -> - lists:reverse(Acc). - -on_unsubscribe(#{clientid := ClientId}, TopicFilters) -> - _ = mnesia:transaction(fun() -> try_unsubscribe(TopicFilters, ClientId) end). - -try_unsubscribe([{<<"$exclusive/", _/binary>> = Topic, _} | T], ClientId) -> - case mnesia:wread({?TAB, Topic}) of - [#exclusive_subscription{clientid = ClientId}] -> - mnesia:delete({?TAB, Topic}); - _ -> - ok - end, - try_unsubscribe(T, ClientId); -try_unsubscribe([H | T], ClientId) -> - try_unsubscribe(T, ClientId); -try_unsubscribe([], _) -> - ok. - -is_exclusive_subscribe({<<"$exclusive/", Rest/binary>>, _SubOpt}) when Rest =/= <<>> -> - true; -is_exclusive_subscribe(<<"$exclusive/", Rest/binary>>) when Rest =/= <<>> -> - true; -is_exclusive_subscribe(_) -> - false. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b93227990..876ba07eb 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -180,7 +180,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts), + emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> case maps:get(shard, SubOpts, 0) of @@ -498,4 +499,3 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c667be30e..8ac537b69 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1483,8 +1483,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). +check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> + emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Enrich SubId diff --git a/src/emqx_exclusive_subscription.erl b/src/emqx_exclusive_subscription.erl new file mode 100644 index 000000000..165629a86 --- /dev/null +++ b/src/emqx_exclusive_subscription.erl @@ -0,0 +1,103 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_subscription). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ + check_subscribe/2, + unsubscribe/2 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = ekka_mnesia:create_table(?TAB, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TAB, ram_copies). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) -> + allow | deny. +check_subscribe(#{clientid := ClientId}, Topic) -> + Fun = fun() -> + try_subscribe(ClientId, Topic) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [Topic, Reason]), + deny + end. + +unsubscribe(Topic, #{is_exclusive := true}) -> + _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end), + ok; +unsubscribe(_Topic, _SubOpts) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +try_subscribe(Topic, ClientId) -> + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + allow; + [_] -> + deny + end. diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 196227d3c..f218fa795 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -43,7 +43,7 @@ retain_available => boolean(), wildcard_subscription => boolean(), subscription_identifiers => boolean(), - shared_subscription => boolean() + shared_subscription => boolean(), exclusive_subscription => boolean() }). @@ -96,11 +96,11 @@ do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; do_check_pub(_Flags, _Caps) -> ok. --spec(check_sub(emqx_types:zone(), +-spec(check_sub(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) -> ok_or_error(emqx_types:reason_code())). -check_sub(Zone, Topic, SubOpts) -> +check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = get_caps(Zone, subscribe), Flags = lists:foldl( fun(max_topic_levels, Map) -> @@ -113,18 +113,25 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), - do_check_sub(Flags, Caps). + do_check_sub(Flags, Caps, ClientInfo, Topic). -do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when Limit > 0, Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> +do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}) -> - {error, ?RC_QUOTA_EXCEEDED}; -do_check_sub(_Flags, _Caps) -> ok. +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> + case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of + deny -> + {error, ?RC_QUOTA_EXCEEDED}; + _ -> + ok + end; +do_check_sub(_Flags, _Caps, _, _) -> ok. default_caps() -> ?DEFAULT_CAPS. diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 8c6ac7225..3d758d32e 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -216,6 +216,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> _ -> error({invalid_topic_filter, TopicFilter}) end end; +parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> + case Topic of + <<>> -> + error({invalid_topic_filter, TopicFilter}); + _ -> + {Topic, Options#{is_exclusive => true}} + end; parse(TopicFilter, Options) -> {TopicFilter, Options}. - From 746b996de40d4bdac1d29d87558b90ef860511fa Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 23 Jun 2022 15:56:47 +0800 Subject: [PATCH 3/5] feat(exclusive): update schema and conf --- etc/emqx.conf | 15 +++++++++++++++ priv/emqx.schema | 5 +++-- src/emqx_exclusive_subscription.erl | 2 +- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index a4892c610..94d5cbecb 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -742,6 +742,11 @@ mqtt.wildcard_subscription = true ## Value: boolean mqtt.shared_subscription = true +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +mqtt.exclusive_subscription = false + ## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) ## ## Value: true | false @@ -847,6 +852,11 @@ zone.external.force_gc_policy = 16000|16MB ## Value: boolean ## zone.external.shared_subscription = false +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +## zone.external.exclusive_subscription = false + ## Server Keep Alive ## ## Value: Number @@ -1049,6 +1059,11 @@ zone.internal.acl_deny_action = ignore ## Value: boolean ## zone.internal.shared_subscription = true +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +## zone.internal.exclusive_subscription = false + ## See zone.$name.max_subscriptions. ## ## Value: Integer diff --git a/priv/emqx.schema b/priv/emqx.schema index b210b9be7..b3fd916a8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -946,8 +946,8 @@ end}. ]}. %% @doc Whether the Server supports Exclusive Subscriptions. -{mapping, "mqtt.shared_subscription", "emqx.exclusive_subscription", [ - {default, true}, +{mapping, "mqtt.exclusive_subscription", "emqx.exclusive_subscription", [ + {default, false}, {datatype, {enum, [true, false]}} ]}. @@ -1207,6 +1207,7 @@ end}. %% @doc Whether the Server supports Exclusive Subscriptions. {mapping, "zone.$name.exclusive_subscription", "emqx.zones", [ + {default, false}, {datatype, {enum, [true, false]}} ]}. diff --git a/src/emqx_exclusive_subscription.erl b/src/emqx_exclusive_subscription.erl index 165629a86..ab26a6bd6 100644 --- a/src/emqx_exclusive_subscription.erl +++ b/src/emqx_exclusive_subscription.erl @@ -86,7 +86,7 @@ unsubscribe(_Topic, _SubOpts) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -try_subscribe(Topic, ClientId) -> +try_subscribe(ClientId, Topic) -> case mnesia:wread({?TAB, Topic}) of [] -> mnesia:write( From fc4794613c33a2264c31bc144440f70ea73af91c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 23 Jun 2022 16:26:29 +0800 Subject: [PATCH 4/5] test(mqtt_caps): fix t_check_sub error --- test/emqx_mqtt_caps_SUITE.erl | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 992e7743e..c2b1a477b 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -53,11 +53,12 @@ t_check_sub(_) -> }, emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), timer:sleep(50), - ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), + ClientInfo = #{zone => zone}, + ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, - emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), + emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)), ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), + emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), + emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})), emqx_zone:unset_env(zone, '$mqtt_pub_caps'). From a90f91d3e8af77cd55dc04418e400634ac5c9f6c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 23 Jun 2022 16:45:30 +0800 Subject: [PATCH 5/5] chore: update emqx.appup.src --- src/emqx.appup.src | 260 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 209 insertions(+), 51 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 827a4663b..db68145cb 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,13 +2,32 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.3.16", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, {"4.3.15", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -26,7 +45,11 @@ {update,emqx_os_mon,{advanced,[]}}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.3.14", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -47,7 +70,11 @@ {update,emqx_os_mon,{advanced,[]}}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -75,7 +102,11 @@ {update,emqx_os_mon,{advanced,[]}}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.12", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -107,7 +138,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.11", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -141,7 +176,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.10", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -175,7 +214,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -213,7 +256,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -251,7 +298,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -289,7 +340,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -327,7 +382,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -365,7 +424,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -403,7 +466,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -441,7 +508,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -479,7 +550,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -519,7 +594,11 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{add_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {add_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -563,13 +642,31 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.3.16", - [{load_module,emqx_channel,brutal_purge,soft_purge,[]}, + [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, + {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, - {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.15", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -584,9 +681,13 @@ {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, - {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_app,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.14", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, @@ -604,9 +705,13 @@ {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, - {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.13", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -631,9 +736,13 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, - {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.12", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -661,9 +770,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.11", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -693,9 +806,13 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.10", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -725,9 +842,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.9", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -761,9 +882,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.8", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -797,9 +922,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.7", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -833,9 +962,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.6", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -869,9 +1002,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.5", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -905,9 +1042,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.4", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, @@ -941,9 +1082,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.3", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -977,9 +1122,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.2", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, @@ -1013,9 +1162,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.1", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -1051,9 +1204,13 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {"4.3.0", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {delete_module,emqx_calendar}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -1091,5 +1248,6 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}]}, {<<".*">>,[]}]}.