test(sessds): add complex testcase for session replay

This commit is contained in:
Andrew Mayorov 2023-11-17 20:21:53 +07:00
parent 1246d714c5
commit a5ff4144fe
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 164 additions and 24 deletions

View File

@ -181,18 +181,23 @@ client_info(Key, Client) ->
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined). maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
receive_messages(Count) -> receive_messages(Count) ->
receive_messages(Count, []). receive_messages(Count, 15000).
receive_messages(0, Msgs) -> receive_messages(Count, Timeout) ->
Msgs; Deadline = erlang:monotonic_time(millisecond) + Timeout,
receive_messages(Count, Msgs) -> receive_message_loop(Count, Deadline).
receive_message_loop(0, _Deadline) ->
[];
receive_message_loop(Count, Deadline) ->
Timeout = max(0, Deadline - erlang:monotonic_time(millisecond)),
receive receive
{publish, Msg} -> {publish, Msg} ->
receive_messages(Count - 1, [Msg | Msgs]); [Msg | receive_message_loop(Count - 1, Deadline)];
_Other -> _Other ->
receive_messages(Count, Msgs) receive_message_loop(Count, Deadline)
after 15000 -> after Timeout ->
Msgs []
end. end.
maybe_kill_connection_process(ClientId, Config) -> maybe_kill_connection_process(ClientId, Config) ->
@ -229,16 +234,28 @@ wait_for_cm_unregister(ClientId, N) ->
wait_for_cm_unregister(ClientId, N - 1) wait_for_cm_unregister(ClientId, N - 1)
end. end.
publish(Topic, Payloads) -> messages(Topic, Payloads) ->
publish(Topic, Payloads, false, 2). messages(Topic, Payloads, ?QOS_2).
publish(Topic, Payloads, WaitForUnregister, QoS) -> messages(Topic, Payloads, QoS) ->
Fun = fun(Client, Payload) -> [#mqtt_msg{topic = Topic, payload = P, qos = QoS} || P <- Payloads].
{ok, _} = emqtt:publish(Client, Topic, Payload, QoS)
publish(Topic, Payload) ->
publish(Topic, Payload, ?QOS_2).
publish(Topic, Payload, QoS) ->
publish_many(messages(Topic, [Payload], QoS)).
publish_many(Messages) ->
publish_many(Messages, false).
publish_many(Messages, WaitForUnregister) ->
Fun = fun(Client, Message) ->
{ok, _} = emqtt:publish(Client, Message)
end, end,
do_publish(Payloads, Fun, WaitForUnregister). do_publish(Messages, Fun, WaitForUnregister).
do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) -> do_publish(Messages = [_ | _], PublishFun, WaitForUnregister) ->
%% Publish from another process to avoid connection confusion. %% Publish from another process to avoid connection confusion.
{Pid, Ref} = {Pid, Ref} =
spawn_monitor( spawn_monitor(
@ -252,7 +269,7 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) ->
{port, 1883} {port, 1883}
]), ]),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads), lists:foreach(fun(Message) -> PublishFun(Client, Message) end, Messages),
ok = emqtt:disconnect(Client), ok = emqtt:disconnect(Client),
%% Snabbkaffe sometimes fails unless all processes are gone. %% Snabbkaffe sometimes fails unless all processes are gone.
case WaitForUnregister of case WaitForUnregister of
@ -277,9 +294,7 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) ->
receive receive
{'DOWN', Ref, process, Pid, normal} -> ok; {'DOWN', Ref, process, Pid, normal} -> ok;
{'DOWN', Ref, process, Pid, What} -> error({failed_publish, What}) {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
end; end.
do_publish(Payload, PublishFun, WaitForUnregister) ->
do_publish([Payload], PublishFun, WaitForUnregister).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases %% Test Cases
@ -494,7 +509,7 @@ t_process_dies_session_expires(Config) ->
maybe_kill_connection_process(ClientId, Config), maybe_kill_connection_process(ClientId, Config),
ok = publish(Topic, [Payload]), ok = publish(Topic, Payload),
timer:sleep(1100), timer:sleep(1100),
@ -535,7 +550,7 @@ t_publish_while_client_is_gone_qos1(Config) ->
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config), maybe_kill_connection_process(ClientId, Config),
ok = publish(Topic, [Payload1, Payload2], false, 1), ok = publish_many(messages(Topic, [Payload1, Payload2], ?QOS_1)),
{ok, Client2} = emqtt:start_link([ {ok, Client2} = emqtt:start_link([
{proto_ver, v5}, {proto_ver, v5},
@ -547,7 +562,7 @@ t_publish_while_client_is_gone_qos1(Config) ->
{ok, _} = emqtt:ConnFun(Client2), {ok, _} = emqtt:ConnFun(Client2),
Msgs = receive_messages(2), Msgs = receive_messages(2),
?assertMatch([_, _], Msgs), ?assertMatch([_, _], Msgs),
[Msg2, Msg1] = Msgs, [Msg1, Msg2] = Msgs,
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)), ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
?assertEqual({ok, 1}, maps:find(qos, Msg1)), ?assertEqual({ok, 1}, maps:find(qos, Msg1)),
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
@ -555,6 +570,131 @@ t_publish_while_client_is_gone_qos1(Config) ->
ok = emqtt:disconnect(Client2). ok = emqtt:disconnect(Client2).
t_publish_many_while_client_is_gone_qos1(Config) ->
%% A persistent session should receive all of the still unacked messages
%% for its subscriptions after the client dies or reconnects, in addition
%% to new messages that were published while the client was gone. The order
%% of the messages should be consistent across reconnects.
ClientId = ?config(client_id, Config),
ConnFun = ?config(conn_fun, Config),
{ok, Client1} = emqtt:start_link([
{proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, true},
{auto_ack, false}
| Config
]),
{ok, _} = emqtt:ConnFun(Client1),
STopics = [
<<"t/+/foo">>,
<<"msg/feed/#">>,
<<"loc/+/+/+">>
],
[{ok, _, [?QOS_1]} = emqtt:subscribe(Client1, ST, ?QOS_1) || ST <- STopics],
Pubs1 = [
#mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M1">>, qos = 1},
#mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M2">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M3">>, qos = 1},
#mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1},
#mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1},
#mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1}
],
ok = publish_many(Pubs1),
NPubs1 = length(Pubs1),
Msgs1 = receive_messages(NPubs1),
NMsgs1 = length(Msgs1),
?assertEqual(NPubs1, NMsgs1),
ct:pal("Msgs1 = ~p", [Msgs1]),
%% TODO
%% This assertion doesn't currently hold because `emqx_ds` doesn't enforce
%% strict ordering reflecting client publishing order. Instead, per-topic
%% ordering is guaranteed per each client. In fact, this violates the MQTT
%% specification, but we deemed it acceptable for now.
%% ?assertMatch([
%% #{payload := <<"M1">>},
%% #{payload := <<"M2">>},
%% #{payload := <<"M3">>},
%% #{payload := <<"M4">>},
%% #{payload := <<"M5">>},
%% #{payload := <<"M6">>},
%% #{payload := <<"M7">>}
%% ], Msgs1),
?assertEqual(
get_topicwise_order(Pubs1),
get_topicwise_order(Msgs1),
Msgs1
),
NAcked = 4,
[ok = emqtt:puback(Client1, PktId) || #{packet_id := PktId} <- lists:sublist(Msgs1, NAcked)],
%% Ensure that PUBACKs are propagated to the channel.
pong = emqtt:ping(Client1),
ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config),
Pubs2 = [
#mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M8">>, qos = 1},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1},
#mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1},
#mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1}
],
ok = publish_many(Pubs2),
NPubs2 = length(Pubs2),
{ok, Client2} = emqtt:start_link([
{proto_ver, v5},
{clientid, ClientId},
{properties, #{'Session-Expiry-Interval' => 30}},
{clean_start, false},
{auto_ack, false}
| Config
]),
{ok, _} = emqtt:ConnFun(Client2),
%% Try to receive _at most_ `NPubs` messages.
%% There shouldn't be that much unacked messages in the replay anyway,
%% but it's an easy number to pick.
NPubs = NPubs1 + NPubs2,
Msgs2 = receive_messages(NPubs, _Timeout = 2000),
NMsgs2 = length(Msgs2),
ct:pal("Msgs2 = ~p", [Msgs2]),
?assert(NMsgs2 < NPubs, Msgs2),
?assert(NMsgs2 > NPubs2, Msgs2),
?assert(NMsgs2 >= NPubs - NAcked, Msgs2),
NSame = NMsgs2 - NPubs2,
?assertEqual(
[maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)],
[maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)]
),
ok = emqtt:disconnect(Client2).
get_topicwise_order(Msgs) ->
maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
get_msgpub_topic(#mqtt_msg{topic = Topic}) ->
Topic;
get_msgpub_topic(#{topic := Topic}) ->
Topic.
get_msgpub_payload(#mqtt_msg{payload = Payload}) ->
Payload;
get_msgpub_payload(#{payload := Payload}) ->
Payload.
t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config); t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config);
t_publish_while_client_is_gone('end', _Config) -> ok. t_publish_while_client_is_gone('end', _Config) -> ok.
t_publish_while_client_is_gone(Config) -> t_publish_while_client_is_gone(Config) ->
@ -579,7 +719,7 @@ t_publish_while_client_is_gone(Config) ->
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
maybe_kill_connection_process(ClientId, Config), maybe_kill_connection_process(ClientId, Config),
ok = publish(Topic, [Payload1, Payload2]), ok = publish_many(messages(Topic, [Payload1, Payload2])),
{ok, Client2} = emqtt:start_link([ {ok, Client2} = emqtt:start_link([
{proto_ver, v5}, {proto_ver, v5},
@ -591,7 +731,7 @@ t_publish_while_client_is_gone(Config) ->
{ok, _} = emqtt:ConnFun(Client2), {ok, _} = emqtt:ConnFun(Client2),
Msgs = receive_messages(2), Msgs = receive_messages(2),
?assertMatch([_, _], Msgs), ?assertMatch([_, _], Msgs),
[Msg2, Msg1] = Msgs, [Msg1, Msg2] = Msgs,
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)), ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
?assertEqual({ok, 2}, maps:find(qos, Msg1)), ?assertEqual({ok, 2}, maps:find(qos, Msg1)),
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),