fix(test): flaky mqtt expiry test case. (#6099)

This commit is contained in:
zhongwencool 2021-11-10 09:55:36 +08:00 committed by GitHub
parent ac23214447
commit 2d159ad9a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 64 additions and 39 deletions

View File

@ -62,79 +62,104 @@ t_conn_stats(_) ->
t_tcp_sock_passive(_) ->
with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []).
t_message_expiry_interval_1(_) ->
ClientA = message_expiry_interval_init(),
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
emqtt:stop(ClientA).
t_message_expiry_interval(_) ->
{CPublish, CControl} = message_expiry_interval_init(),
[message_expiry_interval_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]],
emqtt:stop(CPublish),
emqtt:stop(CControl).
t_message_expiry_interval_2(_) ->
ClientA = message_expiry_interval_init(),
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
emqtt:stop(ClientA).
t_message_not_expiry_interval(_) ->
{CPublish, CControl} = message_expiry_interval_init(),
[message_expiry_interval_not_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]],
emqtt:stop(CPublish),
emqtt:stop(CControl).
message_expiry_interval_init() ->
{ok, ClientA} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"client-a">>},
{ok, CPublish} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"Client-Publish">>},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, ClientB} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"client-b">>},
{ok, CVerify} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"Client-Verify">>},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqtt:connect(ClientA),
{ok, _} = emqtt:connect(ClientB),
%% subscribe and disconnect client-b
emqtt:subscribe(ClientB, <<"t/a">>, 1),
emqtt:stop(ClientB),
ClientA.
{ok, CControl} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"Client-Control">>},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqtt:connect(CPublish),
{ok, _} = emqtt:connect(CVerify),
{ok, _} = emqtt:connect(CControl),
%% subscribe and disconnect Client-verify
emqtt:subscribe(CControl, <<"t/a">>, 1),
emqtt:subscribe(CVerify, <<"t/a">>, 1),
emqtt:stop(CVerify),
{CPublish, CControl}.
message_expiry_interval_exipred(ClientA, QoS) ->
message_expiry_interval_exipred(CPublish, CControl, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a and waiting for the message expired
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
ct:sleep(1500),
emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 1},
<<"this will be purged in 1s">>, [{qos, QoS}]),
%% CControl make sure publish already store in broker.
receive
{publish,#{client_pid := CControl, topic := <<"t/a">>}} ->
ok
after 1000 ->
ct:fail(should_receive_publish)
end,
ct:sleep(1100),
%% resume the session for client-b
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"client-b">>},
%% resume the session for Client-Verify
{ok, CVerify} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"Client-Verify">>},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqtt:connect(ClientB1),
{ok, _} = emqtt:connect(CVerify),
%% verify client-b could not receive the publish message
%% verify Client-Verify could not receive the publish message
receive
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
{publish,#{client_pid := CVerify, topic := <<"t/a">>}} ->
ct:fail(should_have_expired)
after 300 ->
ok
end,
emqtt:stop(ClientB1).
emqtt:stop(CVerify).
message_expiry_interval_not_exipred(ClientA, QoS) ->
message_expiry_interval_not_exipred(CPublish, CControl, QoS) ->
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
%% publish to t/a
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 20},
<<"this will be purged in 20s">>, [{qos, QoS}]),
%% wait for 1s and then resume the session for client-b, the message should not expires
%% CControl make sure publish already store in broker.
receive
{publish,#{client_pid := CControl, topic := <<"t/a">>}} ->
ok
after 1000 ->
ct:fail(should_receive_publish)
end,
%% wait for 1.2s and then resume the session for Client-Verify, the message should not expires
%% as Message-Expiry-Interval = 20s
ct:sleep(1000),
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"client-b">>},
ct:sleep(1200),
{ok, CVerify} = emqtt:start_link([{proto_ver,v5},
{clientid, <<"Client-Verify">>},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 360}}]),
{ok, _} = emqtt:connect(ClientB1),
{ok, _} = emqtt:connect(CVerify),
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
%% verify Client-Verify could receive the publish message and the Message-Expiry-Interval is set
receive
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
{publish,#{client_pid := CVerify, topic := <<"t/a">>,
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
when MsgExpItvl < 20 -> ok;
when MsgExpItvl =< 20 -> ok;
{publish, _} = Msg ->
ct:fail({incorrect_publish, Msg})
after 300 ->
ct:fail(no_publish_received)
end,
emqtt:stop(ClientB1).
emqtt:stop(CVerify).
with_client(TestFun, _Options) ->
ClientId = <<"t_conn">>,