Merge pull request #9868 from lafirest/fix/exclusive-unsub-v4.4

fix: the exclusive topics aren't removed when the session has already been cleaned
This commit is contained in:
lafirest 2023-02-01 18:50:09 +08:00 committed by GitHub
commit b73756e4cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 220 additions and 24 deletions

View File

@ -23,3 +23,5 @@
``` ```
- Avoid crash logs in CoAP gateway when receiving liveness checking packets from Load Balancer [#9869](https://github.com/emqx/emqx/pull/9869). - Avoid crash logs in CoAP gateway when receiving liveness checking packets from Load Balancer [#9869](https://github.com/emqx/emqx/pull/9869).
- Fix the exclusive topics aren't removed when the session has already been cleaned [#9868](https://github.com/emqx/emqx/pull/9868).

View File

@ -22,3 +22,5 @@
``` ```
- 修复 CoAP 网关在收到负载均衡的心跳检查报文时产生的崩溃日志 [#9869](https://github.com/emqx/emqx/pull/9869)。 - 修复 CoAP 网关在收到负载均衡的心跳检查报文时产生的崩溃日志 [#9869](https://github.com/emqx/emqx/pull/9869)。
- 修复会话关闭后,其持有的排他订阅主题没有被释放的问题 [#9868](https://github.com/emqx/emqx/pull/9868)。

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.4.14", [{"4.4.14",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -11,7 +12,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]},
{"4.4.13", {"4.4.13",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -21,7 +23,8 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.12", {"4.4.12",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -31,7 +34,8 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.11", {"4.4.11",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -44,7 +48,8 @@
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.4.10", {"4.4.10",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{add_module,emqx_cover}, {add_module,emqx_cover},
@ -67,7 +72,8 @@
{apply,{application,set_env, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]}, [gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.9", {"4.4.9",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{add_module,emqx_cover}, {add_module,emqx_cover},
@ -96,7 +102,8 @@
{apply,{application,set_env, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]}, [gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.8", {"4.4.8",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{add_module,emqx_cover}, {add_module,emqx_cover},
@ -126,7 +133,8 @@
{apply,{application,set_env, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]}, [gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.7", {"4.4.7",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{add_module,emqx_cover}, {add_module,emqx_cover},
@ -156,7 +164,8 @@
{apply,{application,set_env, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]}, [gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.6", {"4.4.6",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{add_module,emqx_cover}, {add_module,emqx_cover},
@ -186,7 +195,8 @@
{apply,{application,set_env, {apply,{application,set_env,
[gen_rpc,insecure_auth_fallback_allowed,true]}}]}, [gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{"4.4.5", {"4.4.5",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{add_module,emqx_cover}, {add_module,emqx_cover},
@ -451,7 +461,8 @@
[gen_rpc,insecure_auth_fallback_allowed,true]}}]}, [gen_rpc,insecure_auth_fallback_allowed,true]}}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.4.14", [{"4.4.14",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -460,7 +471,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]},
{"4.4.13", {"4.4.13",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -470,7 +482,8 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.12", {"4.4.12",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -480,7 +493,8 @@
{load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.11", {"4.4.11",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -493,7 +507,8 @@
{load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]},
{delete_module,emqx_cover}]}, {delete_module,emqx_cover}]},
{"4.4.10", {"4.4.10",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -513,7 +528,8 @@
{delete_module,emqx_crl_cache}, {delete_module,emqx_crl_cache},
{delete_module,emqx_ocsp_cache}]}, {delete_module,emqx_ocsp_cache}]},
{"4.4.9", {"4.4.9",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -538,7 +554,8 @@
{delete_module,emqx_crl_cache}, {delete_module,emqx_crl_cache},
{delete_module,emqx_ocsp_cache}]}, {delete_module,emqx_ocsp_cache}]},
{"4.4.8", {"4.4.8",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -564,7 +581,8 @@
{delete_module,emqx_crl_cache}, {delete_module,emqx_crl_cache},
{delete_module,emqx_ocsp_cache}]}, {delete_module,emqx_ocsp_cache}]},
{"4.4.7", {"4.4.7",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -590,7 +608,8 @@
{delete_module,emqx_crl_cache}, {delete_module,emqx_crl_cache},
{delete_module,emqx_ocsp_cache}]}, {delete_module,emqx_ocsp_cache}]},
{"4.4.6", {"4.4.6",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -616,7 +635,8 @@
{delete_module,emqx_crl_cache}, {delete_module,emqx_crl_cache},
{delete_module,emqx_ocsp_cache}]}, {delete_module,emqx_ocsp_cache}]},
{"4.4.5", {"4.4.5",
[{load_module,emqx_banned,brutal_purge,soft_purge,[]}, [{load_module,emqx_broker,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]},
{load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},

View File

@ -183,8 +183,7 @@ do_unsubscribe(Topic, SubPid, SubOpts) ->
true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete(?SUBOPTION, {SubPid, Topic}),
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
Group = maps:get(share, SubOpts, undefined), Group = maps:get(share, SubOpts, undefined),
do_unsubscribe(Group, Topic, SubPid, SubOpts), do_unsubscribe(Group, Topic, SubPid, SubOpts).
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts).
do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
clean_subscribe(SubOpts, Topic, SubPid); clean_subscribe(SubOpts, Topic, SubPid);
@ -344,9 +343,12 @@ subscriber_down(SubPid) ->
clean_subscribe(SubOpts, Topic, SubPid) -> clean_subscribe(SubOpts, Topic, SubPid) ->
case maps:get(shard, SubOpts, 0) of case maps:get(shard, SubOpts, 0) of
0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), 0 ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
ok = emqx_exclusive_subscription:unsubscribe(Topic, SubOpts),
ok = cast(pick(Topic), {unsubscribed, Topic}); ok = cast(pick(Topic), {unsubscribed, Topic});
I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), I ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
end. end.

