diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 7647f088f..54b759e87 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -265,33 +265,37 @@ snabbkaffe_sync_publish(Topic, Payloads) -> , #{?snk_kind := ps_persist_msg, payload := Payload} ) end, - do_publish(Payloads, Fun). + do_publish(Payloads, Fun, true). publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> {ok, _} = emqtt:publish(Client, Topic, Payload, 2) end, - do_publish(Payloads, Fun). + do_publish(Payloads, Fun, false). -do_publish(Payloads = [_|_], PublishFun) -> +do_publish(Payloads = [_|_], PublishFun, WaitForUnregister) -> %% Publish from another process to avoid connection confusion. {Pid, Ref} = spawn_monitor( fun() -> %% For convenience, always publish using tcp. %% The publish path is not what we are testing. + ClientID = <<"ps_SUITE_publisher">>, {ok, Client} = emqtt:start_link([ {proto_ver, v5} + , {clientid, ClientID} , {port, 1883} ]), {ok, _} = emqtt:connect(Client), lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads), - ok = emqtt:disconnect(Client) + ok = emqtt:disconnect(Client), + %% Snabbkaffe sometimes fails unless all processes are gone. + [wait_for_cm_unregister(ClientID) || WaitForUnregister] end), receive {'DOWN', Ref, process, Pid, normal} -> ok; {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What}) end; -do_publish(Payload, PublishFun) -> - do_publish([Payload], PublishFun). +do_publish(Payload, PublishFun, WaitForUnregister) -> + do_publish([Payload], PublishFun, WaitForUnregister). %%-------------------------------------------------------------------- %% Test Cases