test(mqttsn): more tests for topic register and subs_resume

This commit is contained in:
JianBo He 2022-03-14 17:45:46 +08:00
parent 3201d11212
commit d4c1b3acc6
1 changed files with 324 additions and 103 deletions

View File

@ -862,108 +862,6 @@ t_delivery_qos1_register_invalid_topic_id(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_delivery_takeover_and_re_register(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_disconnect_msg(NSocket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
gen_udp:close(NSocket).
t_will_case01(_) ->
QoS = 1,
Duration = 1,
@ -1725,6 +1623,326 @@ t_broadcast_test1(_) ->
timer:sleep(600),
gen_udp:close(Socket).
t_register_subs_resume_on(_) ->
application:set_env(emqx_sn, subs_resume, true),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% receive the queued messages
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdA:16, MsgIdA2:16, "m3">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdA2),
<<_, ?SN_PUBREL, MsgIdA2:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdA2),
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdB:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdB:16, MsgIdB1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m3">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB2),
<<_, ?SN_PUBREL, MsgIdB2:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB2),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
application:set_env(emqx_sn, subs_resume, false),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1).
t_register_subs_resume_off(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1).
t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
application:set_env(emqx_sn, subs_resume, true),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
%% registered failured topic-name will be skipped
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID),
%% the gateway try to shutdown this client if it reached max-retry-times
%%
%% times-0
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% times-1
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% times-2
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% just a ping
send_pingreq_msg(NSocket, <<"test">>),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(NSocket)),
%% times-3
timer:sleep(5000), %% RETYRY_TIMEOUT
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
%% shutdown due to reached max retry times
timer:sleep(5000), %% RETYRY_TIMEOUT
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
application:set_env(emqx_sn, subs_resume, false),
gen_udp:close(NSocket).
t_register_enqueue_delivering_messages(_) ->
application:set_env(emqx_sn, subs_resume, true),
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
emqx_logger:set_log_level(debug),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% receive subs register requests
%% registered failured topic-name will be skipped
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
_ = emqx:publish(emqx_message:make(test, ?QOS_0, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED),
%% receive the queued messages
<<_, ?SN_PUBLISH, 2#00000000,
TopicIdA:16, 0:16, "m1">> = receive_response(NSocket),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
application:set_env(emqx_sn, subs_resume, false),
gen_udp:close(NSocket),
{ok, NSocket1} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket1, <<"test">>),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket1)),
send_disconnect_msg(NSocket1, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
gen_udp:close(NSocket1).
%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------
@ -1816,9 +2034,12 @@ send_register_msg(Socket, TopicName, MsgId) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket).
send_regack_msg(Socket, TopicId, MsgId) ->
send_regack_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
send_regack_msg(Socket, TopicId, MsgId, Rc) ->
Length = 7,
MsgType = ?SN_REGACK,
Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc>>,
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->