View File

@ -0,0 +1,170 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exclusive_sub_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(TAB, emqx_exclusive_subscription).
-define(EXCLUSIVE_TOPIC, <<"$exclusive/t/1">>).
-define(NORMAL_TOPIC, <<"t/1">>).
-define(CHECK_SUB(Client, Code), ?CHECK_SUB(Client, ?EXCLUSIVE_TOPIC, Code)).
-define(CHECK_SUB(Client, Topic, Code),
{ok, _, [Code]} = emqtt:subscribe(Client, Topic, [])
).
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([]),
enable_exclusive_sub(true),
Config.
end_per_suite(_Config) ->
reset_zone_env(),
emqx_ct_helpers:stop_apps([]).
end_per_testcase(_TestCase, _Config) ->
mnesia:clear_table(?TAB).
%% test that this feature is working during the whole session life cycle
t_exclusive_sub(_) ->
{ok, C1} = emqtt:start_link([
{clientid, <<"client1">>},
{clean_start, false},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 100}}
]),
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{clean_start, false},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(C2),
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
%% keep exclusive even disconnected
ok = emqtt:disconnect(C1),
timer:sleep(1000),
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
ok = emqtt:disconnect(C2).
%% test this feature does not interfere with normal subs
t_allow_normal_sub(_) ->
{ok, C1} = emqtt:start_link([
{clientid, <<"client1">>},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(C2),
?CHECK_SUB(C2, ?NORMAL_TOPIC, 0),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2).
%% test the exclusive topics can be released correctly
t_unsub(_) ->
{ok, C1} = emqtt:start_link([
{clientid, <<"client1">>},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(C2),
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, ?EXCLUSIVE_TOPIC),
?CHECK_SUB(C2, 0),
ok = emqtt:disconnect(C1),
ok = emqtt:disconnect(C2).
%% test whether the exclusive topics would auto-clean after the session was cleaned
t_clean_session(_) ->
erlang:process_flag(trap_exit, true),
{ok, C1} = emqtt:start_link([
{clientid, <<"client1">>},
{clean_start, true},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 0}}
]),
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(C2),
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
%% auto clean when session was cleand
ok = emqtt:disconnect(C1),
timer:sleep(1000),
?CHECK_SUB(C2, 0),
ok = emqtt:disconnect(C2).
%% test the feature switch
t_feat_disabled(_) ->
enable_exclusive_sub(false),
{ok, C1} = emqtt:start_link([
{clientid, <<"client1">>},
{proto_ver, v5}
]),
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, ?RC_TOPIC_FILTER_INVALID),
ok = emqtt:disconnect(C1),
enable_exclusive_sub(true).
enable_exclusive_sub(Enable) ->
emqx_zone:set_env(
external,
'$mqtt_sub_caps',
#{exclusive_subscription => Enable}
),
timer:sleep(50).
reset_zone_env() ->
emqx_zone:unset_env(external, '$mqtt_sub_caps'),
timer:sleep(50).