From 3bdffca4888cf6739600f9f595ec286d9785d1f7 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 30 Jan 2023 18:17:00 +0800 Subject: [PATCH] fix: the exclusive topics aren't removed when the session has already been cleaned --- apps/emqx/src/emqx_broker.erl | 13 +- apps/emqx/src/emqx_exclusive_subscription.erl | 8 +- apps/emqx/test/emqx_exclusive_sub_SUITE.erl | 159 ++++++++++++++++++ 3 files changed, 168 insertions(+), 12 deletions(-) create mode 100644 apps/emqx/test/emqx_exclusive_sub_SUITE.erl diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 1c31d86c2..d56620123 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -196,13 +196,13 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {Topic, SubPid}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts), - emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + emqx_exclusive_subscription:unsubscribe(Topic, SubOpts), cast(pick(Topic), {unsubscribed, Topic}); I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), @@ -366,14 +366,7 @@ subscriber_down(SubPid) -> SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), true = ets:delete(?SUBOPTION, {Topic, SubPid}), - case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; + do_unsubscribe(undefined, Topic, SubPid, SubOpts); undefined -> ok end diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl index afb6317b7..a1f7f76ae 100644 --- a/apps/emqx/src/emqx_exclusive_subscription.erl +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -32,7 +32,8 @@ -export([ check_subscribe/2, - unsubscribe/2 + unsubscribe/2, + clear/0 ]). %% Internal exports (RPC) @@ -77,7 +78,7 @@ on_add_module() -> mnesia(boot). on_delete_module() -> - mria:clear_table(?EXCLUSIVE_SHARD). + clear(). %%-------------------------------------------------------------------- %% APIs @@ -101,6 +102,9 @@ unsubscribe(Topic, #{is_exclusive := true}) -> unsubscribe(_Topic, _SubOpts) -> ok. +clear() -> + mria:clear_table(?TAB). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_exclusive_sub_SUITE.erl b/apps/emqx/test/emqx_exclusive_sub_SUITE.erl new file mode 100644 index 000000000..79dfc9de6 --- /dev/null +++ b/apps/emqx/test/emqx_exclusive_sub_SUITE.erl @@ -0,0 +1,159 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_sub_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(EXCLUSIVE_TOPIC, <<"$exclusive/t/1">>). +-define(NORMAL_TOPIC, <<"t/1">>). + +-define(CHECK_SUB(Client, Code), ?CHECK_SUB(Client, ?EXCLUSIVE_TOPIC, Code)). +-define(CHECK_SUB(Client, Topic, Code), + {ok, _, [Code]} = emqtt:subscribe(Client, Topic, []) +). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:start_apps([]), + ok = ekka:start(), + OldConf = emqx:get_config([zones], #{}), + emqx_config:put_zone_conf(default, [mqtt, exclusive_subscription], true), + timer:sleep(50), + [{old_conf, OldConf} | Config]. + +end_per_suite(Config) -> + emqx_config:put([zones], proplists:get_value(old_conf, Config)), + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + emqx_common_test_helpers:stop_apps([]). + +end_per_testcase(_TestCase, _Config) -> + emqx_exclusive_subscription:clear(). + +t_exclusive_sub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {clean_start, false}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 100}} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {clean_start, false}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + %% keep exclusive even disconnected + ok = emqtt:disconnect(C1), + timer:sleep(1000), + + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + ok = emqtt:disconnect(C2). + +t_allow_normal_sub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?NORMAL_TOPIC, 0), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + +t_unsub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + {ok, #{}, [0]} = emqtt:unsubscribe(C1, ?EXCLUSIVE_TOPIC), + + ?CHECK_SUB(C2, 0), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + +t_clean_session(_) -> + erlang:process_flag(trap_exit, true), + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {clean_start, true}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 0}} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + %% auto clean when session was cleand + ok = emqtt:disconnect(C1), + + timer:sleep(1000), + + ?CHECK_SUB(C2, 0), + + ok = emqtt:disconnect(C2). + +t_feat_disabled(_) -> + OldConf = emqx:get_config([zones], #{}), + emqx_config:put_zone_conf(default, [mqtt, exclusive_subscription], false), + + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, ?RC_TOPIC_FILTER_INVALID), + ok = emqtt:disconnect(C1), + + emqx_config:put([zones], OldConf).