171 lines
4.5 KiB
Erlang
171 lines
4.5 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_exclusive_sub_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-define(TAB, emqx_exclusive_subscription).
|
|
-define(EXCLUSIVE_TOPIC, <<"$exclusive/t/1">>).
|
|
-define(NORMAL_TOPIC, <<"t/1">>).
|
|
|
|
-define(CHECK_SUB(Client, Code), ?CHECK_SUB(Client, ?EXCLUSIVE_TOPIC, Code)).
|
|
-define(CHECK_SUB(Client, Topic, Code),
|
|
{ok, _, [Code]} = emqtt:subscribe(Client, Topic, [])
|
|
).
|
|
|
|
all() -> emqx_ct:all(?MODULE).
|
|
|
|
init_per_suite(Config) ->
|
|
emqx_ct_helpers:start_apps([]),
|
|
enable_exclusive_sub(true),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
reset_zone_env(),
|
|
emqx_ct_helpers:stop_apps([]).
|
|
|
|
end_per_testcase(_TestCase, _Config) ->
|
|
mnesia:clear_table(?TAB).
|
|
|
|
%% test that this feature is working during the whole session life cycle
|
|
t_exclusive_sub(_) ->
|
|
{ok, C1} = emqtt:start_link([
|
|
{clientid, <<"client1">>},
|
|
{clean_start, false},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 100}}
|
|
]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
?CHECK_SUB(C1, 0),
|
|
|
|
{ok, C2} = emqtt:start_link([
|
|
{clientid, <<"client2">>},
|
|
{clean_start, false},
|
|
{proto_ver, v5}
|
|
]),
|
|
{ok, _} = emqtt:connect(C2),
|
|
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
|
|
|
%% keep exclusive even disconnected
|
|
ok = emqtt:disconnect(C1),
|
|
timer:sleep(1000),
|
|
|
|
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
|
|
|
ok = emqtt:disconnect(C2).
|
|
|
|
%% test this feature does not interfere with normal subs
|
|
t_allow_normal_sub(_) ->
|
|
{ok, C1} = emqtt:start_link([
|
|
{clientid, <<"client1">>},
|
|
{proto_ver, v5}
|
|
]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
?CHECK_SUB(C1, 0),
|
|
|
|
{ok, C2} = emqtt:start_link([
|
|
{clientid, <<"client2">>},
|
|
{proto_ver, v5}
|
|
]),
|
|
{ok, _} = emqtt:connect(C2),
|
|
?CHECK_SUB(C2, ?NORMAL_TOPIC, 0),
|
|
|
|
ok = emqtt:disconnect(C1),
|
|
ok = emqtt:disconnect(C2).
|
|
|
|
%% test the exclusive topics can be released correctly
|
|
t_unsub(_) ->
|
|
{ok, C1} = emqtt:start_link([
|
|
{clientid, <<"client1">>},
|
|
{proto_ver, v5}
|
|
]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
?CHECK_SUB(C1, 0),
|
|
|
|
{ok, C2} = emqtt:start_link([
|
|
{clientid, <<"client2">>},
|
|
{proto_ver, v5}
|
|
]),
|
|
{ok, _} = emqtt:connect(C2),
|
|
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
|
|
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, ?EXCLUSIVE_TOPIC),
|
|
|
|
?CHECK_SUB(C2, 0),
|
|
|
|
ok = emqtt:disconnect(C1),
|
|
ok = emqtt:disconnect(C2).
|
|
|
|
%% test whether the exclusive topics would auto-clean after the session was cleaned
|
|
t_clean_session(_) ->
|
|
erlang:process_flag(trap_exit, true),
|
|
{ok, C1} = emqtt:start_link([
|
|
{clientid, <<"client1">>},
|
|
{clean_start, true},
|
|
{proto_ver, v5},
|
|
{properties, #{'Session-Expiry-Interval' => 0}}
|
|
]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
?CHECK_SUB(C1, 0),
|
|
|
|
{ok, C2} = emqtt:start_link([
|
|
{clientid, <<"client2">>},
|
|
{proto_ver, v5}
|
|
]),
|
|
{ok, _} = emqtt:connect(C2),
|
|
?CHECK_SUB(C2, ?RC_QUOTA_EXCEEDED),
|
|
|
|
%% auto clean when session was cleand
|
|
ok = emqtt:disconnect(C1),
|
|
|
|
timer:sleep(1000),
|
|
|
|
?CHECK_SUB(C2, 0),
|
|
|
|
ok = emqtt:disconnect(C2).
|
|
|
|
%% test the feature switch
|
|
t_feat_disabled(_) ->
|
|
enable_exclusive_sub(false),
|
|
|
|
{ok, C1} = emqtt:start_link([
|
|
{clientid, <<"client1">>},
|
|
{proto_ver, v5}
|
|
]),
|
|
{ok, _} = emqtt:connect(C1),
|
|
?CHECK_SUB(C1, ?RC_TOPIC_FILTER_INVALID),
|
|
ok = emqtt:disconnect(C1),
|
|
|
|
enable_exclusive_sub(true).
|
|
|
|
enable_exclusive_sub(Enable) ->
|
|
emqx_zone:set_env(
|
|
external,
|
|
'$mqtt_sub_caps',
|
|
#{exclusive_subscription => Enable}
|
|
),
|
|
timer:sleep(50).
|
|
|
|
reset_zone_env() ->
|
|
emqx_zone:unset_env(external, '$mqtt_sub_caps'),
|
|
timer:sleep(50).
|