diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 0fc9975cc..76e0997a1 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -183,8 +183,7 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts), - emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> clean_subscribe(SubOpts, Topic, SubPid); @@ -344,9 +343,12 @@ subscriber_down(SubPid) -> clean_subscribe(SubOpts, Topic, SubPid) -> case maps:get(shard, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + 0 -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = emqx_exclusive_subscription:unsubscribe(Topic, SubOpts), ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + I -> + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end. diff --git a/test/emqx_exclusive_sub_SUITE.erl b/test/emqx_exclusive_sub_SUITE.erl new file mode 100644 index 000000000..e5eec83e9 --- /dev/null +++ b/test/emqx_exclusive_sub_SUITE.erl @@ -0,0 +1,170 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_sub_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(TAB, emqx_exclusive_subscription). +-define(EXCLUSIVE_TOPIC, <<"$exclusive/t/1">>). +-define(NORMAL_TOPIC, <<"t/1">>). + +-define(CHECK_SUB(Client, Code), ?CHECK_SUB(Client, ?EXCLUSIVE_TOPIC, Code)). +-define(CHECK_SUB(Client, Topic, Code), + {ok, _, [Code]} = emqtt:subscribe(Client, Topic, []) +). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + enable_exclusive_sub(true), + Config. + +end_per_suite(_Config) -> + reset_zone_env(), + emqx_ct_helpers:stop_apps([]). + +end_per_testcase(_TestCase, _Config) -> + mnesia:clear_table(?TAB). + +%% test that this feature is working during the whole session life cycle +t_exclusive_sub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {clean_start, false}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 100}} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {clean_start, false}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + %% keep exclusive even disconnected + ok = emqtt:disconnect(C1), + timer:sleep(1000), + + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + ok = emqtt:disconnect(C2). + +%% test this feature does not interfere with normal subs +t_allow_normal_sub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?NORMAL_TOPIC, 0), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + +%% test the exclusive topics can be released correctly +t_unsub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + {ok, #{}, [0]} = emqtt:unsubscribe(C1, ?EXCLUSIVE_TOPIC), + + ?CHECK_SUB(C2, 0), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + +%% test whether the exclusive topics would auto-clean after the session was cleaned +t_clean_session(_) -> + erlang:process_flag(trap_exit, true), + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {clean_start, true}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 0}} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + %% auto clean when session was cleand + ok = emqtt:disconnect(C1), + + timer:sleep(1000), + + ?CHECK_SUB(C2, 0), + + ok = emqtt:disconnect(C2). + +%% test the feature switch +t_feat_disabled(_) -> + enable_exclusive_sub(false), + + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, ?RC_TOPIC_FILTER_INVALID), + ok = emqtt:disconnect(C1), + + enable_exclusive_sub(true). + +enable_exclusive_sub(Enable) -> + emqx_zone:set_env( + external, + '$mqtt_sub_caps', + #{exclusive_subscription => Enable} + ), + timer:sleep(50). + +reset_zone_env() -> + emqx_zone:unset_env(external, '$mqtt_sub_caps'), + timer:sleep(50).