fix(test): flaky mqtt expiry test case. (#6112)
This commit is contained in:
parent
a81140fd00
commit
fa34d8353e
|
@ -62,79 +62,104 @@ t_conn_stats(_) ->
|
||||||
t_tcp_sock_passive(_) ->
|
t_tcp_sock_passive(_) ->
|
||||||
with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []).
|
with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []).
|
||||||
|
|
||||||
t_message_expiry_interval_1(_) ->
|
t_message_expiry_interval(_) ->
|
||||||
ClientA = message_expiry_interval_init(),
|
{CPublish, CControl} = message_expiry_interval_init(),
|
||||||
[message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
[message_expiry_interval_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]],
|
||||||
emqtt:stop(ClientA).
|
emqtt:stop(CPublish),
|
||||||
|
emqtt:stop(CControl).
|
||||||
|
|
||||||
t_message_expiry_interval_2(_) ->
|
t_message_not_expiry_interval(_) ->
|
||||||
ClientA = message_expiry_interval_init(),
|
{CPublish, CControl} = message_expiry_interval_init(),
|
||||||
[message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]],
|
[message_expiry_interval_not_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]],
|
||||||
emqtt:stop(ClientA).
|
emqtt:stop(CPublish),
|
||||||
|
emqtt:stop(CControl).
|
||||||
|
|
||||||
message_expiry_interval_init() ->
|
message_expiry_interval_init() ->
|
||||||
{ok, ClientA} = emqtt:start_link([{proto_ver,v5},
|
{ok, CPublish} = emqtt:start_link([{proto_ver,v5},
|
||||||
{clientid, <<"client-a">>},
|
{clientid, <<"Client-Publish">>},
|
||||||
{clean_start, false},
|
{clean_start, false},
|
||||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, ClientB} = emqtt:start_link([{proto_ver,v5},
|
{ok, CVerify} = emqtt:start_link([{proto_ver,v5},
|
||||||
{clientid, <<"client-b">>},
|
{clientid, <<"Client-Verify">>},
|
||||||
{clean_start, false},
|
{clean_start, false},
|
||||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, _} = emqtt:connect(ClientA),
|
{ok, CControl} = emqtt:start_link([{proto_ver,v5},
|
||||||
{ok, _} = emqtt:connect(ClientB),
|
{clientid, <<"Client-Control">>},
|
||||||
%% subscribe and disconnect client-b
|
{clean_start, false},
|
||||||
emqtt:subscribe(ClientB, <<"t/a">>, 1),
|
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
emqtt:stop(ClientB),
|
{ok, _} = emqtt:connect(CPublish),
|
||||||
ClientA.
|
{ok, _} = emqtt:connect(CVerify),
|
||||||
|
{ok, _} = emqtt:connect(CControl),
|
||||||
|
%% subscribe and disconnect Client-verify
|
||||||
|
emqtt:subscribe(CControl, <<"t/a">>, 1),
|
||||||
|
emqtt:subscribe(CVerify, <<"t/a">>, 1),
|
||||||
|
emqtt:stop(CVerify),
|
||||||
|
{CPublish, CControl}.
|
||||||
|
|
||||||
message_expiry_interval_exipred(ClientA, QoS) ->
|
message_expiry_interval_exipred(CPublish, CControl, QoS) ->
|
||||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||||
%% publish to t/a and waiting for the message expired
|
%% publish to t/a and waiting for the message expired
|
||||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 1},
|
||||||
ct:sleep(2000),
|
<<"this will be purged in 1s">>, [{qos, QoS}]),
|
||||||
|
%% CControl make sure publish already store in broker.
|
||||||
|
receive
|
||||||
|
{publish,#{client_pid := CControl, topic := <<"t/a">>}} ->
|
||||||
|
ok
|
||||||
|
after 1000 ->
|
||||||
|
ct:fail(should_receive_publish)
|
||||||
|
end,
|
||||||
|
ct:sleep(1100),
|
||||||
|
|
||||||
%% resume the session for client-b
|
%% resume the session for Client-Verify
|
||||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
|
{ok, CVerify} = emqtt:start_link([{proto_ver,v5},
|
||||||
{clientid, <<"client-b">>},
|
{clientid, <<"Client-Verify">>},
|
||||||
{clean_start, false},
|
{clean_start, false},
|
||||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, _} = emqtt:connect(ClientB1),
|
{ok, _} = emqtt:connect(CVerify),
|
||||||
|
|
||||||
%% verify client-b could not receive the publish message
|
%% verify Client-Verify could not receive the publish message
|
||||||
receive
|
receive
|
||||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>}} ->
|
{publish,#{client_pid := CVerify, topic := <<"t/a">>}} ->
|
||||||
ct:fail(should_have_expired)
|
ct:fail(should_have_expired)
|
||||||
after 300 ->
|
after 300 ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
emqtt:stop(ClientB1).
|
emqtt:stop(CVerify).
|
||||||
|
|
||||||
message_expiry_interval_not_exipred(ClientA, QoS) ->
|
message_expiry_interval_not_exipred(CPublish, CControl, QoS) ->
|
||||||
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
ct:pal("~p ~p", [?FUNCTION_NAME, QoS]),
|
||||||
%% publish to t/a
|
%% publish to t/a
|
||||||
emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]),
|
emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 20},
|
||||||
|
<<"this will be purged in 20s">>, [{qos, QoS}]),
|
||||||
|
|
||||||
%% wait for 1s and then resume the session for client-b, the message should not expires
|
%% CControl make sure publish already store in broker.
|
||||||
|
receive
|
||||||
|
{publish,#{client_pid := CControl, topic := <<"t/a">>}} ->
|
||||||
|
ok
|
||||||
|
after 1000 ->
|
||||||
|
ct:fail(should_receive_publish)
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% wait for 1.2s and then resume the session for Client-Verify, the message should not expires
|
||||||
%% as Message-Expiry-Interval = 20s
|
%% as Message-Expiry-Interval = 20s
|
||||||
ct:sleep(1000),
|
ct:sleep(1200),
|
||||||
{ok, ClientB1} = emqtt:start_link([{proto_ver,v5},
|
{ok, CVerify} = emqtt:start_link([{proto_ver,v5},
|
||||||
{clientid, <<"client-b">>},
|
{clientid, <<"Client-Verify">>},
|
||||||
{clean_start, false},
|
{clean_start, false},
|
||||||
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
{properties, #{'Session-Expiry-Interval' => 360}}]),
|
||||||
{ok, _} = emqtt:connect(ClientB1),
|
{ok, _} = emqtt:connect(CVerify),
|
||||||
|
|
||||||
%% verify client-b could receive the publish message and the Message-Expiry-Interval is set
|
%% verify Client-Verify could receive the publish message and the Message-Expiry-Interval is set
|
||||||
receive
|
receive
|
||||||
{publish,#{client_pid := ClientB1, topic := <<"t/a">>,
|
{publish,#{client_pid := CVerify, topic := <<"t/a">>,
|
||||||
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
|
properties := #{'Message-Expiry-Interval' := MsgExpItvl}}}
|
||||||
when MsgExpItvl < 20 -> ok;
|
when MsgExpItvl =< 20 -> ok;
|
||||||
{publish, _} = Msg ->
|
{publish, _} = Msg ->
|
||||||
ct:fail({incorrect_publish, Msg})
|
ct:fail({incorrect_publish, Msg})
|
||||||
after 300 ->
|
after 300 ->
|
||||||
ct:fail(no_publish_received)
|
ct:fail(no_publish_received)
|
||||||
end,
|
end,
|
||||||
emqtt:stop(ClientB1).
|
emqtt:stop(CVerify).
|
||||||
|
|
||||||
with_client(TestFun, _Options) ->
|
with_client(TestFun, _Options) ->
|
||||||
ClientId = <<"t_conn">>,
|
ClientId = <<"t_conn">>,
|
||||||
|
@ -156,6 +181,15 @@ t_async_set_keepalive('end', _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_async_set_keepalive(_) ->
|
t_async_set_keepalive(_) ->
|
||||||
|
case os:type() of
|
||||||
|
{unix, darwin} ->
|
||||||
|
%% Mac OSX don't support the feature
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
do_async_set_keepalive()
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_async_set_keepalive() ->
|
||||||
ClientID = <<"client-tcp-keepalive">>,
|
ClientID = <<"client-tcp-keepalive">>,
|
||||||
{ok, Client} = emqtt:start_link([{host, "localhost"},
|
{ok, Client} = emqtt:start_link([{host, "localhost"},
|
||||||
{proto_ver,v5},
|
{proto_ver,v5},
|
||||||
|
|
Loading…
Reference in New Issue