Merge pull request #8315 from lafirest/v5.0

feat: add exclusive subscription
This commit is contained in:
JianBo He 2022-07-01 16:00:10 +08:00 committed by GitHub
commit a13d726d17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 189 additions and 20 deletions

View File

@ -754,6 +754,17 @@ mqtt 下所有的配置作为全局的默认值存在,它可以被 <code>zone<
}
}
mqtt_exclusive_subscription {
desc {
en: """Whether to enable support for MQTT exclusive subscription."""
zh: """是否启用对 MQTT 排它订阅的支持。"""
}
label: {
en: """Exclusive Subscription"""
zh: """排它订阅"""
}
}
mqtt_ignore_loop_deliver {
desc {
en: """Ignore loop delivery of messages for MQTT v3.1.1/v3.1.0, similar to <code>No Local</code> subscription option in MQTT 5.0"""
@ -2066,7 +2077,7 @@ Type of the rate limit.
base_listener_enable_authn {
desc {
en: """
Set <code>true</code> (default) to enable client authentication on this listener.
Set <code>true</code> (default) to enable client authentication on this listener.
When set to <code>false</code> clients will be allowed to connect without authentication.
"""
zh: """

View File

@ -5,17 +5,25 @@
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_schema,brutal_purge,soft_purge,[]},
{load_module,emqx_release,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_authentication,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{apply, {emqx_exclusive_subscription, on_add_module, []}},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{<<".*">>,[]}],
[{"5.0.0",
[{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_schema,brutal_purge,soft_purge,[]},
{load_module,emqx_release,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_authentication,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{apply, {emqx_exclusive_subscription, on_delete_module, []}},
{delete_module,emqx_exclusive_subscription},
{load_module,emqx_relup}]},
{<<".*">>,[]}]}.

View File

@ -196,7 +196,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
Group = maps:get(share, SubOpts, undefined),
do_unsubscribe(Group, Topic, SubPid, SubOpts).
do_unsubscribe(Group, Topic, SubPid, SubOpts),
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
case maps:get(shard, SubOpts, 0) of

View File

@ -1865,8 +1865,8 @@ check_sub_authzs([], _Channel, Acc) ->
%%--------------------------------------------------------------------
%% Check Sub Caps
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
%%--------------------------------------------------------------------
%% Enrich SubId

View File

@ -0,0 +1,119 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exclusive_subscription).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[exclusive]").
%% Mnesia bootstrap
-export([mnesia/1]).
%% For upgrade
-export([on_add_module/0, on_delete_module/0]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([
check_subscribe/2,
unsubscribe/2
]).
-record(exclusive_subscription, {
topic :: emqx_types:topic(),
clientid :: emqx_types:clientid()
}).
-define(TAB, emqx_exclusive_subscription).
-define(EXCLUSIVE_SHARD, emqx_exclusive_shard).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
StoreProps = [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
],
ok = mria:create_table(?TAB, [
{rlog_shard, ?EXCLUSIVE_SHARD},
{type, set},
{storage, ram_copies},
{record_name, exclusive_subscription},
{attributes, record_info(fields, exclusive_subscription)},
{storage_properties, StoreProps}
]),
ok = mria_rlog:wait_for_shards([?EXCLUSIVE_SHARD], infinity).
%%--------------------------------------------------------------------
%% Upgrade
%%--------------------------------------------------------------------
on_add_module() ->
mnesia(boot).
on_delete_module() ->
mria:clear_table(?EXCLUSIVE_SHARD).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
allow | deny.
check_subscribe(#{clientid := ClientId}, Topic) ->
Fun = fun() ->
try_subscribe(ClientId, Topic)
end,
case mria:transaction(?EXCLUSIVE_SHARD, Fun) of
{atomic, Res} ->
Res;
{aborted, Reason} ->
?SLOG(warning, #{
msg => "Cannot check subscribe ~p due to ~p.", topic => Topic, reason => Reason
}),
deny
end.
unsubscribe(Topic, #{is_exclusive := true}) ->
_ = mria:transaction(?EXCLUSIVE_SHARD, fun() -> mnesia:delete({?TAB, Topic}) end),
ok;
unsubscribe(_Topic, _SubOpts) ->
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
try_subscribe(ClientId, Topic) ->
case mnesia:wread({?TAB, Topic}) of
[] ->
mnesia:write(
?TAB,
#exclusive_subscription{
clientid = ClientId,
topic = Topic
},
write
),
allow;
[_] ->
deny
end.

View File

@ -38,7 +38,8 @@
retain_available => boolean(),
wildcard_subscription => boolean(),
subscription_identifiers => boolean(),
shared_subscription => boolean()
shared_subscription => boolean(),
exclusive_subscription => boolean()
}.
-define(MAX_TOPIC_LEVELS, 65535).
@ -53,7 +54,8 @@
max_topic_levels,
max_qos_allowed,
wildcard_subscription,
shared_subscription
shared_subscription,
exclusive_subscription
]).
-define(DEFAULT_CAPS, #{
@ -65,7 +67,8 @@
retain_available => true,
wildcard_subscription => true,
subscription_identifiers => true,
shared_subscription => true
shared_subscription => true,
exclusive_subscription => false
}).
-spec check_pub(
@ -102,12 +105,12 @@ do_check_pub(_Flags, _Caps) ->
ok.
-spec check_sub(
emqx_types:zone(),
emqx_types:clientinfo(),
emqx_types:topic(),
emqx_types:subopts()
) ->
ok_or_error(emqx_types:reason_code()).
check_sub(Zone, Topic, SubOpts) ->
check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) ->
Caps = maps:with(?SUBCAP_KEYS, get_caps(Zone)),
Flags = lists:foldl(
fun
@ -117,6 +120,8 @@ check_sub(Zone, Topic, SubOpts) ->
Map#{is_wildcard => emqx_topic:wildcard(Topic)};
(shared_subscription, Map) ->
Map#{is_shared => maps:is_key(share, SubOpts)};
(exclusive_subscription, Map) ->
Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)};
%% Ignore
(_Key, Map) ->
Map
@ -124,17 +129,26 @@ check_sub(Zone, Topic, SubOpts) ->
#{},
maps:keys(Caps)
),
do_check_sub(Flags, Caps).
do_check_sub(Flags, Caps, ClientInfo, Topic).
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) when
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when
Limit > 0, Levels > Limit
->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) ->
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(_Flags, _Caps) ->
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) ->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) ->
case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of
deny ->
{error, ?RC_QUOTA_EXCEEDED};
_ ->
ok
end;
do_check_sub(_Flags, _Caps, _, _) ->
ok.
get_caps(Zone) ->

View File

@ -439,6 +439,14 @@ fields("mqtt") ->
desc => ?DESC(mqtt_shared_subscription)
}
)},
{"exclusive_subscription",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_exclusive_subscription)
}
)},
{"ignore_loop_deliver",
sc(
boolean(),

View File

@ -226,5 +226,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
_ -> error({invalid_topic_filter, TopicFilter})
end
end;
parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) ->
case Topic of
<<>> ->
error({invalid_topic_filter, TopicFilter});
_ ->
{Topic, Options#{is_exclusive => true}}
end;
parse(TopicFilter, Options) ->
{TopicFilter, Options}.

View File

@ -55,17 +55,18 @@ t_check_sub(_) ->
emqx_config:put_zone_conf(default, [mqtt, shared_subscription], false),
emqx_config:put_zone_conf(default, [mqtt, wildcard_subscription], false),
timer:sleep(50),
ok = emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts),
ClientInfo = #{zone => default},
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
?assertEqual(
{error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(default, <<"a/b/c/d">>, SubOpts)
emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)
),
?assertEqual(
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(default, <<"+/#">>, SubOpts)
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)
),
?assertEqual(
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(default, <<"topic">>, SubOpts#{share => true})
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})
),
emqx_config:put([zones], OldConf).