test(sessds): reuse and expand persistent session test suite

This commit is contained in:
Andrew Mayorov 2023-09-18 23:29:52 +04:00
parent 3383ae19a9
commit e422f492ef
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 61 additions and 4 deletions

View File

@ -19,7 +19,7 @@
-include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("../include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-compile(export_all).
-compile(nowarn_export_all).
@ -33,8 +33,8 @@ all() ->
% NOTE
% Tests are disabled while existing session persistence impl is being
% phased out.
% {group, persistent_store_enabled},
{group, persistent_store_disabled}
{group, persistent_store_disabled},
{group, persistent_store_ds}
].
%% A persistent session can be resumed in two ways:
@ -52,6 +52,7 @@ groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{persistent_store_disabled, [{group, no_kill_connection_process}]},
{persistent_store_ds, [{group, no_kill_connection_process}]},
{no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]},
{tcp, [], TCs},
{quic, [], TCs},
@ -59,7 +60,17 @@ groups() ->
].
init_per_group(persistent_store_disabled, Config) ->
[{emqx_config, "persistent_session_store { enabled = false }"} | Config];
[
{emqx_config, "persistent_session_store { enabled = false }"},
{persistent_store, false}
| Config
];
init_per_group(persistent_store_ds, Config) ->
[
{emqx_config, "persistent_session_store { ds = true }"},
{persistent_store, ds}
| Config
];
init_per_group(Group, Config) when Group == tcp ->
Apps = emqx_cth_suite:start(
[{emqx, ?config(emqx_config, Config)}],
@ -265,7 +276,36 @@ do_publish(Payload, PublishFun, WaitForUnregister) ->
%% Test Cases
%%--------------------------------------------------------------------
t_connect_discards_existing_client(Config) ->
ClientId = ?config(client_id, Config),
ConnFun = ?config(conn_fun, Config),
ClientOpts = [
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
| Config
],
{ok, Client1} = emqtt:start_link(ClientOpts),
true = unlink(Client1),
MRef = erlang:monitor(process, Client1),
{ok, _} = emqtt:ConnFun(Client1),
{ok, Client2} = emqtt:start_link(ClientOpts),
{ok, _} = emqtt:ConnFun(Client2),
receive
{'DOWN', MRef, process, Client1, Reason} ->
ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason),
ok = emqtt:stop(Client2),
ok
after 1000 ->
error({client_still_connected, Client1})
end.
%% [MQTT-3.1.2-23]
t_connect_session_expiry_interval(init, Config) -> skip_ds_tc(Config);
t_connect_session_expiry_interval('end', _Config) -> ok.
t_connect_session_expiry_interval(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
@ -332,6 +372,7 @@ t_assigned_clientid_persistent_session(Config) ->
{ok, Client2} = emqtt:start_link([
{clientid, AssignedClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false}
| Config
]),
@ -402,6 +443,8 @@ t_persist_on_disconnect(Config) ->
?assertEqual(0, client_info(session_present, Client2)),
ok = emqtt:disconnect(Client2).
t_process_dies_session_expires(init, Config) -> skip_ds_tc(Config);
t_process_dies_session_expires('end', _Config) -> ok.
t_process_dies_session_expires(Config) ->
%% Emulate an error in the connect process,
%% or that the node of the process goes down.
@ -443,6 +486,8 @@ t_process_dies_session_expires(Config) ->
emqtt:disconnect(Client2).
t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config);
t_publish_while_client_is_gone('end', _Config) -> ok.
t_publish_while_client_is_gone(Config) ->
%% A persistent session should receive messages in its
%% subscription even if the process owning the session dies.
@ -485,6 +530,8 @@ t_publish_while_client_is_gone(Config) ->
ok = emqtt:disconnect(Client2).
t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config);
t_clean_start_drops_subscriptions('end', _Config) -> ok.
t_clean_start_drops_subscriptions(Config) ->
%% 1. A persistent session is started and disconnected.
%% 2. While disconnected, a message is published and persisted.
@ -570,6 +617,8 @@ t_unsubscribe(Config) ->
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
ok = emqtt:disconnect(Client).
t_multiple_subscription_matches(init, Config) -> skip_ds_tc(Config);
t_multiple_subscription_matches('end', _Config) -> ok.
t_multiple_subscription_matches(Config) ->
ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config),
@ -611,3 +660,11 @@ t_multiple_subscription_matches(Config) ->
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
ok = emqtt:disconnect(Client2).
skip_ds_tc(Config) ->
case ?config(persistent_store, Config) of
ds ->
{skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"};
_ ->
Config
end.