From 5914b8ad3d131a5ed1eed6187e16ac2034d93cd1 Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Wed, 17 Nov 2021 11:55:11 +0100 Subject: [PATCH] Fix flaky tests for persistent sessions (#6202) * test(persistent_session_SUITE): remove redundant sleep and trap_exits * test(persistent_session_SUITE): fix race for started snabbkaffe Sometimes snabbkaffe was not started when publishing from a different process. Wrap the publishing code in a ?check_trace to make sure it is started, and make sure the publish process is truly down before ending the trace. * test(persistent_session_SUITE): fix takeover race Make sure the previous session is unregistered before trying to connect again. Sometimes the new session was trying to take over the session that was shutting down still. --- .../test/emqx_persistent_session_SUITE.erl | 33 ++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index af59343e7..0f54222f5 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -262,9 +262,13 @@ wait_for_cm_unregister(ClientId, N) -> snabbkaffe_sync_publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> - ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2) - , #{?snk_kind := ps_persist_msg, payload := Payload} - ) + ?check_trace( + begin + ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2) + , #{?snk_kind := ps_persist_msg, payload := Payload} + ) + end, + fun(_, _Trace) -> ok end) end, do_publish(Payloads, Fun, true). @@ -289,7 +293,22 @@ do_publish(Payloads = [_|_], PublishFun, WaitForUnregister) -> lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads), ok = emqtt:disconnect(Client), %% Snabbkaffe sometimes fails unless all processes are gone. - [wait_for_cm_unregister(ClientID) || WaitForUnregister] + case WaitForUnregister of + false -> + ok; + true -> + case emqx_cm:lookup_channels(ClientID) of + [] -> + ok; + [ConnectionPid] -> + ?assert(is_pid(ConnectionPid)), + Ref1 = monitor(process, ConnectionPid), + receive {'DOWN', Ref1, process, ConnectionPid, _} -> ok + after 3000 -> error(process_did_not_die) + end, + wait_for_cm_unregister(ClientID) + end + end end), receive {'DOWN', Ref, process, Pid, normal} -> ok; @@ -407,13 +426,14 @@ t_persist_on_disconnect(Config) -> %% Strangely enough, the disconnect is reported as successful by emqtt. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}), + wait_for_cm_unregister(ClientId), + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 30}} | Config]), {ok, _} = emqtt:ConnFun(Client2), - timer:sleep(10), %% The session should not be known, since it wasn't persisted because of the %% changed expiry interval in the disconnect call. ?assertEqual(0, client_info(session_present, Client2)), @@ -775,7 +795,6 @@ check_snabbkaffe_vanilla(Trace) -> t_snabbkaffe_vanilla_stages(Config) -> %% Test that all stages of session resume works ok in the simplest case - process_flag(trap_exit, true), ConnFun = ?config(conn_fun, Config), ClientId = ?config(client_id, Config), EmqttOpts = [ {proto_ver, v5}, @@ -800,7 +819,6 @@ t_snabbkaffe_vanilla_stages(Config) -> t_snabbkaffe_pending_messages(Config) -> %% Make sure there are pending messages are fetched during the init stage. - process_flag(trap_exit, true), ConnFun = ?config(conn_fun, Config), ClientId = ?config(client_id, Config), Topic = ?config(topic, Config), @@ -840,7 +858,6 @@ t_snabbkaffe_pending_messages(Config) -> t_snabbkaffe_buffered_messages(Config) -> %% Make sure to buffer messages during startup. - process_flag(trap_exit, true), ConnFun = ?config(conn_fun, Config), ClientId = ?config(client_id, Config), Topic = ?config(topic, Config),