%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_sn_protocol_SUITE).

%% Every function is exported; the suite's all/0 discovers the test cases
%% through emqx_common_test_helpers:all/1.
-compile([export_all, nowarn_export_all]).

-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

%% Address and port of the gateway's UDP listener used by the test clients.
-define(HOST, {127,0,0,1}).
-define(PORT, 1884).

%% Identity wrappers that only label the flag value passed at a call site.
-define(FLAG_DUP(X), X).
-define(FLAG_QOS(X), X).
-define(FLAG_RETAIN(X), X).
-define(FLAG_SESSION(X), X).

-define(LOG(Format, Args), ct:pal("TEST: " ++ Format, Args)).

%% Predefined topics; these mirror the `predefined` list in ?CONF_DEFAULT.
-define(MAX_PRED_TOPIC_ID, 2).
-define(PREDEF_TOPIC_ID1, 1).
-define(PREDEF_TOPIC_ID2, 2).
-define(PREDEF_TOPIC_NAME1, <<"/predefined/topic/name/hello">>).
-define(PREDEF_TOPIC_NAME2, <<"/predefined/topic/name/nice">>).
-define(ENABLE_QOS3, true).

% FLAG NOT USED
-define(FNU, 0).

%% erlang:system_time should be unique and random enough
-define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-",
                                    integer_to_list(erlang:system_time())])).

-elvis([{elvis_style, dont_repeat_yourself, disable}]).

%% Gateway configuration (HOCON) loaded once for the whole suite.
%% Each field sits on its own line: HOCON separates fields with newlines or
%% commas, so the fields must not be run together on a single line.
-define(CONF_DEFAULT, <<"
gateway.mqttsn {
  gateway_id = 1
  broadcast = true
  enable_qos3 = true
  predefined = [
    { id = 1,
      topic = \"/predefined/topic/name/hello\"
    },
    { id = 2,
      topic = \"/predefined/topic/name/nice\"
    }
  ]
  clientinfo_override {
    username = \"user1\"
    password = \"pw123\"
  }
  listeners.udp.default {
    bind = 1884
  }
}
">>).

%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------

%% Test cases are discovered by the shared helper from this module's exports.
all() ->
    emqx_common_test_helpers:all(?MODULE).

%% Loads ?CONF_DEFAULT through the gateway schema, then starts emqx_gateway.
%% Returns the Common Test Config unchanged.
init_per_suite(Config) ->
    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
    emqx_common_test_helpers:start_apps([emqx_gateway]),
    Config.

%% Removes the mqttsn gateway configuration before stopping the application.
end_per_suite(_) ->
    {ok, _} = emqx:remove_config([gateway, mqttsn]),
    emqx_common_test_helpers:stop_apps([emqx_gateway]).

%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------

%%--------------------------------------------------------------------
%% Connect

%% Checks that the UDP listener is registered with esockd, then performs a
%% plain CONNECT / DISCONNECT round trip and verifies both replies.
t_connect(_) ->
    SockName = {'mqttsn:udp:default', 1884},
    ?assertEqual(true, lists:keymember(SockName, 1, esockd:listeners())),

    {ok, Socket} = gen_udp:open(0, [binary]),
    send_connect_msg(Socket, <<"client_id_test1">>),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),

    send_disconnect_msg(Socket, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
    gen_udp:close(Socket).

%% Registers, subscribes to and unsubscribes from a normal topic, checking
%% emqx_broker:topics() after the subscribe and after the unsubscribe.
t_subscribe(_) ->
    QoS = 0,
    MsgId = 1,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"abcD">>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, Topic, MsgId),
    RegAck = <<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
    ?assertEqual(RegAck, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, MsgId),
    SubAck = <<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, 0>>,
    ?assertEqual(SubAck, receive_response(Sock)),
    ?assert(lists:member(Topic, emqx_broker:topics())),

    send_unsubscribe_msg_normal_topic(Sock, Topic, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),
    %% give the broker a moment before inspecting its topic table again
    timer:sleep(100),
    ?assertNot(lists:member(Topic, emqx_broker:topics())),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Register + subscribe + unsubscribe on a normal topic name; the REGACK and
