Merge pull request #8306 from emqx/copy-of_main-v4.3

Merge main-v4.3 to main-v4.4
This commit is contained in:
Xinyu Liu 2022-06-23 23:11:13 +08:00 committed by GitHub
commit 23ed374055
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 369 additions and 51 deletions

View File

@ -17,12 +17,18 @@ File format:
- Add the possibility of configuring the password for
password-protected private key files used for dashboard and
management HTTPS listeners. [#8129]
- Add message republish supports using placeholder variables to specify QoS and Retain values. Set `${qos}` and `${flags.retain}` use the original QoS & Retain flag.
- Add supports specifying the network interface address of the cluster listener & rcp call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address.
### Bug fixes
- Avoid repeated writing `loaded_plugins` file if the plugin enable stauts has not changed [#8179]
- Correctly tally `connack.auth_error` metrics when a client uses MQTT
3.1. [#8177]
- Do not match ACL rules containing placeholders if there's no
information to fill them. [#8280]
- Fixed issue in Lua hook that didn't prevent a topic from being
subscribed to. [#8288]
## v4.3.15

View File

@ -1,6 +1,6 @@
{application, emqx_lua_hook,
[{description, "EMQ X Lua Hooks"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib]},

View File

@ -0,0 +1,5 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}],
[{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}.

View File

@ -169,8 +169,11 @@ on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties,
NewTopicFilters =
lists:foldr(fun(TopicFilter, Acc) ->
case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
false -> Acc;
NewTopicFilter -> [NewTopicFilter | Acc]
false ->
{Topic, Opts} = TopicFilter,
[{Topic, Opts#{delete => true}} | Acc];
NewTopicFilter ->
[NewTopicFilter | Acc]
end
end, [], TopicFilters),
case NewTopicFilters of

View File

@ -37,7 +37,8 @@ all() ->
case101,
case110, case111, case112, case113, case114, case115,
case201, case202, case203, case204, case205,
case301, case302
case301, case302,
t_stop_sub
].
init_per_suite(Config) ->
@ -691,3 +692,26 @@ case302(_Config) ->
},
?assertEqual(allow, emqx_hooks:run_fold('client.check_acl',
[ClientInfo, publish, <<"mytopic">>], deny)).
t_stop_sub(_Config) ->
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
Code = "function on_client_subscribe(clientid, username, topic)"
"\n return false"
"\nend"
"\n"
"\nfunction register_hook()"
"\n return \"on_client_subscribe\""
"\nend",
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
ClientInfo = #{clientid => undefined,
username => <<"test">>,
peerhost => {127, 0, 0, 1},
password => <<"mqtt">>
},
OriginalTopicFilters = [{Topic = <<"u">>,
Opts = #{nl => 0,qos => 0,rap => 0,rh => 0}}],
Props = #{},
Expected = [{Topic, Opts#{delete => true}}],
?assertEqual(Expected, emqx_hooks:run_fold('client.subscribe',
[ClientInfo, Props],
OriginalTopicFilters)).

View File

@ -218,6 +218,22 @@ import_rule(#{<<"id">> := RuleId,
map_to_actions(Maps) ->
[map_to_action(M) || M <- Maps].
map_to_action(Map = #{<<"id">> := ActionInstId,
<<"name">> := <<"data_to_kafka">>,
<<"args">> := Args}) ->
NArgs =
case maps:get(<<"strategy">>, Args, undefined) of
<<"first_key_dispatch">> ->
%% Old version(4.2.x) is first_key_dispatch.
%% Now is key_dispatch.
Args#{<<"strategy">> => <<"key_dispatch">>};
_ ->
Args
end,
#{id => ActionInstId,
name => 'data_to_kafka',
args => NArgs,
fallbacks => map_to_actions(maps:get(<<"fallbacks">>, Map, []))};
map_to_action(Map = #{<<"id">> := ActionInstId, <<"name">> := Name, <<"args">> := Args}) ->
#{id => ActionInstId,
name => any_to_atom(Name),

View File

@ -747,6 +747,11 @@ mqtt.wildcard_subscription = true
## Value: boolean
mqtt.shared_subscription = true
## Whether the Server supports MQTT Exclusive Subscriptions.
##
## Value: boolean
mqtt.exclusive_subscription = false
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
##
## Value: true | false
@ -852,6 +857,11 @@ zone.external.force_gc_policy = 16000|16MB
## Value: boolean
## zone.external.shared_subscription = false
## Whether the Server supports MQTT Exclusive Subscriptions.
##
## Value: boolean
## zone.external.exclusive_subscription = false
## Server Keep Alive
##
## Value: Number
@ -1054,6 +1064,11 @@ zone.internal.acl_deny_action = ignore
## Value: boolean
## zone.internal.shared_subscription = true
## Whether the Server supports MQTT Exclusive Subscriptions.
##
## Value: boolean
## zone.internal.exclusive_subscription = false
## See zone.$name.max_subscriptions.
##
## Value: Integer

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.4.5-beta.1"}).
-define(EMQX_RELEASE, {opensource, "4.4.5-beta.2"}).
-else.

View File

@ -980,6 +980,12 @@ end}.
{datatype, string}
]}.
%% @doc Whether the Server supports Exclusive Subscriptions.
{mapping, "mqtt.exclusive_subscription", "emqx.exclusive_subscription", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------
@ -1241,6 +1247,12 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Whether the Server supports Exclusive Subscriptions.
{mapping, "zone.$name.exclusive_subscription", "emqx.zones", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.zones", fun(Conf) ->
Ratelimit = fun(Val) ->
[L, D] = string:tokens(Val, ", "),

View File

@ -2,12 +2,24 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.4",
[{load_module,emqx_relup,brutal_purge,soft_purge,[]},
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{add_module,emqx_calendar},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
@ -27,6 +39,11 @@
{load_module,emqx_relup}]},
{"4.4.2",
[{add_module,emqx_calendar},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -49,6 +66,10 @@
{load_module,emqx_relup}]},
{"4.4.1",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{add_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
@ -81,6 +102,10 @@
{"4.4.0",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{add_module,emqx_calendar},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{add_module,emqx_exclusive_subscription},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -115,12 +140,24 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.4",
[{load_module,emqx_relup,brutal_purge,soft_purge,[]},
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
@ -138,7 +175,12 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.2",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -159,7 +201,11 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_relup}]},
{"4.4.1",
[{delete_module,emqx_calendar},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
@ -189,7 +235,11 @@
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{delete_module,emqx_relup}]},
{"4.4.0",
[{load_module,emqx_packet,brutal_purge,soft_purge,[]},
[{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]},
{load_module,emqx_topic,brutal_purge,soft_purge,[]},
{delete_module,emqx_exclusive_subscription},
{load_module,emqx_packet,brutal_purge,soft_purge,[]},
{delete_module,emqx_calendar},
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},

View File

@ -133,9 +133,13 @@ match_who(_ClientInfo, _Who) ->
match_topics(_ClientInfo, _Topic, []) ->
false;
match_topics(ClientInfo, Topic, [{pattern, PatternFilter}|Filters]) ->
TopicFilter = feed_var(ClientInfo, PatternFilter),
case feed_var(ClientInfo, PatternFilter) of
nomatch ->
false;
TopicFilter ->
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(ClientInfo, Topic, Filters);
orelse match_topics(ClientInfo, Topic, Filters)
end;
match_topics(ClientInfo, Topic, [TopicFilter|Filters]) ->
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(ClientInfo, Topic, Filters).
@ -149,12 +153,16 @@ feed_var(ClientInfo, Pattern) ->
feed_var(ClientInfo, Pattern, []).
feed_var(_ClientInfo, [], Acc) ->
lists:reverse(Acc);
feed_var(ClientInfo = #{clientid := undefined}, [<<"%c">>|Words], Acc) ->
feed_var(ClientInfo, Words, [<<"%c">>|Acc]);
feed_var(#{clientid := undefined}, [<<"%c">>|_Words], _Acc) ->
%% we return an impossible to match value to avoid allowing a
%% client to pub/sub to the literal `%c' topic unintentionally.
nomatch;
feed_var(ClientInfo = #{clientid := ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(ClientInfo, Words, [ClientId |Acc]);
feed_var(ClientInfo = #{username := undefined}, [<<"%u">>|Words], Acc) ->
feed_var(ClientInfo, Words, [<<"%u">>|Acc]);
feed_var(#{username := undefined}, [<<"%u">>|_Words], _Acc) ->
%% we return an impossible to match value to avoid allowing a
%% client to pub/sub to the literal `%u' topic unintentionally.
nomatch;
feed_var(ClientInfo = #{username := Username}, [<<"%u">>|Words], Acc) ->
feed_var(ClientInfo, Words, [Username|Acc]);
feed_var(ClientInfo, [W|Words], Acc) ->

View File

@ -183,7 +183,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
Group = maps:get(share, SubOpts, undefined),
do_unsubscribe(Group, Topic, SubPid, SubOpts).
do_unsubscribe(Group, Topic, SubPid, SubOpts),
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
clean_subscribe(SubOpts, Topic, SubPid);

View File

@ -441,11 +441,6 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
end, TupleTopicFilters0) of
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
false ->
Replace = fun
_Fun(TupleList, [ Tuple = {Key, _Value} | More]) ->
_Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More);
_Fun(TupleList, []) -> TupleList
end,
TopicFilters2 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0],
TopicFilters3 = run_hooks('client.subscribe',
[ClientInfo, Properties],
@ -453,7 +448,16 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
{TupleTopicFilters1, NChannel} = process_subscribe(TopicFilters3,
Properties,
Channel),
TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1),
TupleTopicFilters2 =
lists:foldl(
fun({{Topic, Opts = #{delete := true}}, _QoS}, Acc) ->
Key = {Topic, maps:without([delete], Opts)},
lists:keydelete(Key, 1, Acc);
(Tuple = {Key, _Value}, Acc) ->
lists:keyreplace(Key, 1, Acc, Tuple)
end,
TupleTopicFilters0,
TupleTopicFilters1),
ReasonCodes2 = [ ReasonCode
|| {_TopicFilter, ReasonCode} <- TupleTopicFilters2],
handle_out(suback, {PacketId, ReasonCodes2}, NChannel)
@ -1517,8 +1521,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) ->
%%--------------------------------------------------------------------
%% Check Sub Caps
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) ->
emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts).
check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) ->
emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts).
%%--------------------------------------------------------------------
%% Enrich SubId

View File

@ -181,7 +181,6 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq})
case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
[] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
[[OriginSeq] | _] ->
?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]),
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
end,
{reply, ok, next_seq(State)};

View File

@ -0,0 +1,103 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exclusive_subscription).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[exclusive]").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([
check_subscribe/2,
unsubscribe/2
]).
-record(exclusive_subscription, {
topic :: emqx_types:topic(),
clientid :: emqx_types:clientid()
}).
-define(TAB, emqx_exclusive_subscription).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
StoreProps = [
{ets, [
{read_concurrency, true},
{write_concurrency, true}
]}
],
ok = ekka_mnesia:create_table(?TAB, [
{type, set},
{ram_copies, [node()]},
{record_name, exclusive_subscription},
{attributes, record_info(fields, exclusive_subscription)},
{storage_properties, StoreProps}
]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, ram_copies).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
allow | deny.
check_subscribe(#{clientid := ClientId}, Topic) ->
Fun = fun() ->
try_subscribe(ClientId, Topic)
end,
case mnesia:transaction(Fun) of
{atomic, Res} ->
Res;
{aborted, Reason} ->
?LOG(warning, "Cannot check subscribe ~p due to ~p.", [Topic, Reason]),
deny
end.
unsubscribe(Topic, #{is_exclusive := true}) ->
_ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end),
ok;
unsubscribe(_Topic, _SubOpts) ->
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
try_subscribe(ClientId, Topic) ->
case mnesia:wread({?TAB, Topic}) of
[] ->
mnesia:write(
?TAB,
#exclusive_subscription{
clientid = ClientId,
topic = Topic
},
write
),
allow;
[_] ->
deny
end.

View File

@ -43,7 +43,8 @@
retain_available => boolean(),
wildcard_subscription => boolean(),
subscription_identifiers => boolean(),
shared_subscription => boolean()
shared_subscription => boolean(),
exclusive_subscription => boolean()
}).
-define(UNLIMITED, 0).
@ -56,7 +57,8 @@
-define(SUBCAP_KEYS, [max_topic_levels,
max_qos_allowed,
wildcard_subscription,
shared_subscription
shared_subscription,
exclusive_subscription
]).
-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE,
@ -67,7 +69,8 @@
retain_available => true,
wildcard_subscription => true,
subscription_identifiers => true,
shared_subscription => true
shared_subscription => true,
exclusive_subscription => false
}).
-spec(check_pub(emqx_types:zone(),
@ -93,11 +96,11 @@ do_check_pub(#{retain := true}, #{retain_available := false}) ->
{error, ?RC_RETAIN_NOT_SUPPORTED};
do_check_pub(_Flags, _Caps) -> ok.
-spec(check_sub(emqx_types:zone(),
-spec(check_sub(emqx_types:clientinfo(),
emqx_types:topic(),
emqx_types:subopts())
-> ok_or_error(emqx_types:reason_code())).
check_sub(Zone, Topic, SubOpts) ->
check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) ->
Caps = get_caps(Zone, subscribe),
Flags = lists:foldl(
fun(max_topic_levels, Map) ->
@ -106,18 +109,29 @@ check_sub(Zone, Topic, SubOpts) ->
Map#{is_wildcard => emqx_topic:wildcard(Topic)};
(shared_subscription, Map) ->
Map#{is_shared => maps:is_key(share, SubOpts)};
(exclusive_subscription, Map) ->
Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)};
(_Key, Map) -> Map %% Ignore
end, #{}, maps:keys(Caps)),
do_check_sub(Flags, Caps).
do_check_sub(Flags, Caps, ClientInfo, Topic).
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit})
do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _)
when Limit > 0, Levels > Limit ->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) ->
do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) ->
{error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(#{is_shared := true}, #{shared_subscription := false}) ->
do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
do_check_sub(_Flags, _Caps) -> ok.
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) ->
{error, ?RC_TOPIC_FILTER_INVALID};
do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) ->
case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of
deny ->
{error, ?RC_QUOTA_EXCEEDED};
_ ->
ok
end;
do_check_sub(_Flags, _Caps, _, _) -> ok.
default_caps() ->
?DEFAULT_CAPS.

