diff --git a/test/emqx_mqtt_SUITE.erl b/test/emqx_mqtt_SUITE.erl index ae7e7382c..3a77134b8 100644 --- a/test/emqx_mqtt_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -62,79 +62,104 @@ t_conn_stats(_) -> t_tcp_sock_passive(_) -> with_client(fun(CPid) -> CPid ! {tcp_passive, sock} end, []). -t_message_expiry_interval_1(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). +t_message_expiry_interval(_) -> + {CPublish, CControl} = message_expiry_interval_init(), + [message_expiry_interval_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]], + emqtt:stop(CPublish), + emqtt:stop(CControl). -t_message_expiry_interval_2(_) -> - ClientA = message_expiry_interval_init(), - [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]], - emqtt:stop(ClientA). +t_message_not_expiry_interval(_) -> + {CPublish, CControl} = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(CPublish, CControl, QoS) || QoS <- [0,1,2]], + emqtt:stop(CPublish), + emqtt:stop(CControl). message_expiry_interval_init() -> - {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-a">>}, + {ok, CPublish} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Publish">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientA), - {ok, _} = emqtt:connect(ClientB), - %% subscribe and disconnect client-b - emqtt:subscribe(ClientB, <<"t/a">>, 1), - emqtt:stop(ClientB), - ClientA. + {ok, CControl} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Control">>}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(CPublish), + {ok, _} = emqtt:connect(CVerify), + {ok, _} = emqtt:connect(CControl), + %% subscribe and disconnect Client-verify + emqtt:subscribe(CControl, <<"t/a">>, 1), + emqtt:subscribe(CVerify, <<"t/a">>, 1), + emqtt:stop(CVerify), + {CPublish, CControl}. -message_expiry_interval_exipred(ClientA, QoS) -> +message_expiry_interval_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a and waiting for the message expired - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), - ct:sleep(2000), + emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, + <<"this will be purged in 1s">>, [{qos, QoS}]), + %% CControl make sure publish already store in broker. + receive + {publish,#{client_pid := CControl, topic := <<"t/a">>}} -> + ok + after 1000 -> + ct:fail(should_receive_publish) + end, + ct:sleep(1100), - %% resume the session for client-b - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + %% resume the session for Client-Verify + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + {ok, _} = emqtt:connect(CVerify), - %% verify client-b could not receive the publish message + %% verify Client-Verify could not receive the publish message receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + {publish,#{client_pid := CVerify, topic := <<"t/a">>}} -> ct:fail(should_have_expired) after 300 -> ok end, - emqtt:stop(ClientB1). + emqtt:stop(CVerify). -message_expiry_interval_not_exipred(ClientA, QoS) -> +message_expiry_interval_not_exipred(CPublish, CControl, QoS) -> ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), %% publish to t/a - emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + emqtt:publish(CPublish, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, + <<"this will be purged in 20s">>, [{qos, QoS}]), - %% wait for 1s and then resume the session for client-b, the message should not expires + %% CControl make sure publish already store in broker. + receive + {publish,#{client_pid := CControl, topic := <<"t/a">>}} -> + ok + after 1000 -> + ct:fail(should_receive_publish) + end, + + %% wait for 1.2s and then resume the session for Client-Verify, the message should not expires %% as Message-Expiry-Interval = 20s - ct:sleep(1000), - {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, - {clientid, <<"client-b">>}, + ct:sleep(1200), + {ok, CVerify} = emqtt:start_link([{proto_ver,v5}, + {clientid, <<"Client-Verify">>}, {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 360}}]), - {ok, _} = emqtt:connect(ClientB1), + {ok, _} = emqtt:connect(CVerify), - %% verify client-b could receive the publish message and the Message-Expiry-Interval is set + %% verify Client-Verify could receive the publish message and the Message-Expiry-Interval is set receive - {publish,#{client_pid := ClientB1, topic := <<"t/a">>, + {publish,#{client_pid := CVerify, topic := <<"t/a">>, properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} - when MsgExpItvl < 20 -> ok; + when MsgExpItvl =< 20 -> ok; {publish, _} = Msg -> ct:fail({incorrect_publish, Msg}) after 300 -> ct:fail(no_publish_received) end, - emqtt:stop(ClientB1). + emqtt:stop(CVerify). with_client(TestFun, _Options) -> ClientId = <<"t_conn">>, @@ -156,6 +181,15 @@ t_async_set_keepalive('end', _Config) -> ok. t_async_set_keepalive(_) -> + case os:type() of + {unix, darwin} -> + %% Mac OSX don't support the feature + ok; + _ -> + do_async_set_keepalive() + end. + +do_async_set_keepalive() -> ClientID = <<"client-tcp-keepalive">>, {ok, Client} = emqtt:start_link([{host, "localhost"}, {proto_ver,v5},