%% SUBACK are expected to carry the first id after the predefined ones.
t_subscribe_case01(_) ->
    QoS = 0,
    MsgId = 1,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"abcD">>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, Topic, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_normal_topic(Sock, Topic, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Registers a predefined topic name, then subscribes and unsubscribes using
%% the predefined topic id; all replies are expected to carry that id.
t_subscribe_case02(_) ->
    QoS = 0,
    MsgId = 1,
    TopicId = ?PREDEF_TOPIC_ID1,
    TopicName = ?PREDEF_TOPIC_NAME1,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, TopicName, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, TopicId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_predefined_topic(Sock, TopicId, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Subscribes to and unsubscribes from the short topic name "te" at QoS 2;
%% the SUBACK is expected to carry topic id 0.
t_subscribe_case03(_) ->
    QoS = 2,
    MsgId = 1,
    TopicId = 0,
    ShortTopic = <<"te">>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_short_topic(Sock, QoS, ShortTopic, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_short_topic(Sock, ShortTopic, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% In this case we use a predefined topic name to register and subscribe,
%% and expect to receive the corresponding predefined topic id rather than a
%% newly generated topic id from the broker. This case illustrates how
%% emqx_sn_gateway handles predefined and normal topics side by side.
%%
%% Once more restrictions are placed on the different topic id types, this
%% case should be deleted or modified.
%% Registers and subscribes by the *name* of a predefined topic and expects
%% the predefined id back in both the REGACK and the SUBACK.
t_subscribe_case04(_) ->
    QoS = 0,
    MsgId = 1,
    TopicId = ?PREDEF_TOPIC_ID1,
    TopicName = ?PREDEF_TOPIC_NAME1,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, TopicName, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, TopicName, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_normal_topic(Sock, TopicName, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Subscribes to several topic names at QoS 1. The two wildcard filters are
%% expected to be acknowledged with topic id 0, the two plain names with
%% consecutive ids following the predefined range.
t_subscribe_case05(_) ->
    QoS = 1,
    MsgId = 25,
    NoTopicId = 0,
    FirstId = ?MAX_PRED_TOPIC_ID + 1,
    SecondId = ?MAX_PRED_TOPIC_ID + 2,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,
    %% Expected SUBACK for a given topic id (return code 0)
    SubAck = fun(TopicId) ->
                 <<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, 0>>
             end,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, <<"abcD">>, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, FirstId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"abcD">>, MsgId),
    ?assertEqual(SubAck(FirstId), receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"/sport/#">>, MsgId),
    ?assertEqual(SubAck(NoTopicId), receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"/a/+/water">>, MsgId),
    ?assertEqual(SubAck(NoTopicId), receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"/Tom/Home">>, MsgId),
    ?assertEqual(SubAck(SecondId), receive_response(Sock)),

    send_unsubscribe_msg_normal_topic(Sock, <<"abcD">>, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Registers plain and wildcard topic names: wildcard names are expected to
%% be rejected with ?SN_RC_NOT_SUPPORTED and topic id 0, plain names to get
%% consecutive ids. Finishes with a short-topic subscribe / unsubscribe.
t_subscribe_case06(_) ->
    QoS = 0,
    MsgId = 1,
    NoTopicId = 0,
    FirstId = ?MAX_PRED_TOPIC_ID + 1,
    SecondId = ?MAX_PRED_TOPIC_ID + 2,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,
    %% Expected REGACK for a given topic id and return code
    RegAck = fun(TopicId, Rc) ->
                 <<7, ?SN_REGACK, TopicId:16, MsgId:16, Rc:8>>
             end,
    ClientId = ?CLIENTID,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ClientId),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, <<"abc">>, MsgId),
    ?assertEqual(RegAck(FirstId, 0), receive_response(Sock)),

    send_register_msg(Sock, <<"/blue/#">>, MsgId),
    ?assertEqual(RegAck(NoTopicId, ?SN_RC_NOT_SUPPORTED), receive_response(Sock)),

    send_register_msg(Sock, <<"/blue/+/white">>, MsgId),
    ?assertEqual(RegAck(NoTopicId, ?SN_RC_NOT_SUPPORTED), receive_response(Sock)),

    send_register_msg(Sock, <<"/$sys/rain">>, MsgId),
    ?assertEqual(RegAck(SecondId, 0), receive_response(Sock)),

    send_subscribe_msg_short_topic(Sock, QoS, <<"Q2">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, NoTopicId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_normal_topic(Sock, <<"Q2">>, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Subscribing with a topic id beyond the predefined range is expected to be
%% refused with ?SN_RC_INVALID_TOPIC_ID; the unsubscribe that follows is
%% still expected to be acknowledged.
t_subscribe_case07(_) ->
    QoS = 0,
    MsgId = 1,
    SubTopicId = ?MAX_PRED_TOPIC_ID + 2,
    UnsubTopicId = ?MAX_PRED_TOPIC_ID + 3,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, SubTopicId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ?SN_INVALID_TOPIC_ID:16,
                   MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_predefined_topic(Sock, UnsubTopicId, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% A subscribe sent through send_subscribe_msg_reserved_topic/4 is expected
%% to be refused with ?SN_RC_NOT_SUPPORTED.
t_subscribe_case08(_) ->
    QoS = 0,
    MsgId = 1,
    TopicId = 2,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_reserved_topic(Sock, QoS, TopicId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ?SN_INVALID_TOPIC_ID:16,
                   MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Subscribes at QoS 0, then publishes with the QoS flag set to 3 and, when
%% ?ENABLE_QOS3 is true, expects the message to be delivered back at QoS 0.
t_publish_negqos_case09(_) ->
    QoS = 0,
    NegQoS = 3,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"abc">>,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, NegQoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),
    case ?ENABLE_QOS3 of
        true ->
            Expected = <<11, ?SN_PUBLISH, Flags/binary, TopicId:16,
                         (mid(0)):16, Payload/binary>>,
            Received = receive_response(Sock),
            ?assertEqual(Expected, Received)
    end,

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% QoS 0 publish on a normal topic id the client itself subscribed to; the
%% same payload is expected back in a PUBLISH frame.
t_publish_qos0_case01(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"abc">>,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),

    Expected = <<11, ?SN_PUBLISH, Flags/binary, TopicId:16, (mid(0)):16,
                 Payload/binary>>,
    Received = receive_response(Sock),
    ?assertEqual(Expected, Received),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% QoS 0 publish on a predefined topic id; the delivered PUBLISH is expected
%% to be flagged ?SN_PREDEFINED_TOPIC while the SUBACK uses ?SN_NORMAL_TOPIC.
t_publish_qos0_case02(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?PREDEF_TOPIC_ID1,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octets: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    SubFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,
    PubFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_PREDEFINED_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, TopicId, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, SubFlags/binary, TopicId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),

    Expected = <<11, ?SN_PUBLISH, PubFlags/binary, TopicId:16, (mid(0)):16,
                 Payload/binary>>,
    Received = receive_response(Sock),
    ?assertEqual(Expected, Received),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Subscribes to a normal topic name, then publishes to the assigned id via
%% send_publish_msg_predefined_topic/5; the delivery is expected to be
%% flagged ?SN_NORMAL_TOPIC.
t_publish_qos0_case3(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"/a/b/c">>,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),

    Expected = <<11, ?SN_PUBLISH, Flags/binary, TopicId:16, (mid(0)):16,
                 Payload/binary>>,
    Received = receive_response(Sock),
    ?assertEqual(Expected, Received),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Subscribes to "#" and publishes on the short topic "TR"; the delivery is
%% expected to carry the two-byte topic name in place of a topic id.
t_publish_qos0_case04(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 2,
    NoTopicId = 0,
    ShortTopic = <<"TR">>,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octets: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    SubFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,
    PubFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_SHORT_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"#">>, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, SubFlags/binary, NoTopicId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_short_topic(Sock, QoS, PubMsgId, ShortTopic, Payload),
    timer:sleep(100),

    Expected = <<11, ?SN_PUBLISH, PubFlags/binary, ShortTopic/binary,
                 (mid(0)):16, Payload/binary>>,
    Received = receive_response(Sock),
    ?assertEqual(Expected, Received),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Subscribing with the short topic "/#" is expected to be accepted with
%% topic id 0. Despite its name, this case performs no publish.
t_publish_qos0_case05(_) ->
    QoS = 0,
    MsgId = 1,
    NoTopicId = 0,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_short_topic(Sock, QoS, <<"/#">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, NoTopicId:16, MsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% QoS 0 publish / delivery round trip on the normal topic "abc".
t_publish_qos0_case06(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"abc">>,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),

    Expected = <<11, ?SN_PUBLISH, Flags/binary, TopicId:16, (mid(0)):16,
                 Payload/binary>>,
    Received = receive_response(Sock),
    ?assertEqual(Expected, Received),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% QoS 1 publish on a normal topic: expects a PUBACK for the publish and
%% then the message delivered back to the (self-subscribed) client.
t_publish_qos1_case01(_) ->
    QoS = 1,
    MsgId = 1,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"abc">>,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, MsgId, TopicId, Payload),
    ?assertEqual(<<7, ?SN_PUBACK, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),
    timer:sleep(100),

    ?assertEqual(<<11, ?SN_PUBLISH, Flags/binary, TopicId:16, MsgId:16,
                   Payload/binary>>,
                 receive_response(Sock)),

    %% the reply to this DISCONNECT is not checked in this case
    send_disconnect_msg(Sock, undefined),
    gen_udp:close(Sock).

%% QoS 1 publish on a predefined topic id: only the PUBACK is verified.
t_publish_qos1_case02(_) ->
    QoS = 1,
    MsgId = 1,
    TopicId = ?PREDEF_TOPIC_ID1,
    Payload = <<20, 21, 22, 23>>,
    %% Flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, TopicId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, MsgId, TopicId, Payload),
    ?assertEqual(<<7, ?SN_PUBACK, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),
    timer:sleep(100),

    %% the reply to this DISCONNECT is not checked in this case
    send_disconnect_msg(Sock, undefined),
    gen_udp:close(Sock).

%% Publishing at QoS 1 to topic id 5, which this client never registered and
%% which lies outside the predefined ids, is expected to be answered with a
%% PUBACK carrying ?SN_RC_INVALID_TOPIC_ID.
t_publish_qos1_case03(_) ->
    QoS = 1,
    MsgId = 1,
    TopicId5 = 5,
    {ok, Socket} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,
    send_connect_msg(Socket, ClientId),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),

    send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>),
    ?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
                 receive_response(Socket)),

    send_disconnect_msg(Socket, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
    gen_udp:close(Socket).

%% QoS 1 publish on the short topic "ab": the PUBACK is expected to echo the
%% two topic-name bytes in its 16-bit topic id field.
t_publish_qos1_case04(_) ->
    Dup = 0,
    QoS = 1,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    MsgId = 7,
    TopicId0 = 0,
    {ok, Socket} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,
    send_connect_msg(Socket, ClientId),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),

    send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId),
    ?assertEqual(
       <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
         Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
         TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
       receive_response(Socket)
      ),

    Topic = <<"ab">>,
    Payload1 = <<20, 21, 22, 23>>,
    send_publish_msg_short_topic(Socket, QoS, MsgId, Topic, Payload1),
    %% Reinterpret the 2-byte short topic name as the 16-bit integer that the
    %% PUBACK carries in its topic id field.
    <<TopicIdShort:16>> = Topic,
    ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Socket)),
    timer:sleep(100),

    %% the reply to this DISCONNECT is not checked in this case
    send_disconnect_msg(Socket, undefined),
    gen_udp:close(Socket).