View File

@ -216,6 +216,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
_ -> error({invalid_topic_filter, TopicFilter})
end
end;
parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) ->
case Topic of
<<>> ->
error({invalid_topic_filter, TopicFilter});
_ ->
{Topic, Options#{is_exclusive => true}}
end;
parse(TopicFilter, Options) ->
{TopicFilter, Options}.

View File

@ -57,6 +57,37 @@ t_compile(_) ->
?assertEqual(Compile4, emqx_access_rule:compile(Rule4)),
?assertEqual(Compile5, emqx_access_rule:compile(Rule5)).
t_unmatching_placeholders(_Config) ->
EmptyClientInfo = #{ clientid => undefined
, username => undefined
},
Topic1 = <<"%u">>,
Rule1 = {allow, all, pubsub, <<"%u">>},
Compiled1 = emqx_access_rule:compile(Rule1),
?assertEqual(
nomatch,
emqx_access_rule:match(EmptyClientInfo, Topic1, Compiled1)),
Rule2 = {allow, all, pubsub, [{eq, <<"%u">>}]},
Compiled2 = emqx_access_rule:compile(Rule2),
?assertEqual(
{matched, allow},
emqx_access_rule:match(EmptyClientInfo, Topic1, Compiled2)),
Topic2 = <<"%c">>,
Rule3 = {allow, all, pubsub, <<"%c">>},
Compiled3 = emqx_access_rule:compile(Rule3),
?assertEqual(
nomatch,
emqx_access_rule:match(EmptyClientInfo, Topic2, Compiled3)),
Rule4 = {allow, all, pubsub, [{eq, <<"%c">>}]},
Compiled4 = emqx_access_rule:compile(Rule4),
?assertEqual(
{matched, allow},
emqx_access_rule:match(EmptyClientInfo, Topic2, Compiled4)),
ok.
t_match(_) ->
ClientInfo1 = #{zone => external,
clientid => <<"testClient">>,

