From 7084e510d6f2e3a656939ab0a08ea21ba9a57366 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 23 Jun 2022 14:16:48 +0800 Subject: [PATCH] feat(exclusive): refactor the implementation --- .../emqx_modules/src/emqx_mod_exclusive.erl | 183 ------------------ src/emqx_broker.erl | 4 +- src/emqx_channel.erl | 4 +- src/emqx_exclusive_subscription.erl | 103 ++++++++++ src/emqx_mqtt_caps.erl | 27 ++- src/emqx_topic.erl | 8 +- 6 files changed, 131 insertions(+), 198 deletions(-) delete mode 100644 lib-ce/emqx_modules/src/emqx_mod_exclusive.erl create mode 100644 src/emqx_exclusive_subscription.erl diff --git a/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl b/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl deleted file mode 100644 index a7f36bdd6..000000000 --- a/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl +++ /dev/null @@ -1,183 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_exclusive). - --behaviour(emqx_gen_mod). - --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). - --logger_header("[exclusive]"). - -%% Mnesia bootstrap --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -%% emqx_gen_mod callbacks --export([ - load/1, - unload/1, - description/0 -]). - --export([ - exclusive_subscribe/3, - exclusive_unsubscribe/3 -]). - --record(exclusive_subscription, { - topic :: emqx_types:topic(), - clientid :: emqx_types:clientid() -}). - --define(TAB, emqx_exclusive_subscription). - -%%-------------------------------------------------------------------- -%% Mnesia bootstrap -%%-------------------------------------------------------------------- - -mnesia(boot) -> - StoreProps = [ - {ets, [ - {read_concurrency, true}, - {write_concurrency, true} - ]} - ], - ok = ekka_mnesia:create_table(?TAB, [ - {type, set}, - {ram_copies, [node()]}, - {record_name, exclusive_subscription}, - {attributes, record_info(fields, exclusive_subscription)}, - {storage_properties, StoreProps} - ]); -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?TAB, ram_copies). - -%%-------------------------------------------------------------------- -%% Load/Unload -%%-------------------------------------------------------------------- - --spec load(list()) -> ok. -load(_Env) -> - emqx_hooks:put('client.subscribe', {?MODULE, exclusive_subscribe, []}), - emqx_hooks:put('client.unsubscribe', {?MODULE, exclusive_unsubscribe, []}). - --spec unload(list()) -> ok. -unload(_Env) -> - emqx_hooks:del('client.subscribe', {?MODULE, exclusive_subscribe}), - emqx_hooks:del('client.unsubscribe', {?MODULE, exclusive_unsubscribe}). - -description() -> - "EMQ X Exclusive Subscription Module". -%%-------------------------------------------------------------------- -%% Hooks -%%-------------------------------------------------------------------- -exclusive_subscribe(ClientInfo, _Prop, TopicFilters) -> - {ok, check_is_enabled(ClientInfo, TopicFilters, fun on_subscribe/2)}. - -exclusive_unsubscribe(ClientInfo, _Prop, TopicFilters) -> - check_is_enabled(ClientInfo, TopicFilters, fun on_unsubscribe/2). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- -check_is_enabled(#{zone := Zone} = ClientInfo, TopicFilters, Cont) -> - case emqx_zone:get_env(Zone, exclusive_subscription) of - false -> - TopicFilters; - _ -> - case lists:any(fun is_exclusive_subscribe/1, TopicFilters) of - false -> - TopicFilters; - _ -> - Cont(ClientInfo, TopicFilters) - end - end. - -on_subscribe(#{clientid := ClientId}, TopicFilters) -> - Fun = fun() -> - try_subscribe(ClientId, TopicFilters) - end, - case mnesia:transaction(Fun) of - {atomic, Res} -> - Res; - {aborted, Reason} -> - ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [TopicFilters, Reason]), - lists:map( - fun({Filter, SubOpts} = TopicFilter) -> - case is_exclusive_subscribe(Filter) of - false -> - TopicFilter; - _ -> - {Filter, SubOpts#{is_exclusive => true}} - end - end, - TopicFilters - ) - end. - -try_subscribe(ClientId, TopicFilters) -> - try_subscribe(TopicFilters, ClientId, []). - -try_subscribe([{<<"$exclusive/", _/binary>> = Topic, SubOpts} = TopicFilters | T], ClientId, Acc) -> - try_subscribe( - T, - ClientId, - case mnesia:wread({?TAB, Topic}) of - [] -> - mnesia:write( - ?TAB, - #exclusive_subscription{ - clientid = ClientId, - topic = Topic - }, - write - ), - [TopicFilters | Acc]; - [_] -> - [{Topic, SubOpts#{is_exclusive => true}} | Acc] - end - ); -try_subscribe([H | T], ClientId, Acc) -> - try_subscribe(T, ClientId, [H | Acc]); -try_subscribe([], _ClientId, Acc) -> - lists:reverse(Acc). - -on_unsubscribe(#{clientid := ClientId}, TopicFilters) -> - _ = mnesia:transaction(fun() -> try_unsubscribe(TopicFilters, ClientId) end). - -try_unsubscribe([{<<"$exclusive/", _/binary>> = Topic, _} | T], ClientId) -> - case mnesia:wread({?TAB, Topic}) of - [#exclusive_subscription{clientid = ClientId}] -> - mnesia:delete({?TAB, Topic}); - _ -> - ok - end, - try_unsubscribe(T, ClientId); -try_unsubscribe([H | T], ClientId) -> - try_unsubscribe(T, ClientId); -try_unsubscribe([], _) -> - ok. - -is_exclusive_subscribe({<<"$exclusive/", Rest/binary>>, _SubOpt}) when Rest =/= <<>> -> - true; -is_exclusive_subscribe(<<"$exclusive/", Rest/binary>>) when Rest =/= <<>> -> - true; -is_exclusive_subscribe(_) -> - false. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b93227990..876ba07eb 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -180,7 +180,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts), + emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> case maps:get(shard, SubOpts, 0) of @@ -498,4 +499,3 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- - diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c667be30e..8ac537b69 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -1483,8 +1483,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). +check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> + emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Enrich SubId diff --git a/src/emqx_exclusive_subscription.erl b/src/emqx_exclusive_subscription.erl new file mode 100644 index 000000000..165629a86 --- /dev/null +++ b/src/emqx_exclusive_subscription.erl @@ -0,0 +1,103 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_subscription). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ + check_subscribe/2, + unsubscribe/2 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = ekka_mnesia:create_table(?TAB, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TAB, ram_copies). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) -> + allow | deny. +check_subscribe(#{clientid := ClientId}, Topic) -> + Fun = fun() -> + try_subscribe(ClientId, Topic) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [Topic, Reason]), + deny + end. + +unsubscribe(Topic, #{is_exclusive := true}) -> + _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end), + ok; +unsubscribe(_Topic, _SubOpts) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +try_subscribe(Topic, ClientId) -> + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + allow; + [_] -> + deny + end. diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 196227d3c..f218fa795 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -43,7 +43,7 @@ retain_available => boolean(), wildcard_subscription => boolean(), subscription_identifiers => boolean(), - shared_subscription => boolean() + shared_subscription => boolean(), exclusive_subscription => boolean() }). @@ -96,11 +96,11 @@ do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; do_check_pub(_Flags, _Caps) -> ok. --spec(check_sub(emqx_types:zone(), +-spec(check_sub(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) -> ok_or_error(emqx_types:reason_code())). -check_sub(Zone, Topic, SubOpts) -> +check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = get_caps(Zone, subscribe), Flags = lists:foldl( fun(max_topic_levels, Map) -> @@ -113,18 +113,25 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), - do_check_sub(Flags, Caps). + do_check_sub(Flags, Caps, ClientInfo, Topic). -do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when Limit > 0, Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> +do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}) -> - {error, ?RC_QUOTA_EXCEEDED}; -do_check_sub(_Flags, _Caps) -> ok. +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> + case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of + deny -> + {error, ?RC_QUOTA_EXCEEDED}; + _ -> + ok + end; +do_check_sub(_Flags, _Caps, _, _) -> ok. default_caps() -> ?DEFAULT_CAPS. diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 8c6ac7225..3d758d32e 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -216,6 +216,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> _ -> error({invalid_topic_filter, TopicFilter}) end end; +parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> + case Topic of + <<>> -> + error({invalid_topic_filter, TopicFilter}); + _ -> + {Topic, Options#{is_exclusive => true}} + end; parse(TopicFilter, Options) -> {TopicFilter, Options}. -