%% Publishing at QoS 1 to the short topic "/#" (contains a wildcard) is
%% expected to be refused with ?SN_RC_NOT_SUPPORTED.
t_publish_qos1_case05(_) ->
    Dup = 0,
    QoS = 1,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    MsgId = 7,
    TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
    {ok, Socket} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,
    send_connect_msg(Socket, ClientId),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),

    send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
                   Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
                   TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Socket)),

    send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/#">>, <<20, 21, 22, 23>>),
    %% The PUBACK echoes the 2-byte short topic name as a 16-bit topic id.
    <<TopicIdShort:16>> = <<"/#">>,
    ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
                 receive_response(Socket)),

    send_disconnect_msg(Socket, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
    gen_udp:close(Socket).

%% Same as t_publish_qos1_case05/1 but with the single-level wildcard short
%% topic "/+"; also expected to be refused with ?SN_RC_NOT_SUPPORTED.
t_publish_qos1_case06(_) ->
    Dup = 0,
    QoS = 1,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    MsgId = 7,
    TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
    {ok, Socket} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,
    send_connect_msg(Socket, ClientId),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),

    send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
                   Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
                   TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Socket)),

    send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/+">>, <<20, 21, 22, 23>>),
    %% The PUBACK echoes the 2-byte short topic name as a 16-bit topic id.
    <<TopicIdShort:16>> = <<"/+">>,
    ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
                 receive_response(Socket)),

    send_disconnect_msg(Socket, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
    gen_udp:close(Socket).

%% QoS 2 publish on a normal topic: PUBREC, then after PUBREL the delivery to
%% the self-subscribed client followed by PUBCOMP.
t_publish_qos2_case01(_) ->
    QoS = 2,
    MsgId = 7,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Topic = <<"/abc">>,
    Payload = <<20, 21, 22, 23>>,
    %% PUBLISH flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    PubFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, MsgId),
    SubAck = <<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId:16, MsgId:16,
               ?SN_RC_ACCEPTED>>,
    ?assertEqual(SubAck, receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, MsgId, TopicId, Payload),
    ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Sock)),

    send_pubrel_msg(Sock, MsgId),
    %% the delivered copy is expected with message id 1
    ?assertEqual(<<11, ?SN_PUBLISH, PubFlags/binary, TopicId:16, 1:16,
                   Payload/binary>>,
                 receive_response(Sock)),
    ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% QoS 2 publish on a predefined topic id: PUBREC, then after PUBREL the
%% delivery flagged ?SN_PREDEFINED_TOPIC followed by PUBCOMP.
t_publish_qos2_case02(_) ->
    QoS = 2,
    MsgId = 7,
    TopicId = ?PREDEF_TOPIC_ID2,
    Payload = <<20, 21, 22, 23>>,
    %% PUBLISH flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    PubFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_PREDEFINED_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, TopicId, MsgId),
    SubAck = <<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, TopicId:16, MsgId:16,
               ?SN_RC_ACCEPTED>>,
    ?assertEqual(SubAck, receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, MsgId, TopicId, Payload),
    ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Sock)),

    send_pubrel_msg(Sock, MsgId),
    %% the delivered copy is expected with message id 1
    ?assertEqual(<<11, ?SN_PUBLISH, PubFlags/binary, TopicId:16, 1:16,
                   Payload/binary>>,
                 receive_response(Sock)),
    ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% QoS 2 publish on the short topic "/a" while subscribed to "/#": PUBREC,
%% then after PUBREL the delivery flagged ?SN_SHORT_TOPIC followed by PUBCOMP.
t_publish_qos2_case03(_) ->
    QoS = 2,
    MsgId = 7,
    NoTopicId = 0,
    ShortTopic = <<"/a">>,
    Payload = <<20, 21, 22, 23>>,
    %% PUBLISH flags octet: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    PubFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_SHORT_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"/#">>, MsgId),
    SubAck = <<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, NoTopicId:16, MsgId:16,
               ?SN_RC_ACCEPTED>>,
    ?assertEqual(SubAck, receive_response(Sock)),

    send_publish_msg_short_topic(Sock, QoS, MsgId, ShortTopic, Payload),
    ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Sock)),

    send_pubrel_msg(Sock, MsgId),
    %% the delivered copy is expected with message id 1
    ?assertEqual(<<11, ?SN_PUBLISH, PubFlags/binary, ShortTopic/binary, 1:16,
                   Payload/binary>>,
                 receive_response(Sock)),
    ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Connects with a will (topic "abc", QoS 1) and a keepalive duration of 1,
%% goes silent, and expects the test process, which subscribed to the will
%% topic, to receive the will message.
t_will_case01(_) ->
    QoS = 1,
    KeepAlive = 1,
    WillPayload = <<10, 11, 12, 13, 14>>,
    WillTopic = <<"abc">>,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    %% the test process itself listens for the will message
    ok = emqx_broker:subscribe(WillTopic),

    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, WillTopic, QoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, WillPayload),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    % wait udp client keepalive timeout
    timer:sleep(3000),

    receive
        {deliver, WillTopic, #message{payload = WillPayload}} ->
            ok
    after 1000 ->
        ct:fail(wait_willmsg_timeout)
    end,
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),

    %% a DISCONNECT sent at this point is expected to get no reply
    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),
    gen_udp:close(Sock).

%% Will at QoS 2 on topic "goodbye": after the keepalive lapses, two frames
%% are read and discarded, and a late DISCONNECT is expected to get no reply.
t_will_test2(_) ->
    QoS = 2,
    KeepAlive = 1,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, <<"goodbye">>, QoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, <<10, 11, 12, 13, 14>>),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    timer:sleep(4000),

    receive_response(Sock), % ignore PUBACK
    receive_response(Sock), % ignore PUBCOMP

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),
    gen_udp:close(Sock).

%% Answers WILLTOPICREQ with an empty will topic: the gateway is expected to
%% go straight to CONNACK, and to send DISCONNECT once the keepalive lapses.
t_will_test3(_) ->
    KeepAlive = 1,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_empty_msg(Sock),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    timer:sleep(4000),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),

    %% a DISCONNECT sent at this point is expected to get no reply
    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),
    gen_udp:close(Sock).