View File

@ -619,6 +619,26 @@ t_connack_auth_error(Config) when is_list(Config) ->
?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')),
ok.
t_handle_in_empty_client_subscribe_hook({init, Config}) ->
Config;
t_handle_in_empty_client_subscribe_hook({'end', _Config}) ->
ok;
t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) ->
Hook = fun(_ClientInfo, _Username, TopicFilter) ->
EmptyFilters = [{T, Opts#{delete => true}} || {T, Opts} <- TopicFilter],
{stop, EmptyFilters}
end,
ok = emqx:hook('client.subscribe', Hook, []),
try
{ok, C} = emqtt:start_link(),
{ok, _} = emqtt:connect(C),
{ok, _, RCs} = emqtt:subscribe(C, <<"t">>),
?assertEqual([], RCs),
ok
after
ok = emqx:unhook('client.subscribe', Hook)
end.
wait_for_events(Action, Kinds) ->
wait_for_events(Action, Kinds, 500).

View File

@ -53,11 +53,12 @@ t_check_sub(_) ->
},
emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps),
timer:sleep(50),
ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts),
ClientInfo = #{zone => zone},
ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts),
?assertEqual({error, ?RC_TOPIC_FILTER_INVALID},
emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)),
emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)),
?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)),
emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)),
?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED},
emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})),
emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})),
emqx_zone:unset_env(zone, '$mqtt_pub_caps').