diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 7cc4ac787..818087c37 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -17,12 +17,18 @@ File format: - Add the possibility of configuring the password for password-protected private key files used for dashboard and management HTTPS listeners. [#8129] +- Add message republish supports using placeholder variables to specify QoS and Retain values. Set `${qos}` and `${flags.retain}` use the original QoS & Retain flag. +- Add supports specifying the network interface address of the cluster listener & rcp call listener. Specify `0.0.0.0` use all network interfaces, or a particular network interface IP address. ### Bug fixes - Avoid repeated writing `loaded_plugins` file if the plugin enable stauts has not changed [#8179] - Correctly tally `connack.auth_error` metrics when a client uses MQTT 3.1. [#8177] +- Do not match ACL rules containing placeholders if there's no + information to fill them. [#8280] +- Fixed issue in Lua hook that didn't prevent a topic from being + subscribed to. [#8288] ## v4.3.15 diff --git a/apps/emqx_lua_hook/src/emqx_lua_hook.app.src b/apps/emqx_lua_hook/src/emqx_lua_hook.app.src index 627c8e29d..6f60c4a1a 100644 --- a/apps/emqx_lua_hook/src/emqx_lua_hook.app.src +++ b/apps/emqx_lua_hook/src/emqx_lua_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_lua_hook, [{description, "EMQ X Lua Hooks"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_lua_hook/src/emqx_lua_hook.appup.src b/apps/emqx_lua_hook/src/emqx_lua_hook.appup.src new file mode 100644 index 000000000..7e91ac8c4 --- /dev/null +++ b/apps/emqx_lua_hook/src/emqx_lua_hook.appup.src @@ -0,0 +1,5 @@ +%% -*- mode: erlang -*- +%% Unless you know what you are doing, DO NOT edit manually!! +{VSN, + [{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}], + [{"4.3.0",[{load_module,emqx_lua_script,brutal_purge,soft_purge,[]}]}]}. diff --git a/apps/emqx_lua_hook/src/emqx_lua_script.erl b/apps/emqx_lua_hook/src/emqx_lua_script.erl index 6e0752d53..b54eaf571 100644 --- a/apps/emqx_lua_hook/src/emqx_lua_script.erl +++ b/apps/emqx_lua_hook/src/emqx_lua_script.erl @@ -169,8 +169,11 @@ on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, NewTopicFilters = lists:foldr(fun(TopicFilter, Acc) -> case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of - false -> Acc; - NewTopicFilter -> [NewTopicFilter | Acc] + false -> + {Topic, Opts} = TopicFilter, + [{Topic, Opts#{delete => true}} | Acc]; + NewTopicFilter -> + [NewTopicFilter | Acc] end end, [], TopicFilters), case NewTopicFilters of diff --git a/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl b/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl index dc1889f1f..ea75b138c 100644 --- a/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl +++ b/apps/emqx_lua_hook/test/emqx_lua_hook_SUITE.erl @@ -37,7 +37,8 @@ all() -> case101, case110, case111, case112, case113, case114, case115, case201, case202, case203, case204, case205, - case301, case302 + case301, case302, + t_stop_sub ]. init_per_suite(Config) -> @@ -214,8 +215,8 @@ case31(_Config) -> "\n return \"on_client_connected\"" "\nend", ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(), - ?assertEqual(ok, - emqx_hooks:run('client.connected', + ?assertEqual(ok, + emqx_hooks:run('client.connected', [#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])). case32(_Config) -> @@ -228,8 +229,8 @@ case32(_Config) -> "\n return \"on_client_connected\"" "\nend", ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(), - ?assertEqual(ok, - emqx_hooks:run('client.connected', + ?assertEqual(ok, + emqx_hooks:run('client.connected', [#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])). case41(_Config) -> @@ -336,8 +337,8 @@ case61(_Config) -> "\nend", ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(), - ?assertEqual(ok, - emqx_hooks:run('client.disconnected', + ?assertEqual(ok, + emqx_hooks:run('client.disconnected', [#{clientid => <<"myclient">>, username => <<"tester">>}, 0])). case62(_Config) -> @@ -351,8 +352,8 @@ case62(_Config) -> "\nend", ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(), - ?assertEqual(ok, - emqx_hooks:run('client.disconnected', + ?assertEqual(ok, + emqx_hooks:run('client.disconnected', [#{clientid => <<"myclient">>, username => <<"tester">>}, 0])). case71(_Config) -> @@ -691,3 +692,26 @@ case302(_Config) -> }, ?assertEqual(allow, emqx_hooks:run_fold('client.check_acl', [ClientInfo, publish, <<"mytopic">>], deny)). + +t_stop_sub(_Config) -> + ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]), + Code = "function on_client_subscribe(clientid, username, topic)" + "\n return false" + "\nend" + "\n" + "\nfunction register_hook()" + "\n return \"on_client_subscribe\"" + "\nend", + ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(), + ClientInfo = #{clientid => undefined, + username => <<"test">>, + peerhost => {127, 0, 0, 1}, + password => <<"mqtt">> + }, + OriginalTopicFilters = [{Topic = <<"u">>, + Opts = #{nl => 0,qos => 0,rap => 0,rh => 0}}], + Props = #{}, + Expected = [{Topic, Opts#{delete => true}}], + ?assertEqual(Expected, emqx_hooks:run_fold('client.subscribe', + [ClientInfo, Props], + OriginalTopicFilters)). diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 7b31dd8be..87663f2be 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -218,6 +218,22 @@ import_rule(#{<<"id">> := RuleId, map_to_actions(Maps) -> [map_to_action(M) || M <- Maps]. +map_to_action(Map = #{<<"id">> := ActionInstId, + <<"name">> := <<"data_to_kafka">>, + <<"args">> := Args}) -> + NArgs = + case maps:get(<<"strategy">>, Args, undefined) of + <<"first_key_dispatch">> -> + %% Old version(4.2.x) is first_key_dispatch. + %% Now is key_dispatch. + Args#{<<"strategy">> => <<"key_dispatch">>}; + _ -> + Args + end, + #{id => ActionInstId, + name => 'data_to_kafka', + args => NArgs, + fallbacks => map_to_actions(maps:get(<<"fallbacks">>, Map, []))}; map_to_action(Map = #{<<"id">> := ActionInstId, <<"name">> := Name, <<"args">> := Args}) -> #{id => ActionInstId, name => any_to_atom(Name), diff --git a/etc/emqx.conf b/etc/emqx.conf index 39f0d3b85..2942362d7 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -747,6 +747,11 @@ mqtt.wildcard_subscription = true ## Value: boolean mqtt.shared_subscription = true +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +mqtt.exclusive_subscription = false + ## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) ## ## Value: true | false @@ -852,6 +857,11 @@ zone.external.force_gc_policy = 16000|16MB ## Value: boolean ## zone.external.shared_subscription = false +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +## zone.external.exclusive_subscription = false + ## Server Keep Alive ## ## Value: Number @@ -1054,6 +1064,11 @@ zone.internal.acl_deny_action = ignore ## Value: boolean ## zone.internal.shared_subscription = true +## Whether the Server supports MQTT Exclusive Subscriptions. +## +## Value: boolean +## zone.internal.exclusive_subscription = false + ## See zone.$name.max_subscriptions. ## ## Value: Integer diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 49266ff42..5e5771aae 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.4.5-beta.1"}). +-define(EMQX_RELEASE, {opensource, "4.4.5-beta.2"}). -else. diff --git a/priv/emqx.schema b/priv/emqx.schema index b42b40ea6..eccb58c0d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -980,6 +980,12 @@ end}. {datatype, string} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "mqtt.exclusive_subscription", "emqx.exclusive_subscription", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -1241,6 +1247,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Whether the Server supports Exclusive Subscriptions. +{mapping, "zone.$name.exclusive_subscription", "emqx.zones", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqx.zones", fun(Conf) -> Ratelimit = fun(Val) -> [L, D] = string:tokens(Val, ", "), diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 272b5b58d..7d2ffc119 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,12 +2,24 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.4", - [{load_module,emqx_relup,brutal_purge,soft_purge,[]}, + [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, {"4.4.3", [{add_module,emqx_calendar}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, @@ -27,6 +39,11 @@ {load_module,emqx_relup}]}, {"4.4.2", [{add_module,emqx_calendar}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -49,6 +66,10 @@ {load_module,emqx_relup}]}, {"4.4.1", [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, {add_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, @@ -81,6 +102,10 @@ {"4.4.0", [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {add_module,emqx_calendar}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {add_module,emqx_exclusive_subscription}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -115,12 +140,24 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.4", - [{load_module,emqx_relup,brutal_purge,soft_purge,[]}, + [{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, + {load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_channel,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}, + {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}]}, {"4.4.3", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, @@ -138,7 +175,12 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {"4.4.2", - [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}, + {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -159,7 +201,11 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_relup}]}, {"4.4.1", - [{delete_module,emqx_calendar}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}, + {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, @@ -189,7 +235,11 @@ {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {delete_module,emqx_relup}]}, {"4.4.0", - [{load_module,emqx_packet,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqtt_caps,brutal_purge,soft_purge,[]}, + {load_module,emqx_topic,brutal_purge,soft_purge,[]}, + {delete_module,emqx_exclusive_subscription}, + {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {delete_module,emqx_calendar}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index dbf4c66f3..decebb6b0 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -133,9 +133,13 @@ match_who(_ClientInfo, _Who) -> match_topics(_ClientInfo, _Topic, []) -> false; match_topics(ClientInfo, Topic, [{pattern, PatternFilter}|Filters]) -> - TopicFilter = feed_var(ClientInfo, PatternFilter), - match_topic(emqx_topic:words(Topic), TopicFilter) - orelse match_topics(ClientInfo, Topic, Filters); + case feed_var(ClientInfo, PatternFilter) of + nomatch -> + false; + TopicFilter -> + match_topic(emqx_topic:words(Topic), TopicFilter) + orelse match_topics(ClientInfo, Topic, Filters) + end; match_topics(ClientInfo, Topic, [TopicFilter|Filters]) -> match_topic(emqx_topic:words(Topic), TopicFilter) orelse match_topics(ClientInfo, Topic, Filters). @@ -149,12 +153,16 @@ feed_var(ClientInfo, Pattern) -> feed_var(ClientInfo, Pattern, []). feed_var(_ClientInfo, [], Acc) -> lists:reverse(Acc); -feed_var(ClientInfo = #{clientid := undefined}, [<<"%c">>|Words], Acc) -> - feed_var(ClientInfo, Words, [<<"%c">>|Acc]); +feed_var(#{clientid := undefined}, [<<"%c">>|_Words], _Acc) -> + %% we return an impossible to match value to avoid allowing a + %% client to pub/sub to the literal `%c' topic unintentionally. + nomatch; feed_var(ClientInfo = #{clientid := ClientId}, [<<"%c">>|Words], Acc) -> feed_var(ClientInfo, Words, [ClientId |Acc]); -feed_var(ClientInfo = #{username := undefined}, [<<"%u">>|Words], Acc) -> - feed_var(ClientInfo, Words, [<<"%u">>|Acc]); +feed_var(#{username := undefined}, [<<"%u">>|_Words], _Acc) -> + %% we return an impossible to match value to avoid allowing a + %% client to pub/sub to the literal `%u' topic unintentionally. + nomatch; feed_var(ClientInfo = #{username := Username}, [<<"%u">>|Words], Acc) -> feed_var(ClientInfo, Words, [Username|Acc]); feed_var(ClientInfo, [W|Words], Acc) -> diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 50a5f2288..e4aafd957 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -183,7 +183,8 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts), + emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> clean_subscribe(SubOpts, Topic, SubPid); diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 5eaf88704..e14597f67 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -441,11 +441,6 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), end, TupleTopicFilters0) of true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); false -> - Replace = fun - _Fun(TupleList, [ Tuple = {Key, _Value} | More]) -> - _Fun(lists:keyreplace(Key, 1, TupleList, Tuple), More); - _Fun(TupleList, []) -> TupleList - end, TopicFilters2 = [ TopicFilter || {TopicFilter, 0} <- TupleTopicFilters0], TopicFilters3 = run_hooks('client.subscribe', [ClientInfo, Properties], @@ -453,7 +448,16 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), {TupleTopicFilters1, NChannel} = process_subscribe(TopicFilters3, Properties, Channel), - TupleTopicFilters2 = Replace(TupleTopicFilters0, TupleTopicFilters1), + TupleTopicFilters2 = + lists:foldl( + fun({{Topic, Opts = #{delete := true}}, _QoS}, Acc) -> + Key = {Topic, maps:without([delete], Opts)}, + lists:keydelete(Key, 1, Acc); + (Tuple = {Key, _Value}, Acc) -> + lists:keyreplace(Key, 1, Acc, Tuple) + end, + TupleTopicFilters0, + TupleTopicFilters1), ReasonCodes2 = [ ReasonCode || {_TopicFilter, ReasonCode} <- TupleTopicFilters2], handle_out(suback, {PacketId, ReasonCodes2}, NChannel) @@ -1517,8 +1521,8 @@ check_sub_acl(TopicFilter, #channel{clientinfo = ClientInfo}) -> %%-------------------------------------------------------------------- %% Check Sub Caps -check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> - emqx_mqtt_caps:check_sub(Zone, TopicFilter, SubOpts). +check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = ClientInfo}) -> + emqx_mqtt_caps:check_sub(ClientInfo, TopicFilter, SubOpts). %%-------------------------------------------------------------------- %% Enrich SubId diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 094d97135..bf22e562a 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -181,7 +181,6 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq}) case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of [] -> ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}); [[OriginSeq] | _] -> - ?LOG(warning, "CMD ~s is overidden by ~p", [Cmd, MF]), true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}) end, {reply, ok, next_seq(State)}; diff --git a/src/emqx_exclusive_subscription.erl b/src/emqx_exclusive_subscription.erl new file mode 100644 index 000000000..ab26a6bd6 --- /dev/null +++ b/src/emqx_exclusive_subscription.erl @@ -0,0 +1,103 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_subscription). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-logger_header("[exclusive]"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). +-copy_mnesia({mnesia, [copy]}). + +-export([ + check_subscribe/2, + unsubscribe/2 +]). + +-record(exclusive_subscription, { + topic :: emqx_types:topic(), + clientid :: emqx_types:clientid() +}). + +-define(TAB, emqx_exclusive_subscription). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + StoreProps = [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ], + ok = ekka_mnesia:create_table(?TAB, [ + {type, set}, + {ram_copies, [node()]}, + {record_name, exclusive_subscription}, + {attributes, record_info(fields, exclusive_subscription)}, + {storage_properties, StoreProps} + ]); +mnesia(copy) -> + ok = ekka_mnesia:copy_table(?TAB, ram_copies). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +-spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) -> + allow | deny. +check_subscribe(#{clientid := ClientId}, Topic) -> + Fun = fun() -> + try_subscribe(ClientId, Topic) + end, + case mnesia:transaction(Fun) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?LOG(warning, "Cannot check subscribe ~p due to ~p.", [Topic, Reason]), + deny + end. + +unsubscribe(Topic, #{is_exclusive := true}) -> + _ = mnesia:transaction(fun() -> mnesia:delete({?TAB, Topic}) end), + ok; +unsubscribe(_Topic, _SubOpts) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +try_subscribe(ClientId, Topic) -> + case mnesia:wread({?TAB, Topic}) of + [] -> + mnesia:write( + ?TAB, + #exclusive_subscription{ + clientid = ClientId, + topic = Topic + }, + write + ), + allow; + [_] -> + deny + end. diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 8e94d25a7..f218fa795 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -43,7 +43,8 @@ retain_available => boolean(), wildcard_subscription => boolean(), subscription_identifiers => boolean(), - shared_subscription => boolean() + shared_subscription => boolean(), + exclusive_subscription => boolean() }). -define(UNLIMITED, 0). @@ -56,7 +57,8 @@ -define(SUBCAP_KEYS, [max_topic_levels, max_qos_allowed, wildcard_subscription, - shared_subscription + shared_subscription, + exclusive_subscription ]). -define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE, @@ -67,7 +69,8 @@ retain_available => true, wildcard_subscription => true, subscription_identifiers => true, - shared_subscription => true + shared_subscription => true, + exclusive_subscription => false }). -spec(check_pub(emqx_types:zone(), @@ -93,11 +96,11 @@ do_check_pub(#{retain := true}, #{retain_available := false}) -> {error, ?RC_RETAIN_NOT_SUPPORTED}; do_check_pub(_Flags, _Caps) -> ok. --spec(check_sub(emqx_types:zone(), +-spec(check_sub(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts()) -> ok_or_error(emqx_types:reason_code())). -check_sub(Zone, Topic, SubOpts) -> +check_sub(ClientInfo = #{zone := Zone}, Topic, SubOpts) -> Caps = get_caps(Zone, subscribe), Flags = lists:foldl( fun(max_topic_levels, Map) -> @@ -106,18 +109,29 @@ check_sub(Zone, Topic, SubOpts) -> Map#{is_wildcard => emqx_topic:wildcard(Topic)}; (shared_subscription, Map) -> Map#{is_shared => maps:is_key(share, SubOpts)}; + (exclusive_subscription, Map) -> + Map#{is_exclusive => maps:get(is_exclusive, SubOpts, false)}; (_Key, Map) -> Map %% Ignore end, #{}, maps:keys(Caps)), - do_check_sub(Flags, Caps). + do_check_sub(Flags, Caps, ClientInfo, Topic). -do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}) +do_check_sub(#{topic_levels := Levels}, #{max_topic_levels := Limit}, _, _) when Limit > 0, Levels > Limit -> {error, ?RC_TOPIC_FILTER_INVALID}; -do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}) -> +do_check_sub(#{is_wildcard := true}, #{wildcard_subscription := false}, _, _) -> {error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> +do_check_sub(#{is_shared := true}, #{shared_subscription := false}, _, _) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; -do_check_sub(_Flags, _Caps) -> ok. +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := false}, _, _) -> + {error, ?RC_TOPIC_FILTER_INVALID}; +do_check_sub(#{is_exclusive := true}, #{exclusive_subscription := true}, ClientInfo, Topic) -> + case emqx_exclusive_subscription:check_subscribe(ClientInfo, Topic) of + deny -> + {error, ?RC_QUOTA_EXCEEDED}; + _ -> + ok + end; +do_check_sub(_Flags, _Caps, _, _) -> ok. default_caps() -> ?DEFAULT_CAPS. diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 8c6ac7225..3d758d32e 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -216,6 +216,12 @@ parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> _ -> error({invalid_topic_filter, TopicFilter}) end end; +parse(TopicFilter = <<"$exclusive/", Topic/binary>>, Options) -> + case Topic of + <<>> -> + error({invalid_topic_filter, TopicFilter}); + _ -> + {Topic, Options#{is_exclusive => true}} + end; parse(TopicFilter, Options) -> {TopicFilter, Options}. - diff --git a/test/emqx_access_rule_SUITE.erl b/test/emqx_access_rule_SUITE.erl index 9259307a7..449427590 100644 --- a/test/emqx_access_rule_SUITE.erl +++ b/test/emqx_access_rule_SUITE.erl @@ -57,6 +57,37 @@ t_compile(_) -> ?assertEqual(Compile4, emqx_access_rule:compile(Rule4)), ?assertEqual(Compile5, emqx_access_rule:compile(Rule5)). +t_unmatching_placeholders(_Config) -> + EmptyClientInfo = #{ clientid => undefined + , username => undefined + }, + + Topic1 = <<"%u">>, + Rule1 = {allow, all, pubsub, <<"%u">>}, + Compiled1 = emqx_access_rule:compile(Rule1), + ?assertEqual( + nomatch, + emqx_access_rule:match(EmptyClientInfo, Topic1, Compiled1)), + Rule2 = {allow, all, pubsub, [{eq, <<"%u">>}]}, + Compiled2 = emqx_access_rule:compile(Rule2), + ?assertEqual( + {matched, allow}, + emqx_access_rule:match(EmptyClientInfo, Topic1, Compiled2)), + + Topic2 = <<"%c">>, + Rule3 = {allow, all, pubsub, <<"%c">>}, + Compiled3 = emqx_access_rule:compile(Rule3), + ?assertEqual( + nomatch, + emqx_access_rule:match(EmptyClientInfo, Topic2, Compiled3)), + Rule4 = {allow, all, pubsub, [{eq, <<"%c">>}]}, + Compiled4 = emqx_access_rule:compile(Rule4), + ?assertEqual( + {matched, allow}, + emqx_access_rule:match(EmptyClientInfo, Topic2, Compiled4)), + + ok. + t_match(_) -> ClientInfo1 = #{zone => external, clientid => <<"testClient">>, diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 9c771b705..d009acc6e 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -619,6 +619,26 @@ t_connack_auth_error(Config) when is_list(Config) -> ?assertEqual(2, emqx_metrics:val('packets.connack.auth_error')), ok. +t_handle_in_empty_client_subscribe_hook({init, Config}) -> + Config; +t_handle_in_empty_client_subscribe_hook({'end', _Config}) -> + ok; +t_handle_in_empty_client_subscribe_hook(Config) when is_list(Config) -> + Hook = fun(_ClientInfo, _Username, TopicFilter) -> + EmptyFilters = [{T, Opts#{delete => true}} || {T, Opts} <- TopicFilter], + {stop, EmptyFilters} + end, + ok = emqx:hook('client.subscribe', Hook, []), + try + {ok, C} = emqtt:start_link(), + {ok, _} = emqtt:connect(C), + {ok, _, RCs} = emqtt:subscribe(C, <<"t">>), + ?assertEqual([], RCs), + ok + after + ok = emqx:unhook('client.subscribe', Hook) + end. + wait_for_events(Action, Kinds) -> wait_for_events(Action, Kinds, 500). diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index 992e7743e..c2b1a477b 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -53,11 +53,12 @@ t_check_sub(_) -> }, emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), timer:sleep(50), - ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), + ClientInfo = #{zone => zone}, + ok = emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, - emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), + emqx_mqtt_caps:check_sub(ClientInfo, <<"a/b/c/d">>, SubOpts)), ?assertEqual({error, ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), + emqx_mqtt_caps:check_sub(ClientInfo, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, - emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), + emqx_mqtt_caps:check_sub(ClientInfo, <<"topic">>, SubOpts#{share => true})), emqx_zone:unset_env(zone, '$mqtt_pub_caps').