test(persistent_sessions): stabilize flaky tests

This commit is contained in:
Tobias Lindahl 2021-10-27 14:12:03 +02:00
parent 7ae6e04582
commit ec68d7fc58
2 changed files with 51 additions and 26 deletions

View File

@ -58,7 +58,9 @@
, lookup_channels/2
]).
-export([all_channels/0]).
-export([ all_channels/0
, all_client_ids/0
]).
%% gen_server callbacks
-export([ init/1
@ -400,6 +402,11 @@ all_channels() ->
Pat = [{{'_', '$1'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat).
all_client_ids() ->
Pat = [{{'$1', '_'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat).
%% @doc Lookup channels.
-spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
lookup_channels(ClientId) ->

View File

@ -113,6 +113,8 @@ init_per_group(snabbkaffe, Config) ->
[ {kill_connection_process, true} | Config];
init_per_group(gc_tests, Config) ->
%% We need to make sure the system does not interfere with this test group.
[maybe_kill_connection_process(ClientId, [{kill_connection_process, true}])
|| ClientId <- emqx_cm:all_client_ids()],
emqx_common_test_helpers:stop_apps([]),
SessionMsgEts = gc_tests_session_store,
MsgEts = gc_tests_msg_store,
@ -230,32 +232,48 @@ receive_messages(Count, Msgs) ->
maybe_kill_connection_process(ClientId, Config) ->
case ?config(kill_connection_process, Config) of
true ->
[ConnectionPid] = emqx_cm:lookup_channels(ClientId),
?assert(is_pid(ConnectionPid)),
Ref = monitor(process, ConnectionPid),
ConnectionPid ! die_if_test,
receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
after 3000 -> error(process_did_not_die)
case emqx_cm:lookup_channels(ClientId) of
[] ->
ok;
[ConnectionPid] ->
?assert(is_pid(ConnectionPid)),
Ref = monitor(process, ConnectionPid),
ConnectionPid ! die_if_test,
receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
after 3000 -> error(process_did_not_die)
end,
wait_for_cm_unregister(ClientId)
end;
false ->
ok
end.
snabbkaffe_sync_publish(Topic, Payloads, Config) ->
wait_for_cm_unregister(ClientId) ->
wait_for_cm_unregister(ClientId, 10).
wait_for_cm_unregister(_ClientId, 0) ->
error(cm_did_not_unregister);
wait_for_cm_unregister(ClientId, N) ->
case emqx_cm:lookup_channels(ClientId) of
[] -> ok;
[_] -> timer:sleep(100), wait_for_cm_unregister(ClientId, N - 1)
end.
snabbkaffe_sync_publish(Topic, Payloads) ->
Fun = fun(Client, Payload) ->
?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
, #{?snk_kind := ps_persist_msg, payload := Payload}
)
end,
do_publish(Payloads, Fun, Config).
do_publish(Payloads, Fun).
publish(Topic, Payloads, Config) ->
publish(Topic, Payloads) ->
Fun = fun(Client, Payload) ->
{ok, _} = emqtt:publish(Client, Topic, Payload, 2)
end,
do_publish(Payloads, Fun, Config).
do_publish(Payloads, Fun).
do_publish(Payloads = [_|_], PublishFun, Config) ->
do_publish(Payloads = [_|_], PublishFun) ->
%% Publish from another process to avoid connection confusion.
{Pid, Ref} =
spawn_monitor(
@ -272,8 +290,8 @@ do_publish(Payloads = [_|_], PublishFun, Config) ->
{'DOWN', Ref, process, Pid, normal} -> ok;
{'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
end;
do_publish(Payload, PublishFun, Config) ->
do_publish([Payload], PublishFun, Config).
do_publish(Payload, PublishFun) ->
do_publish([Payload], PublishFun).
%%--------------------------------------------------------------------
%% Test Cases
@ -297,7 +315,7 @@ t_connect_session_expiry_interval(Config) ->
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payload, Config),
publish(Topic, Payload),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
@ -424,7 +442,7 @@ t_process_dies_session_expires(Config) ->
maybe_kill_connection_process(ClientId, Config),
ok = publish(Topic, [Payload], Config),
ok = publish(Topic, [Payload]),
SessionId =
case ?config(persistent_store_enabled, Config) of
@ -498,7 +516,7 @@ t_publish_while_client_is_gone(Config) ->
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
ok = publish(Topic, [Payload1, Payload2], Config),
ok = publish(Topic, [Payload1, Payload2]),
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
{clientid, ClientId},
@ -544,7 +562,7 @@ t_clean_start_drops_subscriptions(Config) ->
maybe_kill_connection_process(ClientId, Config),
%% 2.
ok = publish(Topic, Payload1, Config),
ok = publish(Topic, Payload1),
%% 3.
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
@ -556,7 +574,7 @@ t_clean_start_drops_subscriptions(Config) ->
?assertEqual(0, client_info(session_present, Client2)),
{ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
ok = publish(Topic, Payload2, Config),
ok = publish(Topic, Payload2),
[Msg1] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
@ -571,7 +589,7 @@ t_clean_start_drops_subscriptions(Config) ->
| Config]),
{ok, _} = emqtt:ConnFun(Client3),
ok = publish(Topic, Payload3, Config),
ok = publish(Topic, Payload3),
[Msg2] = receive_messages(1),
?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
@ -625,7 +643,7 @@ t_multiple_subscription_matches(Config) ->
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payload, Config),
publish(Topic, Payload),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{proto_ver, v5},
@ -675,9 +693,9 @@ t_lost_messages_because_of_gc(Config) ->
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payload1, Config),
publish(Topic, Payload1),
timer:sleep(2 * Retain),
publish(Topic, Payload2, Config),
publish(Topic, Payload2),
emqx_persistent_session_gc:message_gc_worker(),
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
{clean_start, false},
@ -790,7 +808,7 @@ t_snabbkaffe_pending_messages(Config) ->
?check_trace(
begin
snabbkaffe_sync_publish(Topic, Payloads, Config),
snabbkaffe_sync_publish(Topic, Payloads),
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client2),
Msgs = receive_messages(length(Payloads)),
@ -829,7 +847,7 @@ t_snabbkaffe_buffered_messages(Config) ->
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
publish(Topic, Payloads1, Config),
publish(Topic, Payloads1),
?check_trace(
begin
@ -838,7 +856,7 @@ t_snabbkaffe_buffered_messages(Config) ->
#{ ?snk_kind := ps_resume_end }),
spawn_link(fun() ->
?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000),
publish(Topic, Payloads2, Config)
publish(Topic, Payloads2)
end),
{ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
{ok, _} = emqtt:ConnFun(Client2),