%% Updates the will topic and the will message after connecting; both
%% updates are expected to be accepted.
t_will_test4(_) ->
    QoS = 1,
    KeepAlive = 1,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, <<"abc">>, QoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, <<10, 11, 12, 13, 14>>),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    send_willtopicupd_msg(Sock, <<"/XYZ">>, ?QOS_1),
    ?assertEqual(<<3, ?SN_WILLTOPICRESP, ?SN_RC_ACCEPTED>>, receive_response(Sock)),
    send_willmsgupd_msg(Sock, <<"1A2B3C">>),
    ?assertEqual(<<3, ?SN_WILLMSGRESP, ?SN_RC_ACCEPTED>>, receive_response(Sock)),

    timer:sleep(4000),

    receive_response(Sock), % ignore PUBACK

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),
    gen_udp:close(Sock).

%% Sends an empty will-topic update with a keepalive duration of 5; nothing
%% further is expected from the gateway within the next second, and the
%% client can then still disconnect normally.
t_will_test5(_) ->
    QoS = 1,
    KeepAlive = 5,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, <<"abc">>, QoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, <<10, 11, 12, 13, 14>>),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    send_willtopicupd_empty_msg(Sock),
    ?assertEqual(<<3, ?SN_WILLTOPICRESP, ?SN_RC_ACCEPTED>>, receive_response(Sock)),

    timer:sleep(1000),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).

