diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 6ccbcc16b..211edc2b1 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -205,7 +205,8 @@ t_handle_in_qos2_publish_with_error_return(_) -> ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}), - Channel = channel(#{conn_state => connected, session => Session}), + Channel = emqx_channel:ensure_timer(await_timer, 2000, + channel(#{conn_state => connected, session => Session})), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} = emqx_channel:handle_in(Publish1, Channel), diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index d45e7a8cf..83fe09e13 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -41,12 +41,24 @@ <<"TopicA/#">> ]). +-define(do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS), + receive + {tcp, SOCKET, PACKET} -> EXPRESS + after TIMEOUT -> + ct:fail({receive_timeout, TIMEOUT}) + end). + +-define(receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS), + fun() -> + ?do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS) + end()). all() -> [{group, mqttv3}, {group, mqttv4}, {group, mqttv5}, - {group, others} + {group, others}, + {group, bugfixes} ]. groups() -> @@ -73,6 +85,10 @@ groups() -> t_certcn_as_clientid_default_config_tls, t_certcn_as_clientid_tlsv1_3, t_certcn_as_clientid_tlsv1_2 + ]}, + {bugfixes, [non_parallel_tests], + [ t_qos2_no_pubrel_received + , t_retry_timer_after_session_taken_over ]} ]. @@ -84,6 +100,18 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +init_per_testcase(Func, Cfg) -> + maybe_run_fun(setup, Func, Cfg). + +end_per_testcase(Func, Cfg) -> + maybe_run_fun(teardown, Func, Cfg). + +maybe_run_fun(Tag, Func, Cfg) -> + try ?MODULE:Func(Tag, Cfg) + catch + error:undef -> Cfg + end. + set_special_confs(emqx) -> emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]); set_special_confs(_) -> @@ -297,12 +325,193 @@ t_certcn_as_clientid_tlsv1_3(_) -> t_certcn_as_clientid_tlsv1_2(_) -> tls_certcn_as_clientid('tlsv1.2'). +t_qos2_no_pubrel_received(setup, Cfg) -> + OldMaxAwaitRel = emqx_zone:get_env(external, max_awaiting_rel), + OldWaitTimeout = emqx_zone:get_env(external, await_rel_timeout), + emqx_zone:set_env(external, max_awaiting_rel, 2), + emqx_zone:set_env(external, await_rel_timeout, 1), + [{old_max_await_rel, OldMaxAwaitRel}, + {old_wait_timeout, OldWaitTimeout} | Cfg]; +t_qos2_no_pubrel_received(teardown, Cfg) -> + OldMaxAwaitRel = ?config(old_max_await_rel, Cfg), + OldWaitTimeout = ?config(old_wait_timeout, Cfg), + emqx_zone:set_env(external, max_awaiting_rel, OldMaxAwaitRel), + emqx_zone:set_env(external, await_rel_timeout, OldWaitTimeout). +t_qos2_no_pubrel_received(_) -> + %% [The Scenario]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client --- PUB: QoS2 ---> EMQX + %% Client --- PUB: QoS2 ---> EMQX + %% Client --- ... ---> EMQX + %% Client <---- PUBREC ---- EMQX + %% Client <---- PUBREC ---- EMQX + %% Client <---- ... ---- EMQX + %% Client --- PUBREL --X--> EMQX (PUBREL not received) + %% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX + %% + %% [A few hours later..]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client --- PUB: QoS2 ---> EMQX + %% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX (we should clear the awaiting_rel queue but it is still full). + ct:pal("1. reconnect after awaiting_rel is cleared"), + qos2_no_pubrel_received(fun + (1) -> timer:sleep(1500); %% reconnect 1.5s later, ensure the await_rel_timeout triggered + (2) -> ok + end), + ct:pal("2. reconnect before awaiting_rel is cleared"), + qos2_no_pubrel_received(fun + (1) -> ok; %% reconnect as fast as possiable, ensure the await_rel_timeout NOT triggered + (2) -> timer:sleep(1500) %% send msgs 1.5s later, ensure the await_rel_timeout triggered + end). +qos2_no_pubrel_received(Actions) -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Topic = <<"t/foo">>, + {ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [], 3000), + gen_tcp:send(Sock, make_connect_packet(ClientId, false)), + timer:sleep(100), + ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 100, <<"foo">>)), + ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 101, <<"foo">>)), + %% Send the 3rd publish with id 103 will got disconnected: + %% - "Dropped the qos2 packet 103 due to awaiting_rel is full". + ?assertMatch(ok, gen_tcp:send(Sock, make_publish_packet(Topic, 2, 103, <<"foo">>))), + %% not the connections should be closed by the server due to ?RC_RECEIVE_MAXIMUM_EXCEEDED + receive + {tcp_closed, Sock} -> ok + after 500 -> + ct:fail({wait_tcp_close_timeout, 500}) + end, + + %% before reconnecting + action_point(Actions, 1), + + {ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, [], 3000), + gen_tcp:send(Sock1, make_connect_packet(ClientId, false)), + + %% after connected and before sending any msgs + action_point(Actions, 2), + + ok = gen_tcp:send(Sock1, make_publish_packet(Topic, 2, 104, <<"foo">>)), + + receive + {tcp_closed, Sock1} -> + %% this is the bug, the wait_rel queue should have expired but not cleared + ct:fail(unexpected_disconnect) + after 500 -> ok + end, + gen_tcp:close(Sock1), + ok. + +t_retry_timer_after_session_taken_over(setup, Cfg) -> + %emqx_logger:set_log_level(debug), + OldRetryInterval = emqx_zone:get_env(external, retry_interval), + emqx_zone:set_env(external, retry_interval, 1), + [{old_retry_interval, OldRetryInterval} | Cfg]; +t_retry_timer_after_session_taken_over(teardown, Cfg) -> + %emqx_logger:set_log_level(warning), + OldRetryInterval = ?config(old_retry_interval, Cfg), + emqx_zone:set_env(external, retry_interval, OldRetryInterval). +t_retry_timer_after_session_taken_over(_) -> + %% [The Scenario]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client <--- PUB: QoS1 --- EMQX + %% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK) + %% Client <---- PUB: QoS1 ---- EMQX (resend the PUBLISH msg) + %% Client --- DISCONN ---> EMQX + %% [A few seconds later..]: + %% Client --- CONN: clean_session=false --> EMQX + %% Client <--- PUB: QoS1 --- EMQX (resume session and resend the inflight messages) + %% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK) + TcpOpts = [binary, {active, true}], + + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Topic = <<"t/foo">>, + Payload = <<"DO NOT REPLY!">>, + + %% CONNECT + {ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000), + ok = gen_tcp:send(Sock1, make_connect_packet(ClientId, false)), + ?receive_tcp_packet(1000, Sock1, _, ok), + + %% SUBSCRIBE + ok = gen_tcp:send(Sock1, make_subscribe_packet(Topic, 2, 1)), + ?receive_tcp_packet(200, Sock1, _, ok), + + emqx_broker:publish(emqx_message:make(<<"publisher">>, 1, Topic, Payload)), + %% note that here we don't reply the publish with puback + ?receive_tcp_packet(200, Sock1, PubPacket1, + begin + ?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _}, + emqx_frame:parse(PubPacket1)) + end), + + ok = gen_tcp:close(Sock1), + timer:sleep(100), + + %% CONNECT again + {ok, Sock2} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000), + ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)), + ConnAckRem = ?do_receive_tcp_packet(200, Sock2, ConnPack2, begin + ConnAck = iolist_to_binary(make_connack_packet(?CONNACK_ACCEPT, 1)), + ct:pal("--- connack: ~p, got: ~p", [ConnAck, ConnPack2]), + <> = ConnPack2, + Rem + end), + + %% emqx should resend the non-ACKed messages now + PubPacket2 = case ConnAckRem of + <<>> -> + ?receive_tcp_packet(200, Sock2, Packet, Packet); + _ -> + ConnAckRem + end, + ?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _}, + emqx_frame:parse(PubPacket2)), + + %% ... and emqx should resend the message 1s later, as we didn't ACK it. + ?receive_tcp_packet(1200, Sock2, Packet, + begin + ?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _}, + emqx_frame:parse(Packet)) + end), + + %% we ACK it now, then emqx should stop the resending + ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)), + receive + {tcp, Sock2, _PACKET} -> + {ok, MqttPacket, <<>>, _} = emqx_frame:parse(_PACKET), + ?assertNotMatch(?PUBLISH_PACKET(_), MqttPacket) + after 1200 -> ok + end, + ok = gen_tcp:close(Sock1). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- +action_point(Action, N) -> + Action(N). + +make_connect_packet(ClientId, CleanStart) -> + emqx_frame:serialize(?CONNECT_PACKET( + #mqtt_packet_connect{ + proto_name = <<"MQTT">>, + clean_start = CleanStart, + keepalive = 0, + clientid = ClientId + } + )). +make_connack_packet(Code, SP) -> + emqx_frame:serialize(?CONNACK_PACKET(Code, SP)). + +make_publish_packet(Topic, QoS, PacketId, Payload) -> + emqx_frame:serialize( + ?PUBLISH_PACKET(QoS, Topic, PacketId, Payload)). + +make_subscribe_packet(TopicFileter, QoS, PacketId) -> + emqx_frame:serialize( + ?SUBSCRIBE_PACKET(PacketId, [{TopicFileter, #{rh => 0, rap => 0, nl => 0, qos => QoS}}])). + recv_msgs(Count) -> recv_msgs(Count, []).