chore: sync test/emqx_client_SUITE.erl from ee to ce

This commit is contained in:
Zaiming (Stone) Shi 2022-12-28 14:34:54 +01:00
parent 79f51e46a4
commit 2662c52a88
2 changed files with 212 additions and 2 deletions

View File

@ -205,7 +205,8 @@ t_handle_in_qos2_publish_with_error_return(_) ->
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}), Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
Channel = channel(#{conn_state => connected, session => Session}), Channel = emqx_channel:ensure_timer(await_timer, 2000,
channel(#{conn_state => connected, session => Session})),
Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} = {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
emqx_channel:handle_in(Publish1, Channel), emqx_channel:handle_in(Publish1, Channel),

View File

@ -41,12 +41,24 @@
<<"TopicA/#">> <<"TopicA/#">>
]). ]).
-define(do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
receive
{tcp, SOCKET, PACKET} -> EXPRESS
after TIMEOUT ->
ct:fail({receive_timeout, TIMEOUT})
end).
-define(receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS),
fun() ->
?do_receive_tcp_packet(TIMEOUT, SOCKET, PACKET, EXPRESS)
end()).
all() -> all() ->
[{group, mqttv3}, [{group, mqttv3},
{group, mqttv4}, {group, mqttv4},
{group, mqttv5}, {group, mqttv5},
{group, others} {group, others},
{group, bugfixes}
]. ].
groups() -> groups() ->
@ -73,6 +85,10 @@ groups() ->
t_certcn_as_clientid_default_config_tls, t_certcn_as_clientid_default_config_tls,
t_certcn_as_clientid_tlsv1_3, t_certcn_as_clientid_tlsv1_3,
t_certcn_as_clientid_tlsv1_2 t_certcn_as_clientid_tlsv1_2
]},
{bugfixes, [non_parallel_tests],
[ t_qos2_no_pubrel_received
, t_retry_timer_after_session_taken_over
]} ]}
]. ].
@ -84,6 +100,18 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
init_per_testcase(Func, Cfg) ->
maybe_run_fun(setup, Func, Cfg).
end_per_testcase(Func, Cfg) ->
maybe_run_fun(teardown, Func, Cfg).
maybe_run_fun(Tag, Func, Cfg) ->
try ?MODULE:Func(Tag, Cfg)
catch
error:undef -> Cfg
end.
set_special_confs(emqx) -> set_special_confs(emqx) ->
emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]); emqx_ct_helpers:change_emqx_opts(ssl_twoway, [{peer_cert_as_username, cn}]);
set_special_confs(_) -> set_special_confs(_) ->
@ -297,12 +325,193 @@ t_certcn_as_clientid_tlsv1_3(_) ->
t_certcn_as_clientid_tlsv1_2(_) -> t_certcn_as_clientid_tlsv1_2(_) ->
tls_certcn_as_clientid('tlsv1.2'). tls_certcn_as_clientid('tlsv1.2').
t_qos2_no_pubrel_received(setup, Cfg) ->
OldMaxAwaitRel = emqx_zone:get_env(external, max_awaiting_rel),
OldWaitTimeout = emqx_zone:get_env(external, await_rel_timeout),
emqx_zone:set_env(external, max_awaiting_rel, 2),
emqx_zone:set_env(external, await_rel_timeout, 1),
[{old_max_await_rel, OldMaxAwaitRel},
{old_wait_timeout, OldWaitTimeout} | Cfg];
t_qos2_no_pubrel_received(teardown, Cfg) ->
OldMaxAwaitRel = ?config(old_max_await_rel, Cfg),
OldWaitTimeout = ?config(old_wait_timeout, Cfg),
emqx_zone:set_env(external, max_awaiting_rel, OldMaxAwaitRel),
emqx_zone:set_env(external, await_rel_timeout, OldWaitTimeout).
t_qos2_no_pubrel_received(_) ->
%% [The Scenario]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client --- PUB: QoS2 ---> EMQX
%% Client --- PUB: QoS2 ---> EMQX
%% Client --- ... ---> EMQX
%% Client <---- PUBREC ---- EMQX
%% Client <---- PUBREC ---- EMQX
%% Client <---- ... ---- EMQX
%% Client --- PUBREL --X--> EMQX (PUBREL not received)
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX
%%
%% [A few hours later..]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client --- PUB: QoS2 ---> EMQX
%% Client <--- DISCONN: RC_RECEIVE_MAXIMUM_EXCEEDED --- EMQX (we should clear the awaiting_rel queue but it is still full).
ct:pal("1. reconnect after awaiting_rel is cleared"),
qos2_no_pubrel_received(fun
(1) -> timer:sleep(1500); %% reconnect 1.5s later, ensure the await_rel_timeout triggered
(2) -> ok
end),
ct:pal("2. reconnect before awaiting_rel is cleared"),
qos2_no_pubrel_received(fun
(1) -> ok; %% reconnect as fast as possiable, ensure the await_rel_timeout NOT triggered
(2) -> timer:sleep(1500) %% send msgs 1.5s later, ensure the await_rel_timeout triggered
end).
qos2_no_pubrel_received(Actions) ->
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Topic = <<"t/foo">>,
{ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
gen_tcp:send(Sock, make_connect_packet(ClientId, false)),
timer:sleep(100),
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 100, <<"foo">>)),
ok = gen_tcp:send(Sock, make_publish_packet(Topic, 2, 101, <<"foo">>)),
%% Send the 3rd publish with id 103 will got disconnected:
%% - "Dropped the qos2 packet 103 due to awaiting_rel is full".
?assertMatch(ok, gen_tcp:send(Sock, make_publish_packet(Topic, 2, 103, <<"foo">>))),
%% not the connections should be closed by the server due to ?RC_RECEIVE_MAXIMUM_EXCEEDED
receive
{tcp_closed, Sock} -> ok
after 500 ->
ct:fail({wait_tcp_close_timeout, 500})
end,
%% before reconnecting
action_point(Actions, 1),
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, [], 3000),
gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
%% after connected and before sending any msgs
action_point(Actions, 2),
ok = gen_tcp:send(Sock1, make_publish_packet(Topic, 2, 104, <<"foo">>)),
receive
{tcp_closed, Sock1} ->
%% this is the bug, the wait_rel queue should have expired but not cleared
ct:fail(unexpected_disconnect)
after 500 -> ok
end,
gen_tcp:close(Sock1),
ok.
t_retry_timer_after_session_taken_over(setup, Cfg) ->
%emqx_logger:set_log_level(debug),
OldRetryInterval = emqx_zone:get_env(external, retry_interval),
emqx_zone:set_env(external, retry_interval, 1),
[{old_retry_interval, OldRetryInterval} | Cfg];
t_retry_timer_after_session_taken_over(teardown, Cfg) ->
%emqx_logger:set_log_level(warning),
OldRetryInterval = ?config(old_retry_interval, Cfg),
emqx_zone:set_env(external, retry_interval, OldRetryInterval).
t_retry_timer_after_session_taken_over(_) ->
%% [The Scenario]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client <--- PUB: QoS1 --- EMQX
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
%% Client <---- PUB: QoS1 ---- EMQX (resend the PUBLISH msg)
%% Client --- DISCONN ---> EMQX
%% [A few seconds later..]:
%% Client --- CONN: clean_session=false --> EMQX
%% Client <--- PUB: QoS1 --- EMQX (resume session and resend the inflight messages)
%% Client --X-- PUBACK ----> EMQX (the client doesn't send PUBACK)
TcpOpts = [binary, {active, true}],
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Topic = <<"t/foo">>,
Payload = <<"DO NOT REPLY!">>,
%% CONNECT
{ok, Sock1} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
ok = gen_tcp:send(Sock1, make_connect_packet(ClientId, false)),
?receive_tcp_packet(1000, Sock1, _, ok),
%% SUBSCRIBE
ok = gen_tcp:send(Sock1, make_subscribe_packet(Topic, 2, 1)),
?receive_tcp_packet(200, Sock1, _, ok),
emqx_broker:publish(emqx_message:make(<<"publisher">>, 1, Topic, Payload)),
%% note that here we don't reply the publish with puback
?receive_tcp_packet(200, Sock1, PubPacket1,
begin
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
emqx_frame:parse(PubPacket1))
end),
ok = gen_tcp:close(Sock1),
timer:sleep(100),
%% CONNECT again
{ok, Sock2} = gen_tcp:connect("127.0.0.1", 1883, TcpOpts, 3000),
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
ConnAckRem = ?do_receive_tcp_packet(200, Sock2, ConnPack2, begin
ConnAck = iolist_to_binary(make_connack_packet(?CONNACK_ACCEPT, 1)),
ct:pal("--- connack: ~p, got: ~p", [ConnAck, ConnPack2]),
<<ConnAck:4/binary, Rem/binary>> = ConnPack2,
Rem
end),
%% emqx should resend the non-ACKed messages now
PubPacket2 = case ConnAckRem of
<<>> ->
?receive_tcp_packet(200, Sock2, Packet, Packet);
_ ->
ConnAckRem
end,
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
emqx_frame:parse(PubPacket2)),
%% ... and emqx should resend the message 1s later, as we didn't ACK it.
?receive_tcp_packet(1200, Sock2, Packet,
begin
?assertMatch({ok, ?PUBLISH_PACKET(1, Topic, 1, Payload), <<>>, _},
emqx_frame:parse(Packet))
end),
%% we ACK it now, then emqx should stop the resending
ok = gen_tcp:send(Sock2, make_connect_packet(ClientId, false)),
receive
{tcp, Sock2, _PACKET} ->
{ok, MqttPacket, <<>>, _} = emqx_frame:parse(_PACKET),
?assertNotMatch(?PUBLISH_PACKET(_), MqttPacket)
after 1200 -> ok
end,
ok = gen_tcp:close(Sock1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
action_point(Action, N) ->
Action(N).
make_connect_packet(ClientId, CleanStart) ->
emqx_frame:serialize(?CONNECT_PACKET(
#mqtt_packet_connect{
proto_name = <<"MQTT">>,
clean_start = CleanStart,
keepalive = 0,
clientid = ClientId
}
)).
make_connack_packet(Code, SP) ->
emqx_frame:serialize(?CONNACK_PACKET(Code, SP)).
make_publish_packet(Topic, QoS, PacketId, Payload) ->
emqx_frame:serialize(
?PUBLISH_PACKET(QoS, Topic, PacketId, Payload)).
make_subscribe_packet(TopicFileter, QoS, PacketId) ->
emqx_frame:serialize(
?SUBSCRIBE_PACKET(PacketId, [{TopicFileter, #{rh => 0, rap => 0, nl => 0, qos => QoS}}])).
recv_msgs(Count) -> recv_msgs(Count) ->
recv_msgs(Count, []). recv_msgs(Count, []).