From ec68d7fc589b2b7d2f5de8fdf963ebede9511077 Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Wed, 27 Oct 2021 14:12:03 +0200 Subject: [PATCH] test(persistent_sessions): stabilize flaky tests --- apps/emqx/src/emqx_cm.erl | 9 ++- .../test/emqx_persistent_session_SUITE.erl | 68 ++++++++++++------- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 83f77050a..dbd8dfa63 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -58,7 +58,9 @@ , lookup_channels/2 ]). --export([all_channels/0]). +-export([ all_channels/0 + , all_client_ids/0 + ]). %% gen_server callbacks -export([ init/1 @@ -400,6 +402,11 @@ all_channels() -> Pat = [{{'_', '$1'}, [], ['$1']}], ets:select(?CHAN_TAB, Pat). +all_client_ids() -> + Pat = [{{'$1', '_'}, [], ['$1']}], + ets:select(?CHAN_TAB, Pat). + + %% @doc Lookup channels. -spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())). lookup_channels(ClientId) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index ac51636f0..3e992ce2d 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -113,6 +113,8 @@ init_per_group(snabbkaffe, Config) -> [ {kill_connection_process, true} | Config]; init_per_group(gc_tests, Config) -> %% We need to make sure the system does not interfere with this test group. + [maybe_kill_connection_process(ClientId, [{kill_connection_process, true}]) + || ClientId <- emqx_cm:all_client_ids()], emqx_common_test_helpers:stop_apps([]), SessionMsgEts = gc_tests_session_store, MsgEts = gc_tests_msg_store, @@ -230,32 +232,48 @@ receive_messages(Count, Msgs) -> maybe_kill_connection_process(ClientId, Config) -> case ?config(kill_connection_process, Config) of true -> - [ConnectionPid] = emqx_cm:lookup_channels(ClientId), - ?assert(is_pid(ConnectionPid)), - Ref = monitor(process, ConnectionPid), - ConnectionPid ! die_if_test, - receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok - after 3000 -> error(process_did_not_die) + case emqx_cm:lookup_channels(ClientId) of + [] -> + ok; + [ConnectionPid] -> + ?assert(is_pid(ConnectionPid)), + Ref = monitor(process, ConnectionPid), + ConnectionPid ! die_if_test, + receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok + after 3000 -> error(process_did_not_die) + end, + wait_for_cm_unregister(ClientId) end; false -> ok end. -snabbkaffe_sync_publish(Topic, Payloads, Config) -> +wait_for_cm_unregister(ClientId) -> + wait_for_cm_unregister(ClientId, 10). + +wait_for_cm_unregister(_ClientId, 0) -> + error(cm_did_not_unregister); +wait_for_cm_unregister(ClientId, N) -> + case emqx_cm:lookup_channels(ClientId) of + [] -> ok; + [_] -> timer:sleep(100), wait_for_cm_unregister(ClientId, N - 1) + end. + +snabbkaffe_sync_publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2) , #{?snk_kind := ps_persist_msg, payload := Payload} ) end, - do_publish(Payloads, Fun, Config). + do_publish(Payloads, Fun). -publish(Topic, Payloads, Config) -> +publish(Topic, Payloads) -> Fun = fun(Client, Payload) -> {ok, _} = emqtt:publish(Client, Topic, Payload, 2) end, - do_publish(Payloads, Fun, Config). + do_publish(Payloads, Fun). -do_publish(Payloads = [_|_], PublishFun, Config) -> +do_publish(Payloads = [_|_], PublishFun) -> %% Publish from another process to avoid connection confusion. {Pid, Ref} = spawn_monitor( @@ -272,8 +290,8 @@ do_publish(Payloads = [_|_], PublishFun, Config) -> {'DOWN', Ref, process, Pid, normal} -> ok; {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What}) end; -do_publish(Payload, PublishFun, Config) -> - do_publish([Payload], PublishFun, Config). +do_publish(Payload, PublishFun) -> + do_publish([Payload], PublishFun). %%-------------------------------------------------------------------- %% Test Cases @@ -297,7 +315,7 @@ t_connect_session_expiry_interval(Config) -> maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payload, Config), + publish(Topic, Payload), {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, @@ -424,7 +442,7 @@ t_process_dies_session_expires(Config) -> maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload], Config), + ok = publish(Topic, [Payload]), SessionId = case ?config(persistent_store_enabled, Config) of @@ -498,7 +516,7 @@ t_publish_while_client_is_gone(Config) -> ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload1, Payload2], Config), + ok = publish(Topic, [Payload1, Payload2]), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, {clientid, ClientId}, @@ -544,7 +562,7 @@ t_clean_start_drops_subscriptions(Config) -> maybe_kill_connection_process(ClientId, Config), %% 2. - ok = publish(Topic, Payload1, Config), + ok = publish(Topic, Payload1), %% 3. {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, @@ -556,7 +574,7 @@ t_clean_start_drops_subscriptions(Config) -> ?assertEqual(0, client_info(session_present, Client2)), {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2), - ok = publish(Topic, Payload2, Config), + ok = publish(Topic, Payload2), [Msg1] = receive_messages(1), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)), @@ -571,7 +589,7 @@ t_clean_start_drops_subscriptions(Config) -> | Config]), {ok, _} = emqtt:ConnFun(Client3), - ok = publish(Topic, Payload3, Config), + ok = publish(Topic, Payload3), [Msg2] = receive_messages(1), ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)), @@ -625,7 +643,7 @@ t_multiple_subscription_matches(Config) -> maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payload, Config), + publish(Topic, Payload), {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {proto_ver, v5}, @@ -675,9 +693,9 @@ t_lost_messages_because_of_gc(Config) -> {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payload1, Config), + publish(Topic, Payload1), timer:sleep(2 * Retain), - publish(Topic, Payload2, Config), + publish(Topic, Payload2), emqx_persistent_session_gc:message_gc_worker(), {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, {clean_start, false}, @@ -790,7 +808,7 @@ t_snabbkaffe_pending_messages(Config) -> ?check_trace( begin - snabbkaffe_sync_publish(Topic, Payloads, Config), + snabbkaffe_sync_publish(Topic, Payloads), {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), {ok, _} = emqtt:ConnFun(Client2), Msgs = receive_messages(length(Payloads)), @@ -829,7 +847,7 @@ t_snabbkaffe_buffered_messages(Config) -> ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - publish(Topic, Payloads1, Config), + publish(Topic, Payloads1), ?check_trace( begin @@ -838,7 +856,7 @@ t_snabbkaffe_buffered_messages(Config) -> #{ ?snk_kind := ps_resume_end }), spawn_link(fun() -> ?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000), - publish(Topic, Payloads2, Config) + publish(Topic, Payloads2) end), {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), {ok, _} = emqtt:ConnFun(Client2),