Merge pull request #4944 from zmstone/fix-flaky-tests-emqx-sn

test(emqx_sn): an attempt to fix flaky tests
This commit is contained in:
Zaiming (Stone) Shi 2021-06-07 09:04:32 +02:00 committed by GitHub
commit ad43209fc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 83 additions and 55 deletions

View File

@ -47,6 +47,9 @@
% FLAG NOT USED % FLAG NOT USED
-define(FNU, 0). -define(FNU, 0).
%% erlang:system_time should be unique and random enough
-define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-",
integer_to_list(erlang:system_time())])).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -55,6 +58,7 @@ all() ->
emqx_ct:all(?MODULE). emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
logger:set_module_level(emqx_sn_gateway, debug),
emqx_ct_helpers:start_apps([emqx_sn], fun set_special_confs/1), emqx_ct_helpers:start_apps([emqx_sn], fun set_special_confs/1),
Config. Config.
@ -96,7 +100,8 @@ t_connect(_) ->
t_do_2nd_connect(_) -> t_do_2nd_connect(_) ->
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"client_id_test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
timer:sleep(100), timer:sleep(100),
send_connect_msg(Socket, <<"client_id_other">>), send_connect_msg(Socket, <<"client_id_other">>),
@ -116,7 +121,8 @@ t_subscribe(_) ->
TopicId = ?MAX_PRED_TOPIC_ID + 1, TopicId = ?MAX_PRED_TOPIC_ID + 1,
ReturnCode = 0, ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"client_id_test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
TopicName1 = <<"abcD">>, TopicName1 = <<"abcD">>,
send_register_msg(Socket, TopicName1, MsgId), send_register_msg(Socket, TopicName1, MsgId),
@ -125,12 +131,12 @@ t_subscribe(_) ->
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16,
MsgId:16, ReturnCode>>, receive_response(Socket)), MsgId:16, ReturnCode>>, receive_response(Socket)),
?assertEqual([TopicName1], emqx_broker:topics()), ?assert(lists:member(TopicName1, emqx_broker:topics())),
send_unsubscribe_msg_normal_topic(Socket, TopicName1, MsgId), send_unsubscribe_msg_normal_topic(Socket, TopicName1, MsgId),
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
timer:sleep(100), timer:sleep(100),
?assertEqual([], emqx_broker:topics()), ?assertNot(lists:member(TopicName1, emqx_broker:topics())),
send_disconnect_msg(Socket, undefined), send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@ -146,7 +152,8 @@ t_subscribe_case01(_) ->
TopicId = ?MAX_PRED_TOPIC_ID + 1, TopicId = ?MAX_PRED_TOPIC_ID + 1,
ReturnCode = 0, ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"client_id_test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
TopicName1 = <<"abcD">>, TopicName1 = <<"abcD">>,
@ -176,7 +183,8 @@ t_subscribe_case02(_) ->
ReturnCode = 0, ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"client_id_test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ?CLIENTID),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic1 = ?PREDEF_TOPIC_NAME1, Topic1 = ?PREDEF_TOPIC_NAME1,
@ -206,7 +214,7 @@ t_subscribe_case03(_) ->
ReturnCode = 0, ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"ClientA">>, ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId), send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
@ -234,7 +242,8 @@ t_subscribe_case04(_) ->
TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1 TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1
ReturnCode = 0, ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"client_id_test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic1 = ?PREDEF_TOPIC_NAME1, Topic1 = ?PREDEF_TOPIC_NAME1,
send_register_msg(Socket, Topic1, MsgId), send_register_msg(Socket, Topic1, MsgId),
@ -263,7 +272,7 @@ t_subscribe_case05(_) ->
TopicId2 = ?MAX_PRED_TOPIC_ID + 2, TopicId2 = ?MAX_PRED_TOPIC_ID + 2,
ReturnCode = 0, ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"testu">>, ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId), send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
@ -304,8 +313,9 @@ t_subscribe_case06(_) ->
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
TopicId2 = ?MAX_PRED_TOPIC_ID + 2, TopicId2 = ?MAX_PRED_TOPIC_ID + 2,
ReturnCode = 0, ReturnCode = 0,
ClientId = ?CLIENTID,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_register_msg(Socket, <<"abc">>, MsgId), send_register_msg(Socket, <<"abc">>, MsgId),
@ -340,7 +350,8 @@ t_subscribe_case07(_) ->
TopicId1 = ?MAX_PRED_TOPIC_ID + 2, TopicId1 = ?MAX_PRED_TOPIC_ID + 2,
TopicId2 = ?MAX_PRED_TOPIC_ID + 3, TopicId2 = ?MAX_PRED_TOPIC_ID + 3,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId),
@ -362,7 +373,8 @@ t_subscribe_case08(_) ->
MsgId = 1, MsgId = 1,
TopicId2 = 2, TopicId2 = 2,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_reserved_topic(Socket, QoS, TopicId2, MsgId), send_subscribe_msg_reserved_topic(Socket, QoS, TopicId2, MsgId),
@ -383,7 +395,8 @@ t_publish_negqos_case09(_) ->
MsgId = 1, MsgId = 1,
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic = <<"abc">>, Topic = <<"abc">>,
@ -416,7 +429,8 @@ t_publish_qos0_case01(_) ->
MsgId = 1, MsgId = 1,
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic = <<"abc">>, Topic = <<"abc">>,
@ -447,7 +461,8 @@ t_publish_qos0_case02(_) ->
MsgId = 1, MsgId = 1,
PredefTopicId = ?PREDEF_TOPIC_ID1, PredefTopicId = ?PREDEF_TOPIC_ID1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
@ -476,7 +491,8 @@ t_publish_qos0_case3(_) ->
MsgId = 1, MsgId = 1,
TopicId = ?MAX_PRED_TOPIC_ID + 1, TopicId = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic = <<"/a/b/c">>, Topic = <<"/a/b/c">>,
@ -506,7 +522,8 @@ t_publish_qos0_case04(_) ->
MsgId = 1, MsgId = 1,
TopicId0 = 0, TopicId0 = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"#">>, MsgId), send_subscribe_msg_normal_topic(Socket, QoS, <<"#">>, MsgId),
@ -536,7 +553,8 @@ t_publish_qos0_case05(_) ->
MsgId = 1, MsgId = 1,
TopicId0 = 0, TopicId0 = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_short_topic(Socket, QoS, <<"/#">>, MsgId), send_subscribe_msg_short_topic(Socket, QoS, <<"/#">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
@ -556,7 +574,8 @@ t_publish_qos0_case06(_) ->
MsgId = 1, MsgId = 1,
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic = <<"abc">>, Topic = <<"abc">>,
@ -587,7 +606,8 @@ t_publish_qos1_case01(_) ->
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
Topic = <<"abc">>, Topic = <<"abc">>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
@ -613,7 +633,8 @@ t_publish_qos1_case02(_) ->
MsgId = 1, MsgId = 1,
PredefTopicId = ?PREDEF_TOPIC_ID1, PredefTopicId = ?PREDEF_TOPIC_ID1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
@ -633,7 +654,8 @@ t_publish_qos1_case03(_) ->
MsgId = 1, MsgId = 1,
TopicId5 = 5, TopicId5 = 5,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>), send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>),
?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, receive_response(Socket)), ?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, receive_response(Socket)),
@ -651,7 +673,8 @@ t_publish_qos1_case04(_) ->
MsgId = 7, MsgId = 7,
TopicId0 = 0, TopicId0 = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId), send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
@ -677,7 +700,8 @@ t_publish_qos1_case05(_) ->
MsgId = 7, MsgId = 7,
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId), send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
@ -702,7 +726,8 @@ t_publish_qos1_case06(_) ->
MsgId = 7, MsgId = 7,
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId), send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
@ -728,7 +753,8 @@ t_publish_qos2_case01(_) ->
TopicId1 = ?MAX_PRED_TOPIC_ID + 1, TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
Topic = <<"/abc">>, Topic = <<"/abc">>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId1:16, MsgId:16, ?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId1:16, MsgId:16,
@ -755,7 +781,8 @@ t_publish_qos2_case02(_) ->
MsgId = 7, MsgId = 7,
PredefTopicId = ?PREDEF_TOPIC_ID2, PredefTopicId = ?PREDEF_TOPIC_ID2,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
@ -785,7 +812,8 @@ t_publish_qos2_case03(_) ->
MsgId = 7, MsgId = 7,
TopicId0 = 0, TopicId0 = 0,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>), ClientId = ?CLIENTID,
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"/#">>, MsgId), send_subscribe_msg_normal_topic(Socket, QoS, <<"/#">>, MsgId),
@ -811,7 +839,7 @@ t_will_case01(_) ->
WillMsg = <<10, 11, 12, 13, 14>>, WillMsg = <<10, 11, 12, 13, 14>>,
WillTopic = <<"abc">>, WillTopic = <<"abc">>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
ok = emqx_broker:subscribe(WillTopic), ok = emqx_broker:subscribe(WillTopic),
@ -846,7 +874,7 @@ t_will_test2(_) ->
Duration = 1, Duration = 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, <<"goodbye">>, QoS), send_willtopic_msg(Socket, <<"goodbye">>, QoS),
@ -870,7 +898,7 @@ t_will_test3(_) ->
Duration = 1, Duration = 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_empty_msg(Socket), send_willtopic_empty_msg(Socket),
@ -892,7 +920,7 @@ t_will_test4(_) ->
Duration = 1, Duration = 1,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, <<"abc">>, QoS), send_willtopic_msg(Socket, <<"abc">>, QoS),
@ -920,7 +948,7 @@ t_will_test5(_) ->
Duration = 5, Duration = 5,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, <<"abc">>, QoS), send_willtopic_msg(Socket, <<"abc">>, QoS),
@ -947,7 +975,7 @@ t_will_case06(_) ->
WillMsg = <<10, 11, 12, 13, 14>>, WillMsg = <<10, 11, 12, 13, 14>>,
WillTopic = <<"abc">>, WillTopic = <<"abc">>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
ok = emqx_broker:subscribe(WillTopic), ok = emqx_broker:subscribe(WillTopic),
@ -983,7 +1011,7 @@ t_asleep_test01_timeout(_) ->
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1007,7 +1035,7 @@ t_asleep_test02_to_awake_and_back(_) ->
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1022,13 +1050,13 @@ t_asleep_test02_to_awake_and_back(_) ->
timer:sleep(4500), timer:sleep(4500),
% goto awake state and back % goto awake state and back
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
timer:sleep(4500), timer:sleep(4500),
% goto awake state and back % goto awake state and back
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
%% during above procedure, mqtt keepalive timer should not terminate mqtt-sn process %% during above procedure, mqtt keepalive timer should not terminate mqtt-sn process
@ -1045,7 +1073,7 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
MsgId = 1000, MsgId = 1000,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1108,7 +1136,7 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1149,7 +1177,7 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
timer:sleep(300), timer:sleep(300),
% goto awake state, receive downlink messages, and go back to asleep % goto awake state, receive downlink messages, and go back to asleep
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
%% 1. get REGISTER first, since this topic has never been registered %% 1. get REGISTER first, since this topic has never been registered
UdpData1 = receive_response(Socket), UdpData1 = receive_response(Socket),
@ -1183,7 +1211,7 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1228,7 +1256,7 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
timer:sleep(50), timer:sleep(50),
% goto awake state, receive downlink messages, and go back to asleep % goto awake state, receive downlink messages, and go back to asleep
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
UdpData_reg = receive_response(Socket), UdpData_reg = receive_response(Socket),
{TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg), {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg),
@ -1261,7 +1289,7 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1304,7 +1332,7 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
timer:sleep(300), timer:sleep(300),
% goto awake state, receive downlink messages, and go back to asleep % goto awake state, receive downlink messages, and go back to asleep
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
UdpData = wrap_receive_response(Socket), UdpData = wrap_receive_response(Socket),
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
@ -1323,7 +1351,7 @@ t_asleep_test07_to_connected(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1372,7 +1400,7 @@ t_asleep_test08_to_disconnected(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1403,7 +1431,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Duration, ClientId), send_connect_msg_with_will(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1446,7 +1474,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
ok = emqtt:disconnect(C), ok = emqtt:disconnect(C),
% goto awake state, receive downlink messages, and go back to asleep % goto awake state, receive downlink messages, and go back to asleep
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
UdpData_reg = receive_response(Socket), UdpData_reg = receive_response(Socket),
{TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg), {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg),
@ -1482,7 +1510,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
%% send PINGREQ again to enter awake state %% send PINGREQ again to enter awake state
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
%% will not receive any buffered PUBLISH messages buffered before last awake, only receive PINGRESP here %% will not receive any buffered PUBLISH messages buffered before last awake, only receive PINGRESP here
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
@ -1495,7 +1523,7 @@ t_awake_test01_to_connected(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1529,7 +1557,7 @@ t_awake_test02_to_disconnected(_) ->
WillTopic = <<"dead">>, WillTopic = <<"dead">>,
WillPayload = <<10, 11, 12, 13, 14>>, WillPayload = <<10, 11, 12, 13, 14>>,
{ok, Socket} = gen_udp:open(0, [binary]), {ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>, ClientId = ?CLIENTID,
send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS), send_willtopic_msg(Socket, WillTopic, QoS),
@ -1545,7 +1573,7 @@ t_awake_test02_to_disconnected(_) ->
timer:sleep(100), timer:sleep(100),
% goto awake state % goto awake state
send_pingreq_msg(Socket, <<"test">>), send_pingreq_msg(Socket, ClientId),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -1592,7 +1620,7 @@ send_connect_msg(Socket, ClientId) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet). ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
send_connect_msg_with_will(Socket, Duration, ClientId) -> send_connect_msg_with_will(Socket, Duration, ClientId) ->
Length = 10, Length = 6 + byte_size(ClientId),
Will = 1, Will = 1,
CleanSession = 1, CleanSession = 1,
ProtocolId = 1, ProtocolId = 1,
@ -1601,7 +1629,7 @@ send_connect_msg_with_will(Socket, Duration, ClientId) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket). ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).
send_connect_msg_with_will1(Socket, Duration, ClientId) -> send_connect_msg_with_will1(Socket, Duration, ClientId) ->
Length = 10, Length = 6 + byte_size(ClientId),
Will = 1, Will = 1,
CleanSession = 0, CleanSession = 0,
ProtocolId = 1, ProtocolId = 1,