From e422f492ef716c694e8b6d4eea08fd392aae6bb1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 18 Sep 2023 23:29:52 +0400 Subject: [PATCH] test(sessds): reuse and expand persistent session test suite --- .../test/emqx_persistent_session_SUITE.erl | 65 +++++++++++++++++-- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index f866636af..c1ba6a60c 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -19,7 +19,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include_lib("../include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -33,8 +33,8 @@ all() -> % NOTE % Tests are disabled while existing session persistence impl is being % phased out. - % {group, persistent_store_enabled}, - {group, persistent_store_disabled} + {group, persistent_store_disabled}, + {group, persistent_store_ds} ]. %% A persistent session can be resumed in two ways: @@ -52,6 +52,7 @@ groups() -> TCs = emqx_common_test_helpers:all(?MODULE), [ {persistent_store_disabled, [{group, no_kill_connection_process}]}, + {persistent_store_ds, [{group, no_kill_connection_process}]}, {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]}, {tcp, [], TCs}, {quic, [], TCs}, @@ -59,7 +60,17 @@ groups() -> ]. init_per_group(persistent_store_disabled, Config) -> - [{emqx_config, "persistent_session_store { enabled = false }"} | Config]; + [ + {emqx_config, "persistent_session_store { enabled = false }"}, + {persistent_store, false} + | Config + ]; +init_per_group(persistent_store_ds, Config) -> + [ + {emqx_config, "persistent_session_store { ds = true }"}, + {persistent_store, ds} + | Config + ]; init_per_group(Group, Config) when Group == tcp -> Apps = emqx_cth_suite:start( [{emqx, ?config(emqx_config, Config)}], @@ -265,7 +276,36 @@ do_publish(Payload, PublishFun, WaitForUnregister) -> %% Test Cases %%-------------------------------------------------------------------- +t_connect_discards_existing_client(Config) -> + ClientId = ?config(client_id, Config), + ConnFun = ?config(conn_fun, Config), + ClientOpts = [ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config + ], + + {ok, Client1} = emqtt:start_link(ClientOpts), + true = unlink(Client1), + MRef = erlang:monitor(process, Client1), + {ok, _} = emqtt:ConnFun(Client1), + + {ok, Client2} = emqtt:start_link(ClientOpts), + {ok, _} = emqtt:ConnFun(Client2), + + receive + {'DOWN', MRef, process, Client1, Reason} -> + ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason), + ok = emqtt:stop(Client2), + ok + after 1000 -> + error({client_still_connected, Client1}) + end. + %% [MQTT-3.1.2-23] +t_connect_session_expiry_interval(init, Config) -> skip_ds_tc(Config); +t_connect_session_expiry_interval('end', _Config) -> ok. t_connect_session_expiry_interval(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config), @@ -332,6 +372,7 @@ t_assigned_clientid_persistent_session(Config) -> {ok, Client2} = emqtt:start_link([ {clientid, AssignedClientId}, {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, {clean_start, false} | Config ]), @@ -402,6 +443,8 @@ t_persist_on_disconnect(Config) -> ?assertEqual(0, client_info(session_present, Client2)), ok = emqtt:disconnect(Client2). +t_process_dies_session_expires(init, Config) -> skip_ds_tc(Config); +t_process_dies_session_expires('end', _Config) -> ok. t_process_dies_session_expires(Config) -> %% Emulate an error in the connect process, %% or that the node of the process goes down. @@ -443,6 +486,8 @@ t_process_dies_session_expires(Config) -> emqtt:disconnect(Client2). +t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config); +t_publish_while_client_is_gone('end', _Config) -> ok. t_publish_while_client_is_gone(Config) -> %% A persistent session should receive messages in its %% subscription even if the process owning the session dies. @@ -485,6 +530,8 @@ t_publish_while_client_is_gone(Config) -> ok = emqtt:disconnect(Client2). +t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config); +t_clean_start_drops_subscriptions('end', _Config) -> ok. t_clean_start_drops_subscriptions(Config) -> %% 1. A persistent session is started and disconnected. %% 2. While disconnected, a message is published and persisted. @@ -570,6 +617,8 @@ t_unsubscribe(Config) -> ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), ok = emqtt:disconnect(Client). +t_multiple_subscription_matches(init, Config) -> skip_ds_tc(Config); +t_multiple_subscription_matches('end', _Config) -> ok. t_multiple_subscription_matches(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config), @@ -611,3 +660,11 @@ t_multiple_subscription_matches(Config) -> ?assertEqual({ok, 2}, maps:find(qos, Msg1)), ?assertEqual({ok, 2}, maps:find(qos, Msg2)), ok = emqtt:disconnect(Client2). + +skip_ds_tc(Config) -> + case ?config(persistent_store, Config) of + ds -> + {skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"}; + _ -> + Config + end.