diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index af59343e7..0f54222f5 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -262,9 +262,13 @@ wait_for_cm_unregister(ClientId, N) -> snabbkaffe_sync_publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> - ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2) - , #{?snk_kind := ps_persist_msg, payload := Payload} - ) + ?check_trace( + begin + ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2) + , #{?snk_kind := ps_persist_msg, payload := Payload} + ) + end, + fun(_, _Trace) -> ok end) end, do_publish(Payloads, Fun, true). @@ -289,7 +293,22 @@ do_publish(Payloads = [_|_], PublishFun, WaitForUnregister) -> lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads), ok = emqtt:disconnect(Client), %% Snabbkaffe sometimes fails unless all processes are gone. - [wait_for_cm_unregister(ClientID) || WaitForUnregister] + case WaitForUnregister of + false -> + ok; + true -> + case emqx_cm:lookup_channels(ClientID) of + [] -> + ok; + [ConnectionPid] -> + ?assert(is_pid(ConnectionPid)), + Ref1 = monitor(process, ConnectionPid), + receive {'DOWN', Ref1, process, ConnectionPid, _} -> ok + after 3000 -> error(process_did_not_die) + end, + wait_for_cm_unregister(ClientID) + end + end end), receive {'DOWN', Ref, process, Pid, normal} -> ok; @@ -407,13 +426,14 @@ t_persist_on_disconnect(Config) -> %% Strangely enough, the disconnect is reported as successful by emqtt. ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}), + wait_for_cm_unregister(ClientId), + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 30}} | Config]), {ok, _} = emqtt:ConnFun(Client2), - timer:sleep(10), %% The session should not be known, since it wasn't persisted because of the %% changed expiry interval in the disconnect call. ?assertEqual(0, client_info(session_present, Client2)), @@ -775,7 +795,6 @@ check_snabbkaffe_vanilla(Trace) -> t_snabbkaffe_vanilla_stages(Config) -> %% Test that all stages of session resume works ok in the simplest case - process_flag(trap_exit, true), ConnFun = ?config(conn_fun, Config), ClientId = ?config(client_id, Config), EmqttOpts = [ {proto_ver, v5}, @@ -800,7 +819,6 @@ t_snabbkaffe_vanilla_stages(Config) -> t_snabbkaffe_pending_messages(Config) -> %% Make sure there are pending messages are fetched during the init stage. - process_flag(trap_exit, true), ConnFun = ?config(conn_fun, Config), ClientId = ?config(client_id, Config), Topic = ?config(topic, Config), @@ -840,7 +858,6 @@ t_snabbkaffe_pending_messages(Config) -> t_snabbkaffe_buffered_messages(Config) -> %% Make sure to buffer messages during startup. - process_flag(trap_exit, true), ConnFun = ?config(conn_fun, Config), ClientId = ?config(client_id, Config), Topic = ?config(topic, Config),