diff --git a/changes/v4.4.15-en.md b/changes/v4.4.15-en.md index 56f107a95..1dfe74aac 100644 --- a/changes/v4.4.15-en.md +++ b/changes/v4.4.15-en.md @@ -23,3 +23,5 @@ ``` - Avoid crash logs in CoAP gateway when receiving liveness checking packets from Load Balancer [#9869](https://github.com/emqx/emqx/pull/9869). + +- Fix the exclusive topics aren't removed when the session has already been cleaned [#9868](https://github.com/emqx/emqx/pull/9868). diff --git a/changes/v4.4.15-zh.md b/changes/v4.4.15-zh.md index e23e71977..a4bb0a50d 100644 --- a/changes/v4.4.15-zh.md +++ b/changes/v4.4.15-zh.md @@ -22,3 +22,5 @@ ``` - 修复 CoAP 网关在收到负载均衡的心跳检查报文时产生的崩溃日志 [#9869](https://github.com/emqx/emqx/pull/9869)。 + +- 修复会话关闭后,其持有的排他订阅主题没有被释放的问题 [#9868](https://github.com/emqx/emqx/pull/9868)。 diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 8f5a51133..1770bce7c 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.14", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -11,7 +12,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -21,7 +23,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -31,7 +34,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -44,7 +48,8 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}]}, {"4.4.10", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -67,7 +72,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.9", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -96,7 +102,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.8", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -126,7 +133,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.7", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -156,7 +164,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.6", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -186,7 +195,8 @@ {apply,{application,set_env, [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {"4.4.5", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {add_module,emqx_cover}, @@ -451,7 +461,8 @@ [gen_rpc,insecure_auth_fallback_allowed,true]}}]}, {<<".*">>,[]}], [{"4.4.14", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -460,7 +471,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}]}, {"4.4.13", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -470,7 +482,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.12", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -480,7 +493,8 @@ {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -493,7 +507,8 @@ {load_module,emqx_session,brutal_purge,soft_purge,[]}, {delete_module,emqx_cover}]}, {"4.4.10", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -513,7 +528,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.9", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -538,7 +554,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.8", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -564,7 +581,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.7", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -590,7 +608,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.6", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -616,7 +635,8 @@ {delete_module,emqx_crl_cache}, {delete_module,emqx_ocsp_cache}]}, {"4.4.5", - [{load_module,emqx_banned,brutal_purge,soft_purge,[]}, + [{load_module,emqx_broker,brutal_purge,soft_purge,[]}, + {load_module,emqx_banned,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions_trans,brutal_purge,soft_purge,[]}, {load_module,emqx_packet,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 0fc9975cc..76e0997a1 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -183,8 +183,7 @@ do_unsubscribe(Topic, SubPid, SubOpts) -> true = ets:delete(?SUBOPTION, {SubPid, Topic}), true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), Group = maps:get(share, SubOpts, undefined), - do_unsubscribe(Group, Topic, SubPid, SubOpts), - emqx_exclusive_subscription:unsubscribe(Topic, SubOpts). + do_unsubscribe(Group, Topic, SubPid, SubOpts). do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> clean_subscribe(SubOpts, Topic, SubPid); @@ -344,9 +343,12 @@ subscriber_down(SubPid) -> clean_subscribe(SubOpts, Topic, SubPid) -> case maps:get(shard, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + 0 -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = emqx_exclusive_subscription:unsubscribe(Topic, SubOpts), ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + I -> + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end. diff --git a/test/emqx_exclusive_sub_SUITE.erl b/test/emqx_exclusive_sub_SUITE.erl new file mode 100644 index 000000000..e5eec83e9 --- /dev/null +++ b/test/emqx_exclusive_sub_SUITE.erl @@ -0,0 +1,170 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exclusive_sub_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(TAB, emqx_exclusive_subscription). +-define(EXCLUSIVE_TOPIC, <<"$exclusive/t/1">>). +-define(NORMAL_TOPIC, <<"t/1">>). + +-define(CHECK_SUB(Client, Code), ?CHECK_SUB(Client, ?EXCLUSIVE_TOPIC, Code)). +-define(CHECK_SUB(Client, Topic, Code), + {ok, _, [Code]} = emqtt:subscribe(Client, Topic, []) +). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:start_apps([]), + enable_exclusive_sub(true), + Config. + +end_per_suite(_Config) -> + reset_zone_env(), + emqx_ct_helpers:stop_apps([]). + +end_per_testcase(_TestCase, _Config) -> + mnesia:clear_table(?TAB). + +%% test that this feature is working during the whole session life cycle +t_exclusive_sub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {clean_start, false}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 100}} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {clean_start, false}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + %% keep exclusive even disconnected + ok = emqtt:disconnect(C1), + timer:sleep(1000), + + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + ok = emqtt:disconnect(C2). + +%% test this feature does not interfere with normal subs +t_allow_normal_sub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?NORMAL_TOPIC, 0), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + +%% test the exclusive topics can be released correctly +t_unsub(_) -> + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + {ok, #{}, [0]} = emqtt:unsubscribe(C1, ?EXCLUSIVE_TOPIC), + + ?CHECK_SUB(C2, 0), + + ok = emqtt:disconnect(C1), + ok = emqtt:disconnect(C2). + +%% test whether the exclusive topics would auto-clean after the session was cleaned +t_clean_session(_) -> + erlang:process_flag(trap_exit, true), + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {clean_start, true}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 0}} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, 0), + + {ok, C2} = emqtt:start_link([ + {clientid, <<"client2">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C2), + ?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED), + + %% auto clean when session was cleand + ok = emqtt:disconnect(C1), + + timer:sleep(1000), + + ?CHECK_SUB(C2, 0), + + ok = emqtt:disconnect(C2). + +%% test the feature switch +t_feat_disabled(_) -> + enable_exclusive_sub(false), + + {ok, C1} = emqtt:start_link([ + {clientid, <<"client1">>}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(C1), + ?CHECK_SUB(C1, ?RC_TOPIC_FILTER_INVALID), + ok = emqtt:disconnect(C1), + + enable_exclusive_sub(true). + +enable_exclusive_sub(Enable) -> + emqx_zone:set_env( + external, + '$mqtt_sub_caps', + #{exclusive_subscription => Enable} + ), + timer:sleep(50). + +reset_zone_env() -> + emqx_zone:unset_env(external, '$mqtt_sub_caps'), + timer:sleep(50).