diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 537c60876..92b95c7c3 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -91,6 +91,7 @@ clean_down/1, mark_channel_connected/1, mark_channel_disconnected/1, + is_channel_connected/1, get_connected_client_count/0 ]). diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index d835fb944..66bb8dcf5 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -238,6 +238,24 @@ wait_connection_process_unregistered(ClientId) -> ?assertEqual([], emqx_cm:lookup_channels(ClientId)) ). +wait_channel_disconnected(ClientId) -> + ?retry( + _Timeout = 100, + _Retries = 20, + case emqx_cm:lookup_channels(ClientId) of + [] -> + false; + [ChanPid] -> + false = emqx_cm:is_channel_connected(ChanPid) + end + ). + +disconnect_client(ClientPid) -> + ClientId = proplists:get_value(clientid, emqtt:info(ClientPid)), + ok = emqtt:disconnect(ClientPid), + false = wait_channel_disconnected(ClientId), + ok. + messages(Topic, Payloads) -> messages(Topic, Payloads, ?QOS_2). @@ -661,7 +679,7 @@ t_publish_many_while_client_is_gone_qos1(Config) -> %% Ensure that PUBACKs are propagated to the channel. pong = emqtt:ping(Client1), - ok = emqtt:disconnect(Client1), + ok = disconnect_client(Client1), maybe_kill_connection_process(ClientId, Config), Pubs2 = [ @@ -708,7 +726,7 @@ t_publish_many_while_client_is_gone_qos1(Config) -> [maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)] ), - ok = emqtt:disconnect(Client2). + ok = disconnect_client(Client2). t_publish_while_client_is_gone(Config) -> %% A persistent session should receive messages in its @@ -823,7 +841,7 @@ t_publish_many_while_client_is_gone(Config) -> PubRels1 ), - ok = emqtt:disconnect(Client1), + ok = disconnect_client(Client1), maybe_kill_connection_process(ClientId, Config), Pubs2 = [ @@ -887,7 +905,7 @@ t_publish_many_while_client_is_gone(Config) -> %% Ensure that PUBCOMPs are propagated to the channel. pong = emqtt:ping(Client2), - ok = emqtt:disconnect(Client2), + ok = disconnect_client(Client2), maybe_kill_connection_process(ClientId, Config), {ok, Client3} = emqtt:start_link([{clean_start, false} | ClientOpts]), @@ -901,7 +919,7 @@ t_publish_many_while_client_is_gone(Config) -> Msgs3 ), - ok = emqtt:disconnect(Client3). + ok = disconnect_client(Client3). t_clean_start_drops_subscriptions(Config) -> %% 1. A persistent session is started and disconnected.