diff --git a/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl b/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl new file mode 100644 index 000000000..a7f36bdd6 --- /dev/null +++ b/lib-ce/emqx_modules/src/emqx_mod_exclusive.erl @@ -0,0 +1,183 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mod_exclusive). + +-behaviour(emqx_gen_mod). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +%% emqx_gen_mod callbacks +-export([ + load/1, + unload/1, + description/0 +]). + +-export([ + exclusive_subscribe/3, + exclusive_unsubscribe/3 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = ekka_mnesia:create_table(?TAB, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TAB, ram_copies). + +%%-------------------------------------------------------------------- +%% Load/Unload +%%-------------------------------------------------------------------- + +-spec load(list()) -> ok. +load(_Env) -> + emqx_hooks:put('client.subscribe', {?MODULE, exclusive_subscribe, []}), + emqx_hooks:put('client.unsubscribe', {?MODULE, exclusive_unsubscribe, []}). + +-spec unload(list()) -> ok. +unload(_Env) -> + emqx_hooks:del('client.subscribe', {?MODULE, exclusive_subscribe}), + emqx_hooks:del('client.unsubscribe', {?MODULE, exclusive_unsubscribe}). + +description() -> + "EMQ X Exclusive Subscription Module". +%%-------------------------------------------------------------------- +%% Hooks +%%-------------------------------------------------------------------- +exclusive_subscribe(ClientInfo, _Prop, TopicFilters) -> + {ok, check_is_enabled(ClientInfo, TopicFilters, fun on_subscribe/2)}. + +exclusive_unsubscribe(ClientInfo, _Prop, TopicFilters) -> + check_is_enabled(ClientInfo, TopicFilters, fun on_unsubscribe/2). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +check_is_enabled(#{zone := Zone} = ClientInfo, TopicFilters, Cont) -> + case emqx_zone:get_env(Zone, exclusive_subscription) of + false -> + TopicFilters; + _ -> + case lists:any(fun is_exclusive_subscribe/1, TopicFilters) of + false -> + TopicFilters; + _ -> + Cont(ClientInfo, TopicFilters) + end + end. + +on_subscribe(#{clientid := ClientId}, TopicFilters) -> + Fun = fun() -> + try_subscribe(ClientId, TopicFilters) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [TopicFilters, Reason]), + lists:map( + fun({Filter, SubOpts} = TopicFilter) -> + case is_exclusive_subscribe(Filter) of + false -> + TopicFilter; + _ -> + {Filter, SubOpts#{is_exclusive => true}} + end + end, + TopicFilters + ) + end. + +try_subscribe(ClientId, TopicFilters) -> + try_subscribe(TopicFilters, ClientId, []). + +try_subscribe([{<<"$exclusive/", _/binary>> = Topic, SubOpts} = TopicFilters | T], ClientId, Acc) -> + try_subscribe( + T, + ClientId, + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + [TopicFilters | Acc]; + [_] -> + [{Topic, SubOpts#{is_exclusive => true}} | Acc] + end + ); +try_subscribe([H | T], ClientId, Acc) -> + try_subscribe(T, ClientId, [H | Acc]); +try_subscribe([], _ClientId, Acc) -> + lists:reverse(Acc). + +on_unsubscribe(#{clientid := ClientId}, TopicFilters) -> + _ = mnesia:transaction(fun() -> try_unsubscribe(TopicFilters, ClientId) end). + +try_unsubscribe([{<<"$exclusive/", _/binary>> = Topic, _} | T], ClientId) -> + case mnesia:wread({?TAB, Topic}) of + [#exclusive_subscription{clientid = ClientId}] -> + mnesia:delete({?TAB, Topic}); + _ -> + ok + end, + try_unsubscribe(T, ClientId); +try_unsubscribe([H | T], ClientId) -> + try_unsubscribe(T, ClientId); +try_unsubscribe([], _) -> + ok. + +is_exclusive_subscribe({<<"$exclusive/", Rest/binary>>, _SubOpt}) when Rest =/= <<>> -> + true; +is_exclusive_subscribe(<<"$exclusive/", Rest/binary>>) when Rest =/= <<>> -> + true; +is_exclusive_subscribe(_) -> + false. diff --git a/priv/emqx.schema b/priv/emqx.schema index 576177c5d..b210b9be7 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -945,6 +945,12 @@ end}. {datatype, string} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "mqtt.shared_subscription", "emqx.exclusive_subscription", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -1199,6 +1205,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "zone.$name.exclusive_subscription", "emqx.zones", [ + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx.zones", fun(Conf) -> Ratelimit = fun(Val) -> [L, D] = string:tokens(Val, ", "), diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 8e94d25a7..196227d3c 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -44,6 +44,7 @@ wildcard_subscription => boolean(), subscription_identifiers => boolean(), shared_subscription => boolean() + exclusive_subscription => boolean() }). -define(UNLIMITED, 0). @@ -56,7 +57,8 @@ -define(SUBCAP_KEYS, [max_topic_levels, max_qos_allowed, wildcard_subscription, - shared_subscription + shared_subscription, + exclusive_subscription ]). -define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE, @@ -67,7 +69,8 @@ retain_available => true, wildcard_subscription => true, subscription_identifiers => true, - shared_subscription => true + shared_subscription => true, + exclusive_subscription => false }). -spec(check_pub(emqx_types:zone(), @@ -106,6 +109,8 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> Map#{is_shared => maps:is_key(share, SubOpts)}; + (exclusive_subscription, Map) -> + Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), do_check_sub(Flags, Caps). @@ -117,6 +122,8 @@ do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}) -> + {error, ?RC_QUOTA_EXCEEDED}; do_check_sub(_Flags, _Caps) -> ok. default_caps() ->