diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index bd7ca1c46..3f4cbcd28 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -181,18 +181,23 @@ client_info(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined). receive_messages(Count) -> - receive_messages(Count, []). + receive_messages(Count, 15000). -receive_messages(0, Msgs) -> - Msgs; -receive_messages(Count, Msgs) -> +receive_messages(Count, Timeout) -> + Deadline = erlang:monotonic_time(millisecond) + Timeout, + receive_message_loop(Count, Deadline). + +receive_message_loop(0, _Deadline) -> + []; +receive_message_loop(Count, Deadline) -> + Timeout = max(0, Deadline - erlang:monotonic_time(millisecond)), receive {publish, Msg} -> - receive_messages(Count - 1, [Msg | Msgs]); + [Msg | receive_message_loop(Count - 1, Deadline)]; _Other -> - receive_messages(Count, Msgs) - after 15000 -> - Msgs + receive_message_loop(Count, Deadline) + after Timeout -> + [] end. maybe_kill_connection_process(ClientId, Config) -> @@ -229,16 +234,28 @@ wait_for_cm_unregister(ClientId, N) -> wait_for_cm_unregister(ClientId, N - 1) end. -publish(Topic, Payloads) -> - publish(Topic, Payloads, false, 2). +messages(Topic, Payloads) -> + messages(Topic, Payloads, ?QOS_2). -publish(Topic, Payloads, WaitForUnregister, QoS) -> - Fun = fun(Client, Payload) -> - {ok, _} = emqtt:publish(Client, Topic, Payload, QoS) +messages(Topic, Payloads, QoS) -> + [#mqtt_msg{topic = Topic, payload = P, qos = QoS} || P <- Payloads]. + +publish(Topic, Payload) -> + publish(Topic, Payload, ?QOS_2). + +publish(Topic, Payload, QoS) -> + publish_many(messages(Topic, [Payload], QoS)). + +publish_many(Messages) -> + publish_many(Messages, false). + +publish_many(Messages, WaitForUnregister) -> + Fun = fun(Client, Message) -> + {ok, _} = emqtt:publish(Client, Message) end, - do_publish(Payloads, Fun, WaitForUnregister). + do_publish(Messages, Fun, WaitForUnregister). -do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) -> +do_publish(Messages = [_ | _], PublishFun, WaitForUnregister) -> %% Publish from another process to avoid connection confusion. {Pid, Ref} = spawn_monitor( @@ -252,7 +269,7 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) -> {port, 1883} ]), {ok, _} = emqtt:connect(Client), - lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads), + lists:foreach(fun(Message) -> PublishFun(Client, Message) end, Messages), ok = emqtt:disconnect(Client), %% Snabbkaffe sometimes fails unless all processes are gone. case WaitForUnregister of @@ -277,9 +294,7 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) -> receive {'DOWN', Ref, process, Pid, normal} -> ok; {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What}) - end; -do_publish(Payload, PublishFun, WaitForUnregister) -> - do_publish([Payload], PublishFun, WaitForUnregister). + end. %%-------------------------------------------------------------------- %% Test Cases @@ -494,7 +509,7 @@ t_process_dies_session_expires(Config) -> maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload]), + ok = publish(Topic, Payload), timer:sleep(1100), @@ -535,7 +550,7 @@ t_publish_while_client_is_gone_qos1(Config) -> ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload1, Payload2], false, 1), + ok = publish_many(messages(Topic, [Payload1, Payload2], ?QOS_1)), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, @@ -547,7 +562,7 @@ t_publish_while_client_is_gone_qos1(Config) -> {ok, _} = emqtt:ConnFun(Client2), Msgs = receive_messages(2), ?assertMatch([_, _], Msgs), - [Msg2, Msg1] = Msgs, + [Msg1, Msg2] = Msgs, ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)), ?assertEqual({ok, 1}, maps:find(qos, Msg1)), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)), @@ -555,6 +570,131 @@ t_publish_while_client_is_gone_qos1(Config) -> ok = emqtt:disconnect(Client2). +t_publish_many_while_client_is_gone_qos1(Config) -> + %% A persistent session should receive all of the still unacked messages + %% for its subscriptions after the client dies or reconnects, in addition + %% to new messages that were published while the client was gone. The order + %% of the messages should be consistent across reconnects. + ClientId = ?config(client_id, Config), + ConnFun = ?config(conn_fun, Config), + {ok, Client1} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, true}, + {auto_ack, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client1), + + STopics = [ + <<"t/+/foo">>, + <<"msg/feed/#">>, + <<"loc/+/+/+">> + ], + [{ok, _, [?QOS_1]} = emqtt:subscribe(Client1, ST, ?QOS_1) || ST <- STopics], + + Pubs1 = [ + #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M1">>, qos = 1}, + #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M2">>, qos = 1}, + #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M3">>, qos = 1}, + #mqtt_msg{topic = <<"loc/1/2/42">>, payload = <<"M4">>, qos = 1}, + #mqtt_msg{topic = <<"t/42/foo">>, payload = <<"M5">>, qos = 1}, + #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M6">>, qos = 1}, + #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M7">>, qos = 1} + ], + ok = publish_many(Pubs1), + NPubs1 = length(Pubs1), + + Msgs1 = receive_messages(NPubs1), + NMsgs1 = length(Msgs1), + ?assertEqual(NPubs1, NMsgs1), + + ct:pal("Msgs1 = ~p", [Msgs1]), + + %% TODO + %% This assertion doesn't currently hold because `emqx_ds` doesn't enforce + %% strict ordering reflecting client publishing order. Instead, per-topic + %% ordering is guaranteed per each client. In fact, this violates the MQTT + %% specification, but we deemed it acceptable for now. + %% ?assertMatch([ + %% #{payload := <<"M1">>}, + %% #{payload := <<"M2">>}, + %% #{payload := <<"M3">>}, + %% #{payload := <<"M4">>}, + %% #{payload := <<"M5">>}, + %% #{payload := <<"M6">>}, + %% #{payload := <<"M7">>} + %% ], Msgs1), + + ?assertEqual( + get_topicwise_order(Pubs1), + get_topicwise_order(Msgs1), + Msgs1 + ), + + NAcked = 4, + [ok = emqtt:puback(Client1, PktId) || #{packet_id := PktId} <- lists:sublist(Msgs1, NAcked)], + + %% Ensure that PUBACKs are propagated to the channel. + pong = emqtt:ping(Client1), + + ok = emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + + Pubs2 = [ + #mqtt_msg{topic = <<"loc/3/4/5">>, payload = <<"M8">>, qos = 1}, + #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M9">>, qos = 1}, + #mqtt_msg{topic = <<"t/100/foo">>, payload = <<"M10">>, qos = 1}, + #mqtt_msg{topic = <<"msg/feed/friend">>, payload = <<"M11">>, qos = 1}, + #mqtt_msg{topic = <<"msg/feed/me">>, payload = <<"M12">>, qos = 1} + ], + ok = publish_many(Pubs2), + NPubs2 = length(Pubs2), + + {ok, Client2} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false}, + {auto_ack, false} + | Config + ]), + {ok, _} = emqtt:ConnFun(Client2), + + %% Try to receive _at most_ `NPubs` messages. + %% There shouldn't be that much unacked messages in the replay anyway, + %% but it's an easy number to pick. + NPubs = NPubs1 + NPubs2, + Msgs2 = receive_messages(NPubs, _Timeout = 2000), + NMsgs2 = length(Msgs2), + + ct:pal("Msgs2 = ~p", [Msgs2]), + + ?assert(NMsgs2 < NPubs, Msgs2), + ?assert(NMsgs2 > NPubs2, Msgs2), + ?assert(NMsgs2 >= NPubs - NAcked, Msgs2), + NSame = NMsgs2 - NPubs2, + ?assertEqual( + [maps:with([packet_id, topic, payload], M) || M <- lists:nthtail(NMsgs1 - NSame, Msgs1)], + [maps:with([packet_id, topic, payload], M) || M <- lists:sublist(Msgs2, NSame)] + ), + + ok = emqtt:disconnect(Client2). + +get_topicwise_order(Msgs) -> + maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs). + +get_msgpub_topic(#mqtt_msg{topic = Topic}) -> + Topic; +get_msgpub_topic(#{topic := Topic}) -> + Topic. + +get_msgpub_payload(#mqtt_msg{payload = Payload}) -> + Payload; +get_msgpub_payload(#{payload := Payload}) -> + Payload. + t_publish_while_client_is_gone(init, Config) -> skip_ds_tc(Config); t_publish_while_client_is_gone('end', _Config) -> ok. t_publish_while_client_is_gone(Config) -> @@ -579,7 +719,7 @@ t_publish_while_client_is_gone(Config) -> ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload1, Payload2]), + ok = publish_many(messages(Topic, [Payload1, Payload2])), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, @@ -591,7 +731,7 @@ t_publish_while_client_is_gone(Config) -> {ok, _} = emqtt:ConnFun(Client2), Msgs = receive_messages(2), ?assertMatch([_, _], Msgs), - [Msg2, Msg1] = Msgs, + [Msg1, Msg2] = Msgs, ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)), ?assertEqual({ok, 2}, maps:find(qos, Msg1)), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),