%% Like t_will_case01/1 but connects through send_connect_msg_with_will1/3.
%% NOTE(review): any message other than the expected delivery is only
%% printed and does not fail the case - confirm this is intended.
t_will_case06(_) ->
    QoS = 1,
    KeepAlive = 1,
    WillPayload = <<10, 11, 12, 13, 14>>,
    WillTopic = <<"abc">>,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    %% the test process itself listens for the will message
    ok = emqx_broker:subscribe(WillTopic),

    send_connect_msg_with_will1(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, WillTopic, QoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, WillPayload),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    % wait udp client keepalive timeout
    timer:sleep(3000),

    receive
        {deliver, WillTopic, #message{payload = WillPayload}} ->
            ok;
        Unexpected ->
            ct:print("received --- unex: ~p", [Unexpected])
    after 1000 ->
        ct:fail(wait_willmsg_timeout)
    end,

    send_disconnect_msg(Sock, undefined),
    gen_udp:close(Sock).

%t_asleep_test01_timeout(_) -> % QoS = 1, % Duration = 1, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % send_disconnect_msg(Socket, 1), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % %% asleep timer should get timeout, and device is lost % timer:sleep(3000), % % gen_udp:close(Socket). % %t_asleep_test02_to_awake_and_back(_) -> % QoS = 1, % Keepalive_Duration = 1, % SleepDuration = 5, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % goto asleep state % send_disconnect_msg(Socket, SleepDuration), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(4500), % % % goto awake state and back % send_pingreq_msg(Socket, ClientId), % ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), % % timer:sleep(4500), % % % goto awake state and back % send_pingreq_msg(Socket, ClientId), % ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), % % %% during above procedure, mqtt keepalive timer should not terminate mqtt-sn process % % %% asleep timer should get timeout, and device should get lost % timer:sleep(8000), % % gen_udp:close(Socket). 
% %t_asleep_test03_to_awake_qos1_dl_msg(_) -> % QoS = 1, % Duration = 5, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % MsgId = 1000, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % subscribe % TopicName1 = <<"abc">>, % MsgId1 = 25, % TopicId1 = ?MAX_PRED_TOPIC_ID + 1, % WillBit = 0, % Dup = 0, % Retain = 0, % CleanSession = 0, % ReturnCode = 0, % Payload1 = <<55, 66, 77, 88, 99>>, % MsgId2 = 87, % % send_register_msg(Socket, TopicName1, MsgId1), % ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)), % send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), % ?assertEqual( % <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, % WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, % TopicId1:16, MsgId:16, ReturnCode>>, % receive_response(Socket) % ), % % % goto asleep state % send_disconnect_msg(Socket, 1), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(300), % % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % %% send downlink data in asleep state. 
This message should be send to device once it wake up % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % send_publish_msg_predefined_topic(Socket, QoS, MsgId2, TopicId1, Payload1), % % {ok, C} = emqtt:start_link(), % {ok, _} = emqtt:connect(C), % {ok, _} = emqtt:publish(C, TopicName1, Payload1, QoS), % timer:sleep(100), % ok = emqtt:disconnect(C), % % timer:sleep(50), % % % goto awake state, receive downlink messages, and go back to asleep % send_pingreq_msg(Socket, ClientId), % % %% the broker should sent dl msgs to the awake client before sending the pingresp % UdpData = receive_response(Socket), % MsgId_udp = check_publish_msg_on_udp( % {Dup, QoS, Retain, WillBit, CleanSession, % ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData), % send_puback_msg(Socket, TopicId1, MsgId_udp), % % %% check the pingresp is received at last % ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), % % gen_udp:close(Socket). % %t_asleep_test04_to_awake_qos1_dl_msg(_) -> % QoS = 1, % Duration = 5, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % subscribe % TopicName1 = <<"a/+/c">>, % MsgId1 = 25, % TopicId0 = 0, % WillBit = 0, % Dup = 0, % Retain = 0, % CleanSession = 0, % ReturnCode = 0, % send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), % ?assertEqual( % <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, % WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, % TopicId0:16, MsgId1:16, ReturnCode>>, % receive_response(Socket) % ), % % % goto asleep state % send_disconnect_msg(Socket, 1), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % 
%    timer:sleep(300),
%
%    %% send downlink data in asleep state. This message should be sent to the device once it wakes up
%    Payload1 = <<55, 66, 77, 88, 99>>,
%    Payload2 = <<55, 66, 77, 88, 100>>,
%
%    {ok, C} = emqtt:start_link(),
%    {ok, _} = emqtt:connect(C),
%    {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload1, QoS),
%    {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload2, QoS),
%    timer:sleep(100),
%    ok = emqtt:disconnect(C),
%
%    timer:sleep(300),
%
%    % goto awake state, receive downlink messages, and go back to asleep
%    send_pingreq_msg(Socket, ClientId),
%
%    %% 1. get REGISTER first, since this topic has never been registered
%    UdpData1 = receive_response(Socket),
%    {TopicIdNew, MsgId3} = check_register_msg_on_udp(<<"a/b/c">>, UdpData1),
%
%    %% 2. but before we reply the REGACK, the sn-gateway should not send any PUBLISH
%    ?assertError(_, receive_publish(Socket)),
%
%    send_regack_msg(Socket, TopicIdNew, MsgId3),
%
%    UdpData2 = receive_response(Socket),
%    MsgId_udp2 = check_publish_msg_on_udp(
%                   {Dup, QoS, Retain, WillBit, CleanSession,
%                    ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2),
%    send_puback_msg(Socket, TopicIdNew, MsgId_udp2),
%
%    UdpData3 = receive_response(Socket),
%    MsgId_udp3 = check_publish_msg_on_udp(
%                   {Dup, QoS, Retain, WillBit, CleanSession,
%                    ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3),
%    send_puback_msg(Socket, TopicIdNew, MsgId_udp3),
%
%    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
%
%    gen_udp:close(Socket).

%% Waits up to one second for a datagram and asserts that it is a PUBLISH.
%% Anything else, including the `udp_receive_timeout' atom returned when
%% nothing arrives, fails one of the matches below with `badmatch'.
%%
%% NOTE(review): the header pattern was lost in this copy of the file (only
%% `<>' remained, leaving HeaderUdp unbound). It is restored here as
%% 5 header bytes + 16-bit MsgId + payload, the same split
%% check_publish_msg_on_udp/2 relies on - confirm against upstream.
receive_publish(Socket) ->
    UdpData3 = receive_response(Socket, 1000),
    <<HeaderUdp:5/binary, _:16, _/binary>> = UdpData3,
    <<_:8, ?SN_PUBLISH, _/binary>> = HeaderUdp.
%t_asleep_test05_to_awake_qos1_dl_msg(_) -> % QoS = 1, % Duration = 5, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % subscribe % TopicName1 = <<"u/+/w">>, % MsgId1 = 25, % TopicId0 = 0, % WillBit = 0, % Dup = 0, % Retain = 0, % CleanSession = 0, % ReturnCode = 0, % send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), % ?assertEqual( % <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, % WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, % TopicId0:16, MsgId1:16, ReturnCode>>, % receive_response(Socket) % ), % % % goto asleep state % SleepDuration = 30, % send_disconnect_msg(Socket, SleepDuration), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(300), % % %% send downlink data in asleep state. 
This message should be send to device once it wake up % Payload2 = <<55, 66, 77, 88, 99>>, % Payload3 = <<61, 71, 81>>, % Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>, % TopicName_test5 = <<"u/v/w">>, % {ok, C} = emqtt:start_link(), % {ok, _} = emqtt:connect(C), % {ok, _} = emqtt:publish(C, TopicName_test5, Payload2, QoS), % timer:sleep(100), % {ok, _} = emqtt:publish(C, TopicName_test5, Payload3, QoS), % timer:sleep(100), % {ok, _} = emqtt:publish(C, TopicName_test5, Payload4, QoS), % timer:sleep(200), % ok = emqtt:disconnect(C), % timer:sleep(50), % % % goto awake state, receive downlink messages, and go back to asleep % send_pingreq_msg(Socket, ClientId), % % UdpData_reg = receive_response(Socket), % {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg), % send_regack_msg(Socket, TopicIdNew, MsgId_reg), % % UdpData2 = receive_response(Socket), % MsgId2 = check_publish_msg_on_udp( % {Dup, QoS, Retain, WillBit, CleanSession, % ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), % send_puback_msg(Socket, TopicIdNew, MsgId2), % timer:sleep(50), % % UdpData3 = wrap_receive_response(Socket), % MsgId3 = check_publish_msg_on_udp( % {Dup, QoS, Retain, WillBit, CleanSession, % ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), % send_puback_msg(Socket, TopicIdNew, MsgId3), % timer:sleep(50), % % case receive_response(Socket) of % <<2,23>> -> ok; % UdpData4 -> % MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, % CleanSession, ?SN_NORMAL_TOPIC, % TopicIdNew, Payload4}, UdpData4), % send_puback_msg(Socket, TopicIdNew, MsgId4) % end, % ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), % gen_udp:close(Socket). 
% %t_asleep_test06_to_awake_qos2_dl_msg(_) -> % QoS = 2, % Duration = 1, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % subscribe % TopicName_tom = <<"tom">>, % MsgId1 = 25, % WillBit = 0, % Dup = 0, % Retain = 0, % CleanSession = 0, % ReturnCode = 0, % send_register_msg(Socket, TopicName_tom, MsgId1), % timer:sleep(50), % TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), % send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), % ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, % ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>, % receive_response(Socket)), % % % goto asleep state % SleepDuration = 11, % send_disconnect_msg(Socket, SleepDuration), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(100), % % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % %% send downlink data in asleep state. 
This message should be send to device once it wake up % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % Payload1 = <<55, 66, 77, 88, 99>>, % {ok, C} = emqtt:start_link(), % {ok, _} = emqtt:connect(C), % {ok, _} = emqtt:publish(C, TopicName_tom, Payload1, QoS), % timer:sleep(100), % ok = emqtt:disconnect(C), % timer:sleep(300), % % % goto awake state, receive downlink messages, and go back to asleep % send_pingreq_msg(Socket, ClientId), % % UdpData = wrap_receive_response(Socket), % MsgId_udp = check_publish_msg_on_udp( % {Dup, QoS, Retain, WillBit, CleanSession, % ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), % send_pubrec_msg(Socket, MsgId_udp), % ?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)), % send_pubcomp_msg(Socket, MsgId_udp), % % %% verify the pingresp is received after receiving all the buffered qos2 msgs % ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), % gen_udp:close(Socket). % %t_asleep_test07_to_connected(_) -> % QoS = 1, % Keepalive_Duration = 10, % SleepDuration = 1, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % subscribe % TopicName_tom = <<"tom">>, % MsgId1 = 25, % WillBit = 0, % Dup = 0, % Retain = 0, % CleanSession = 0, % ReturnCode = 0, % send_register_msg(Socket, TopicName_tom, MsgId1), % TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), % send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), % ?assertEqual( % <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, % WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, % TopicId_tom:16, MsgId1:16, 
ReturnCode>>, % receive_response(Socket) % ), % % % goto asleep state % send_disconnect_msg(Socket, SleepDuration), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(100), % % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % %% send connect message, and goto connected state % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % send_connect_msg(Socket, ClientId), % ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)), % % timer:sleep(1500), % % asleep timer should get timeout, without any effect % % timer:sleep(4000), % % keepalive timer should get timeout % % gen_udp:close(Socket). % %t_asleep_test08_to_disconnected(_) -> % QoS = 1, % Keepalive_Duration = 3, % SleepDuration = 1, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % goto asleep state % send_disconnect_msg(Socket, SleepDuration), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(100), % % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % %% send disconnect message, and goto disconnected state % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % send_disconnect_msg(Socket, undefined), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(100), % % it is a normal termination, without will message % % gen_udp:close(Socket). 
% %t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> % QoS = 1, % Duration = 5, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % subscribe % TopicName1 = <<"u/+/k">>, % MsgId1 = 25, % TopicId0 = 0, % WillBit = 0, % Dup = 0, % Retain = 0, % CleanSession = 0, % ReturnCode = 0, % send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), % ?assertEqual( % <<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, % WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, % TopicId0:16, MsgId1:16, ReturnCode>>, % receive_response(Socket) % ), % % goto asleep state % SleepDuration = 30, % send_disconnect_msg(Socket, SleepDuration), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(1000), % % %% send downlink data in asleep state. 
This message should be send to device once it wake up % Payload2 = <<55, 66, 77, 88, 99>>, % Payload3 = <<61, 71, 81>>, % Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>, % TopicName_test9 = <<"u/v/k">>, % {ok, C} = emqtt:start_link(), % {ok, _} = emqtt:connect(C), % {ok, _} = emqtt:publish(C, TopicName_test9, Payload2, QoS), % timer:sleep(100), % {ok, _} = emqtt:publish(C, TopicName_test9, Payload3, QoS), % timer:sleep(100), % {ok, _} = emqtt:publish(C, TopicName_test9, Payload4, QoS), % timer:sleep(1000), % ok = emqtt:disconnect(C), % % % goto awake state, receive downlink messages, and go back to asleep % send_pingreq_msg(Socket, ClientId), % % UdpData_reg = receive_response(Socket), % {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg), % send_regack_msg(Socket, TopicIdNew, MsgId_reg), % % case wrap_receive_response(Socket) of % udp_receive_timeout -> % ok; % UdpData2 -> % MsgId2 = check_publish_msg_on_udp( % {Dup, QoS, Retain, WillBit, CleanSession, % ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), % send_puback_msg(Socket, TopicIdNew, MsgId2) % end, % timer:sleep(100), % % case wrap_receive_response(Socket) of % udp_receive_timeout -> % ok; % UdpData3 -> % MsgId3 = check_publish_msg_on_udp( % {Dup, QoS, Retain, WillBit, CleanSession, % ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), % send_puback_msg(Socket, TopicIdNew, MsgId3) % end, % timer:sleep(100), % % case wrap_receive_response(Socket) of % udp_receive_timeout -> % ok; % UdpData4 -> % MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, % CleanSession, ?SN_NORMAL_TOPIC, % TopicIdNew, Payload4}, UdpData4), % send_puback_msg(Socket, TopicIdNew, MsgId4) % end, % ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), % % %% send PINGREQ again to enter awake state % send_pingreq_msg(Socket, ClientId), % %% will not receive any buffered PUBLISH messages buffered before last % %% awake, only receive PINGRESP here % ?assertEqual(<<2, ?SN_PINGRESP>>, 
receive_response(Socket)), % % gen_udp:close(Socket). % %t_awake_test01_to_connected(_) -> % QoS = 1, % Keepalive_Duration = 3, % SleepDuration = 1, % WillTopic = <<"dead">>, % WillPayload = <<10, 11, 12, 13, 14>>, % {ok, Socket} = gen_udp:open(0, [binary]), % ClientId = ?CLIENTID, % send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId), % ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), % send_willtopic_msg(Socket, WillTopic, QoS), % ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), % send_willmsg_msg(Socket, WillPayload), % ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), % % % goto asleep state % send_disconnect_msg(Socket, SleepDuration), % ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), % % timer:sleep(100), % % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % %% send connect message, and goto connected state % %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% % send_connect_msg(Socket, ClientId), % ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)), % % timer:sleep(1500), % % asleep timer should get timeout % % timer:sleep(4000), % % keepalive timer should get timeout % gen_udp:close(Socket). 
%
%t_awake_test02_to_disconnected(_) ->
%    QoS = 1,
%    Keepalive_Duration = 3,
%    SleepDuration = 1,
%    WillTopic = <<"dead">>,
%    WillPayload = <<10, 11, 12, 13, 14>>,
%    {ok, Socket} = gen_udp:open(0, [binary]),
%    ClientId = ?CLIENTID,
%    send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
%    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
%    send_willtopic_msg(Socket, WillTopic, QoS),
%    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
%    send_willmsg_msg(Socket, WillPayload),
%    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
%
%
%    % goto asleep state
%    send_disconnect_msg(Socket, SleepDuration),
%    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
%
%    timer:sleep(100),
%
%    % goto awake state
%    send_pingreq_msg(Socket, ClientId),
%    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
%
%    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%    %% send disconnect message, and goto disconnected state
%    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%    send_disconnect_msg(Socket, undefined),
%    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
%
%    timer:sleep(100),
%    % it is a normal termination, no will message will be sent
%
%    gen_udp:close(Socket).

%% Sends SEARCHGW and expects a GWINFO reply carrying gateway id 1.
t_broadcast_test1(_) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    send_searchgw_msg(Socket),
    ?assertEqual(<<3, ?SN_GWINFO, 1>>, receive_response(Socket)),
    timer:sleep(600),
    gen_udp:close(Socket).

%%--------------------------------------------------------------------
%% Helper funcs
%%--------------------------------------------------------------------

%% NOTE(review): the packet binaries built by the helpers below were lost in
%% this copy of the file (only `<>' remained). They are restored from the
%% locally bound fields following the MQTT-SN v1.2 message formats - confirm
%% against upstream.

%% Sends SEARCHGW (Length, MsgType, Radius) with radius 0.
send_searchgw_msg(Socket) ->
    Length = 3,
    MsgType = ?SN_SEARCHGW,
    Radius = 0,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).

%% Sends CONNECT with CleanSession set, no will, protocol id 1 and a
%% keepalive duration of 10.
send_connect_msg(Socket, ClientId) ->
    Length = 6 + byte_size(ClientId),
    MsgType = ?SN_CONNECT,
    Dup = 0,
    QoS = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 1,
    TopicIdType = 0,
    ProtocolId = 1,
    Duration = 10,
    %% Flags byte: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Packet = <<Length:8, MsgType:8,
               Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
               ProtocolId:8, Duration:16, ClientId/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
%% NOTE(review): the packet binaries built by the helpers below were lost in
%% this copy of the file (only `<>' remained). They are restored from the
%% locally bound fields following the MQTT-SN v1.2 message formats - confirm
%% against upstream.

%% Sends CONNECT with the Will and CleanSession flags set and the given
%% keepalive Duration.
send_connect_msg_with_will(Socket, Duration, ClientId) ->
    Length = 6 + byte_size(ClientId),
    Will = 1,
    CleanSession = 1,
    ProtocolId = 1,
    %% Flags byte: 4 unused bits, Will:1, CleanSession:1, 2 unused bits
    ConnectPacket = <<Length:8, ?SN_CONNECT:8,
                      ?FNU:4, Will:1, CleanSession:1, ?FNU:2,
                      ProtocolId:8, Duration:16, ClientId/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).

%% Same as send_connect_msg_with_will/3 but with CleanSession cleared.
send_connect_msg_with_will1(Socket, Duration, ClientId) ->
    Length = 6 + byte_size(ClientId),
    Will = 1,
    CleanSession = 0,
    ProtocolId = 1,
    ConnectPacket = <<Length:8, ?SN_CONNECT:8,
                      ?FNU:4, Will:1, CleanSession:1, ?FNU:2,
                      ProtocolId:8, Duration:16, ClientId/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).

%% Sends WILLTOPIC carrying Topic with the given QoS and Retain cleared.
send_willtopic_msg(Socket, Topic, QoS) ->
    Length = 3 + byte_size(Topic),
    MsgType = ?SN_WILLTOPIC,
    Retain = 0,
    %% Flags byte: only QoS and Retain are meaningful in WILLTOPIC
    WillTopicPacket = <<Length:8, MsgType:8,
                        ?FNU:1, QoS:2, Retain:1, ?FNU:4,
                        Topic/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, WillTopicPacket).

%% Sends an empty WILLTOPIC (header only, no flags and no topic).
send_willtopic_empty_msg(Socket) ->
    Length = 2,
    MsgType = ?SN_WILLTOPIC,
    WillTopicPacket = <<Length:8, MsgType:8>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, WillTopicPacket).

%% Sends WILLMSG carrying Msg.
send_willmsg_msg(Socket, Msg) ->
    Length = 2 + byte_size(Msg),
    WillMsgPacket = <<Length:8, ?SN_WILLMSG:8, Msg/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, WillMsgPacket).

%% Sends WILLTOPICUPD carrying Topic with the given QoS and Retain cleared.
send_willtopicupd_msg(Socket, Topic, QoS) ->
    Length = 3 + byte_size(Topic),
    MsgType = ?SN_WILLTOPICUPD,
    Retain = 0,
    WillTopicPacket = <<Length:8, MsgType:8,
                        ?FNU:1, QoS:2, Retain:1, ?FNU:4,
                        Topic/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, WillTopicPacket).

%% Sends an empty WILLTOPICUPD (header only).
send_willtopicupd_empty_msg(Socket) ->
    Length = 2,
    MsgType = ?SN_WILLTOPICUPD,
    WillTopicPacket = <<Length:8, MsgType:8>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, WillTopicPacket).

%% Sends WILLMSGUPD carrying Msg.
send_willmsgupd_msg(Socket, Msg) ->
    Length = 2 + byte_size(Msg),
    MsgType = ?SN_WILLMSGUPD,
    WillTopicPacket = <<Length:8, MsgType:8, Msg/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, WillTopicPacket).

%% Sends REGISTER for TopicName; the TopicId field is sent as 0.
send_register_msg(Socket, TopicName, MsgId) ->
    Length = 6 + byte_size(TopicName),
    MsgType = ?SN_REGISTER,
    TopicId = 0,
    RegisterPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, TopicName/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, RegisterPacket).

%% Sends REGACK for TopicId/MsgId with return code "accepted".
send_regack_msg(Socket, TopicId, MsgId) ->
    Length = 7,
    MsgType = ?SN_REGACK,
    Packet = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED:8>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
%% NOTE(review): the packet binaries built by the helpers below were lost in
%% this copy of the file (only `<>' remained). They are restored from the
%% locally bound fields following the MQTT-SN v1.2 message formats - confirm
%% against upstream.

%% Sends PUBLISH addressed by a normal (registered) 16-bit topic id.
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->
    Length = 7 + byte_size(Data),
    MsgType = ?SN_PUBLISH,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_NORMAL_TOPIC,
    PublishPacket = <<Length:8, MsgType:8,
                      Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                      TopicId:16, MsgId:16, Data/binary>>,
    ?LOG("send_publish_msg_normal_topic TopicId=~p, Data=~p", [TopicId, Data]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).

%% Sends PUBLISH addressed by a predefined 16-bit topic id.
send_publish_msg_predefined_topic(Socket, QoS, MsgId, TopicId, Data) ->
    Length = 7 + byte_size(Data),
    MsgType = ?SN_PUBLISH,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_PREDEFINED_TOPIC,
    PublishPacket = <<Length:8, MsgType:8,
                      Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                      TopicId:16, MsgId:16, Data/binary>>,
    ?LOG("send_publish_msg_predefined_topic TopicId=~p, Data=~p", [TopicId, Data]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).

%% Sends PUBLISH addressed by a short topic name.
%% The fixed Length of 7 + payload assumes TopicName is a 2-byte binary -
%% TODO confirm against callers.
send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) ->
    Length = 7 + byte_size(Data),
    MsgType = ?SN_PUBLISH,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = 2,
    PublishPacket = <<Length:8, MsgType:8,
                      Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                      TopicName/binary, MsgId:16, Data/binary>>,
    ?LOG("send_publish_msg_short_topic TopicName=~p, Data=~p", [TopicName, Data]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).

%% Sends PUBACK for TopicId/MsgId with return code "accepted".
send_puback_msg(Socket, TopicId, MsgId) ->
    Length = 7,
    MsgType = ?SN_PUBACK,
    PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED:8>>,
    ?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).

%% Sends PUBREC for MsgId.
send_pubrec_msg(Socket, MsgId) ->
    Length = 4,
    MsgType = ?SN_PUBREC,
    PubRecPacket = <<Length:8, MsgType:8, MsgId:16>>,
    ?LOG("send_pubrec_msg MsgId=~p", [MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PubRecPacket).

%% Sends PUBREL for MsgId.
send_pubrel_msg(Socket, MsgId) ->
    Length = 4,
    MsgType = ?SN_PUBREL,
    PubRelPacket = <<Length:8, MsgType:8, MsgId:16>>,
    ?LOG("send_pubrel_msg MsgId=~p", [MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PubRelPacket).

%% Sends PUBCOMP for MsgId.
send_pubcomp_msg(Socket, MsgId) ->
    Length = 4,
    MsgType = ?SN_PUBCOMP,
    PubCompPacket = <<Length:8, MsgType:8, MsgId:16>>,
    ?LOG("send_pubcomp_msg MsgId=~p", [MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PubCompPacket).
%% NOTE(review): the packet binaries built by the helpers below were lost in
%% this copy of the file (only `<>' remained). They are restored from the
%% locally bound fields following the MQTT-SN v1.2 message formats - confirm
%% against upstream.

%% Sends SUBSCRIBE carrying a full topic name.
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId) ->
    MsgType = ?SN_SUBSCRIBE,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_NORMAL_TOPIC,
    Length = byte_size(Topic) + 5,
    SubscribePacket = <<Length:8, MsgType:8,
                        Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                        MsgId:16, Topic/binary>>,
    ?LOG("send_subscribe_msg_normal_topic Topic=~p, MsgId=~p", [Topic, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket).

%% Sends SUBSCRIBE carrying a 16-bit topic id flagged as predefined.
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId, MsgId) ->
    Length = 7,
    MsgType = ?SN_SUBSCRIBE,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_PREDEFINED_TOPIC,
    SubscribePacket = <<Length:8, MsgType:8,
                        Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                        MsgId:16, TopicId:16>>,
    ?LOG("send_subscribe_msg_predefined_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket).

%% Sends SUBSCRIBE carrying a short topic name.
%% The fixed Length of 7 assumes Topic is a 2-byte binary - TODO confirm
%% against callers.
send_subscribe_msg_short_topic(Socket, QoS, Topic, MsgId) ->
    Length = 7,
    MsgType = ?SN_SUBSCRIBE,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_SHORT_TOPIC,
    SubscribePacket = <<Length:8, MsgType:8,
                        Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                        MsgId:16, Topic/binary>>,
    ?LOG("send_subscribe_msg_short_topic Topic=~p, MsgId=~p", [Topic, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket).

%% Sends SUBSCRIBE carrying a 16-bit topic id with the reserved topic-id type.
send_subscribe_msg_reserved_topic(Socket, QoS, TopicId, MsgId) ->
    Length = 7,
    MsgType = ?SN_SUBSCRIBE,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_RESERVED_TOPIC,
    SubscribePacket = <<Length:8, MsgType:8,
                        Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                        MsgId:16, TopicId:16>>,
    ?LOG("send_subscribe_msg_reserved_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, SubscribePacket).

%% Sends UNSUBSCRIBE carrying a 16-bit topic id flagged as predefined.
send_unsubscribe_msg_predefined_topic(Socket, TopicId, MsgId) ->
    Length = 7,
    MsgType = ?SN_UNSUBSCRIBE,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_PREDEFINED_TOPIC,
    %% No QoS is bound in this helper, so the two QoS bits are sent as unused.
    UnSubscribePacket = <<Length:8, MsgType:8,
                          Dup:1, ?FNU:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                          MsgId:16, TopicId:16>>,
    ?LOG("send_unsubscribe_msg_predefined_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, UnSubscribePacket).
%% NOTE(review): the packet binaries built by the send_* helpers below were
%% lost in this copy of the file (only `<>' remained). They are restored from
%% the locally bound fields following the MQTT-SN v1.2 message formats -
%% confirm against upstream.

%% Sends UNSUBSCRIBE carrying a full topic name.
send_unsubscribe_msg_normal_topic(Socket, TopicName, MsgId) ->
    MsgType = ?SN_UNSUBSCRIBE,
    Dup = 0,
    QoS = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_NORMAL_TOPIC,
    Length = 5 + byte_size(TopicName),
    UnSubscribePacket = <<Length:8, MsgType:8,
                          Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                          MsgId:16, TopicName/binary>>,
    ?LOG("send_unsubscribe_msg_normal_topic TopicName=~p, MsgId=~p", [TopicName, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, UnSubscribePacket).

%% Sends UNSUBSCRIBE carrying a short topic name.
%% NOTE(review): TopicId is encoded as a binary here (a 2-byte short topic
%% name, matching the fixed Length of 7) - confirm against callers.
send_unsubscribe_msg_short_topic(Socket, TopicId, MsgId) ->
    Length = 7,
    MsgType = ?SN_UNSUBSCRIBE,
    Dup = 0,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    TopicIdType = ?SN_SHORT_TOPIC,
    %% No QoS is bound in this helper, so the two QoS bits are sent as unused.
    UnSubscribePacket = <<Length:8, MsgType:8,
                          Dup:1, ?FNU:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2,
                          MsgId:16, TopicId/binary>>,
    ?LOG("send_unsubscribe_msg_short_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, UnSubscribePacket).

%% Sends PINGREQ. With ClientId = undefined only the 2-byte header is sent;
%% otherwise the client id is appended and the length adjusted accordingly.
send_pingreq_msg(Socket, ClientId)->
    Length = 2,
    MsgType = ?SN_PINGREQ,
    PingReqPacket = case ClientId of
                        undefined ->
                            <<Length:8, MsgType:8>>;
                        Other ->
                            Size = byte_size(Other) + 2,
                            <<Size:8, MsgType:8, Other/binary>>
                    end,
    ?LOG("send_pingreq_msg ClientId=~p", [ClientId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, PingReqPacket).

%% Sends DISCONNECT. With Duration = undefined only the 2-byte header is
%% sent; otherwise a 16-bit duration follows it.
send_disconnect_msg(Socket, Duration) ->
    Length = 2,
    Length2 = 4,
    MsgType = ?SN_DISCONNECT,
    DisConnectPacket = case Duration of
                           undefined -> <<Length:8, MsgType:8>>;
                           Other -> <<Length2:8, MsgType:8, Other:16>>
                       end,
    ?LOG("send_disconnect_msg Duration=~p", [Duration]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket).

%% Identity helpers for message ids and topic ids.
mid(Id) -> Id.
tid(Id) -> Id.

%% filter <<2, 23>> pingresp packet
wrap_receive_response(Socket) ->
    case receive_response(Socket) of
        <<2,23>> ->
            ct:log("PingResp"),
            wrap_receive_response(Socket);
        Other ->
            ct:log("Other: ~p", [Other]),
            Other
    end.

%% Waits up to 2000 ms for the next datagram on Socket.
receive_response(Socket) ->
    receive_response(Socket, 2000).

%% Returns the payload of the next datagram received on Socket, or
%% `udp_receive_timeout' when nothing arrives within Timeout milliseconds.
%% Any other mailbox message is logged and dropped, after which the wait
%% starts again with the same Timeout. (Recursing through receive_response/1
%% would silently replace the caller's timeout with the 2000 ms default.)
receive_response(Socket, Timeout) ->
    receive
        {udp, Socket, _, _, Bin} ->
            ?LOG("receive_response Bin=~p~n", [Bin]),
            Bin;
        {mqttc, From, Data2} ->
            ?LOG("receive_response() ignore mqttc From=~p, Data2=~p~n", [From, Data2]),
            receive_response(Socket, Timeout);
        Other ->
            ?LOG("receive_response() Other message: ~p", [{unexpected_udp_data, Other}]),
            receive_response(Socket, Timeout)
    after Timeout ->
        udp_receive_timeout
    end.
%% Takes the next message from the mailbox (waiting at most 2000 ms) and
%% classifies it: the data of an `mqttc' message, a `publish' tuple as is,
%% anything else wrapped in `unexpected_emqttc_data', or
%% `emqttc_receive_timeout' when nothing arrives.
receive_emqttc_response() ->
    receive
        {mqttc, _From, Data2} ->
            Data2;
        {publish, Topic, Payload} ->
            {publish, Topic, Payload};
        Other ->
            {unexpected_emqttc_data, Other}
    after 2000 ->
        emqttc_receive_timeout
    end.

%% NOTE(review): the binary patterns and expected-header binaries in the
%% check_* helpers below were lost in this copy of the file (only `<>'
%% remained). They are restored from the variables each function uses
%% afterwards and the MQTT-SN v1.2 message formats - confirm against upstream.

%% Receives the next datagram, asserts by pattern match that it is the
%% expected PUBLISH and answers it according to QoS: nothing for 0, PUBACK
%% for 1, PUBREL (then expecting PUBCOMP) for 2.
check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket) ->
    PubMsg = receive_response(Socket),
    Length = 7 + byte_size(Payload),
    ?LOG("check_dispatched_message ~p~n", [PubMsg]),
    ?LOG("expected ~p xx ~p~n",
         [<<Length, ?SN_PUBLISH,
            Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16>>, Payload]),
    %% Every field except MsgId is already bound, so this match is the check.
    <<Length, ?SN_PUBLISH,
      Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2,
      TopicId:16, MsgId:16, Payload/binary>> = PubMsg,
    case QoS of
        0 -> ok;
        1 -> send_puback_msg(Socket, TopicId, MsgId);
        2 -> send_pubrel_msg(Socket, MsgId),
             ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket))
    end,
    ok.

%% Limited-broadcast address as a string.
get_udp_broadcast_address() ->
    "255.255.255.255".

%% Asserts that UdpData is a PUBLISH with the given flags, topic id and
%% payload, and returns the message id it carries.
check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
                          CleanSession, TopicType, TopicId, Payload}, UdpData) ->
    %% 5 header bytes: Length, MsgType, Flags, TopicId:16
    <<HeaderUdp:5/binary, MsgId:16, PayloadIn/binary>> = UdpData,
    ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]),
    Size9 = byte_size(Payload) + 7,
    Eexp = <<Size9:8, ?SN_PUBLISH,
             Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, TopicType:2,
             TopicId:16>>,
    ?assertEqual(Eexp, HeaderUdp),     % mqtt-sn header should be same
    ?assertEqual(Payload, PayloadIn),  % payload should be same
    MsgId.

%% Asserts that UdpData is a REGISTER for TopicName and returns the topic id
%% and message id it carries.
check_register_msg_on_udp(TopicName, UdpData) ->
    ct:log("UdpData: ~p~n", [UdpData]),
    <<HeaderUdp:2/binary, TopicId:16, MsgId:16, PayloadIn/binary>> = UdpData,
    Size = byte_size(TopicName) + 6,
    ?assertEqual(<<Size:8, ?SN_REGISTER>>, HeaderUdp),
    ?assertEqual(TopicName, PayloadIn),
    {TopicId, MsgId}.

%% Asserts by pattern match that UdpData is a REGACK for MsgId with return
%% code 0 and returns the assigned topic id.
check_regack_msg_on_udp(MsgId, UdpData) ->
    <<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>> = UdpData,
    TopicId.

%% Drains the calling process's mailbox without blocking and returns the
%% messages in arrival order.
flush() ->
    flush([]).

flush(Msgs) ->
    receive
        M -> flush([M | Msgs])
    after
        0 -> lists:reverse(Msgs)
    end.