1958 lines
71 KiB
Erlang
1958 lines
71 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module (emqx_sn_protocol_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("emqx_gateway/src/mqttsn/include/emqx_sn.hrl").
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
|
|
-define(HOST, {127,0,0,1}).
|
|
-define(PORT, 1884).
|
|
|
|
-define(FLAG_DUP(X),X).
|
|
-define(FLAG_QOS(X),X).
|
|
-define(FLAG_RETAIN(X),X).
|
|
-define(FLAG_SESSION(X),X).
|
|
|
|
-define(LOG(Format, Args), ct:print("TEST: " ++ Format, Args)).
|
|
|
|
-define(MAX_PRED_TOPIC_ID, 2).
|
|
-define(PREDEF_TOPIC_ID1, 1).
|
|
-define(PREDEF_TOPIC_ID2, 2).
|
|
-define(PREDEF_TOPIC_NAME1, <<"/predefined/topic/name/hello">>).
|
|
-define(PREDEF_TOPIC_NAME2, <<"/predefined/topic/name/nice">>).
|
|
-define(ENABLE_QOS3, true).
|
|
% FLAG NOT USED
|
|
-define(FNU, 0).
|
|
|
|
%% erlang:system_time should be unique and random enough
|
|
-define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-",
|
|
integer_to_list(erlang:system_time())])).
|
|
|
|
-define(CONF_DEFAULT, <<"
|
|
gateway.mqttsn {
|
|
gateway_id = 1
|
|
broadcast = true
|
|
enable_qos3 = true
|
|
predefined = [
|
|
{ id = 1,
|
|
topic = \"/predefined/topic/name/hello\"
|
|
},
|
|
{ id = 2,
|
|
topic = \"/predefined/topic/name/nice\"
|
|
}
|
|
]
|
|
clientinfo_override {
|
|
username = \"user1\"
|
|
password = \"pw123\"
|
|
}
|
|
listeners.udp.default {
|
|
bind = 1884
|
|
}
|
|
}
|
|
">>).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Setups
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Common Test callback: the list of test cases is whatever
%% emqx_common_test_helpers:all/1 returns for this module.
all() ->
    Suite = ?MODULE,
    emqx_common_test_helpers:all(Suite).
|
|
|
|
%% Loads ?CONF_DEFAULT through emqx_gateway_schema, starts the emqx_gateway
%% application and hands the Common Test config back unchanged.
init_per_suite(Config) ->
    Apps = [emqx_gateway],
    ok = emqx_config:init_load(emqx_gateway_schema, ?CONF_DEFAULT),
    emqx_common_test_helpers:start_apps(Apps),
    Config.
|
|
|
|
%% Removes the [gateway, mqttsn] configuration (must succeed) and stops the
%% emqx_gateway application.
end_per_suite(_Config) ->
    ConfPath = [gateway, mqttsn],
    {ok, _} = emqx:remove_config(ConfPath),
    emqx_common_test_helpers:stop_apps([emqx_gateway]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test cases
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Connect
|
|
|
|
%% Checks that the MQTT-SN UDP listener is known to esockd, then performs a
%% plain CONNECT / DISCONNECT round trip.
t_connect(_) ->
    %% The listener is looked up by its {Name, Port} key.
    ListenerId = {'mqttsn:udp:default', ?PORT},
    Listeners = esockd:listeners(),
    ?assertEqual(true, lists:keymember(ListenerId, 1, Listeners)),

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, <<"client_id_test1">>),
    ConnAck = receive_response(Sock),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, ConnAck),

    send_disconnect_msg(Sock, undefined),
    Bye = receive_response(Sock),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, Bye),
    gen_udp:close(Sock).
|
|
|
|
%% Registers and subscribes to a normal topic, checks that the topic shows up
%% in emqx_broker:topics/0, then unsubscribes and checks that it is gone.
t_subscribe(_) ->
    MsgId = 1,
    QoS = 0,
    Topic = <<"abcD">>,
    %% Topic id the REGACK/SUBACK are expected to carry.
    ExpectedId = ?MAX_PRED_TOPIC_ID + 1,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, Topic, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, ExpectedId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ExpectedId:16, MsgId:16, 0>>,
                 receive_response(Sock)),
    ?assert(lists:member(Topic, emqx_broker:topics())),

    send_unsubscribe_msg_normal_topic(Sock, Topic, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),
    %% Presumably gives the broker time to drop the topic before it is checked.
    timer:sleep(100),
    ?assertNot(lists:member(Topic, emqx_broker:topics())),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Register -> subscribe -> unsubscribe on a normal topic name, asserting the
%% exact REGACK, SUBACK and UNSUBACK frames.
t_subscribe_case01(_) ->
    MsgId = 1,
    QoS = 0,
    Topic = <<"abcD">>,
    %% Topic id the REGACK/SUBACK are expected to carry.
    ExpectedId = ?MAX_PRED_TOPIC_ID + 1,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, Topic, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, ExpectedId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, Topic, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ExpectedId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_normal_topic(Sock, Topic, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Registers ?PREDEF_TOPIC_NAME1, expects the REGACK to carry
%% ?PREDEF_TOPIC_ID1, then subscribes and unsubscribes through that id.
t_subscribe_case02(_) ->
    MsgId = 1,
    QoS = 0,
    %% Predefined topic id corresponding to ?PREDEF_TOPIC_NAME1.
    PredefId = ?PREDEF_TOPIC_ID1,
    PredefName = ?PREDEF_TOPIC_NAME1,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, PredefName, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, PredefId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, PredefId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, PredefId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_predefined_topic(Sock, PredefId, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Subscribes and unsubscribes with the two-byte short topic name <<"te">> at
%% QoS 2; the SUBACK is expected to carry topic id 0.
t_subscribe_case03(_) ->
    MsgId = 1,
    QoS = 2,
    ShortTopic = <<"te">>,
    ExpectedId = 0,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_short_topic(Sock, QoS, ShortTopic, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ExpectedId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_short_topic(Sock, ShortTopic, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% In this case we use a predefined topic name to register and subscribe, and expect to receive the corresponding predefined topic id rather than a newly generated topic id from the broker. We designed this case to illustrate
%% emqx_sn_gateway's compatibility in dealing with predefined and normal topics. Once we place more restrictions on the different topic id types, this case should be deleted or modified.
|
|
%% Registers and subscribes using the *name* of a predefined topic and expects
%% both REGACK and SUBACK to answer with the predefined topic id.
t_subscribe_case04(_) ->
    MsgId = 1,
    QoS = 0,
    %% Predefined topic id corresponding to ?PREDEF_TOPIC_NAME1.
    PredefId = ?PREDEF_TOPIC_ID1,
    PredefName = ?PREDEF_TOPIC_NAME1,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, PredefName, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, PredefId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, PredefName, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, PredefId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_normal_topic(Sock, PredefName, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Subscribes to a series of topic names at QoS 1. The two wildcard filters
%% are expected to be acknowledged with topic id 0, the two plain names with
%% the ids ?MAX_PRED_TOPIC_ID + 1 and ?MAX_PRED_TOPIC_ID + 2.
t_subscribe_case05(_) ->
    MsgId = 25,
    QoS = 1,
    NoId = 0,
    FirstId = ?MAX_PRED_TOPIC_ID + 1,
    SecondId = ?MAX_PRED_TOPIC_ID + 2,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_register_msg(Sock, <<"abcD">>, MsgId),
    ?assertEqual(<<7, ?SN_REGACK, FirstId:16, MsgId:16, 0:8>>,
                 receive_response(Sock)),

    %% {TopicName, topic id expected in the SUBACK}, in sending order.
    Subscriptions = [{<<"abcD">>, FirstId},
                     {<<"/sport/#">>, NoId},
                     {<<"/a/+/water">>, NoId},
                     {<<"/Tom/Home">>, SecondId}],
    lists:foreach(
      fun({Name, ExpectedId}) ->
              send_subscribe_msg_normal_topic(Sock, QoS, Name, MsgId),
              ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ExpectedId:16, MsgId:16, 0>>,
                           receive_response(Sock))
      end, Subscriptions),

    send_unsubscribe_msg_normal_topic(Sock, <<"abcD">>, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Sends REGISTER for plain and wildcard topic names: wildcard names are
%% expected to be answered with topic id 0 and ?SN_RC_NOT_SUPPORTED, plain
%% names with a fresh topic id. Finishes with a short-topic subscription.
t_subscribe_case06(_) ->
    MsgId = 1,
    QoS = 0,
    NoId = 0,
    FirstId = ?MAX_PRED_TOPIC_ID + 1,
    SecondId = ?MAX_PRED_TOPIC_ID + 2,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    ClientId = ?CLIENTID,
    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ClientId),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    %% {TopicName, topic id, return code} expected in each REGACK.
    Registrations = [{<<"abc">>, FirstId, 0},
                     {<<"/blue/#">>, NoId, ?SN_RC_NOT_SUPPORTED},
                     {<<"/blue/+/white">>, NoId, ?SN_RC_NOT_SUPPORTED},
                     {<<"/$sys/rain">>, SecondId, 0}],
    lists:foreach(
      fun({Name, ExpectedId, Rc}) ->
              send_register_msg(Sock, Name, MsgId),
              ?assertEqual(<<7, ?SN_REGACK, ExpectedId:16, MsgId:16, Rc:8>>,
                           receive_response(Sock))
      end, Registrations),

    send_subscribe_msg_short_topic(Sock, QoS, <<"Q2">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, NoId:16, MsgId:16, 0>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_normal_topic(Sock, <<"Q2">>, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Subscribes with a predefined-topic id above ?MAX_PRED_TOPIC_ID and expects
%% a SUBACK carrying ?SN_INVALID_TOPIC_ID / ?SN_RC_INVALID_TOPIC_ID; an
%% unsubscribe for another such id is still expected to yield UNSUBACK.
t_subscribe_case07(_) ->
    MsgId = 1,
    QoS = 0,
    SubscribeId = ?MAX_PRED_TOPIC_ID + 2,
    UnsubscribeId = ?MAX_PRED_TOPIC_ID + 3,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, SubscribeId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ?SN_INVALID_TOPIC_ID:16,
                   MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
                 receive_response(Sock)),

    send_unsubscribe_msg_predefined_topic(Sock, UnsubscribeId, MsgId),
    ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Subscribes via send_subscribe_msg_reserved_topic/4 and expects the SUBACK
%% to carry ?SN_INVALID_TOPIC_ID with return code ?SN_RC_NOT_SUPPORTED.
t_subscribe_case08(_) ->
    MsgId = 1,
    QoS = 0,
    ReservedId = 2,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_reserved_topic(Sock, QoS, ReservedId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, ?SN_INVALID_TOPIC_ID:16,
                   MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Publishes with the QoS field set to 3 and, when ?ENABLE_QOS3 is true,
%% expects the message to be delivered back to the subscriber as a QoS 0
%% PUBLISH on the subscribed topic id.
t_publish_negqos_case09(_) ->
    Dup = 0,
    QoS = 0,
    NegQoS = 3,
    Retain = 0,
    Will = 0,
    CleanSession = 0,
    MsgId = 1,
    TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
    {ok, Socket} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,
    send_connect_msg(Socket, ClientId),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),

    Topic = <<"abc">>,
    send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
                   ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Socket)),

    MsgId1 = 3,
    Payload1 = <<20, 21, 22, 23>>,
    send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1),
    timer:sleep(100),
    case ?ENABLE_QOS3 of
        true ->
            Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
                     ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16,
                     <<20, 21, 22, 23>>/binary>>,
            What = receive_response(Socket),
            ?assertEqual(Eexp, What);
        false ->
            %% Nothing is asserted about delivery when QoS 3 is switched off;
            %% without this clause flipping the macro would crash the test
            %% with a case_clause error instead of skipping the check.
            ok
    end,

    send_disconnect_msg(Socket, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
    gen_udp:close(Socket).
|
|
|
|
%% Subscribes to a normal topic at QoS 0, publishes to the returned topic id
%% and expects the same payload to come back as a PUBLISH frame.
t_publish_qos0_case01(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Payload = <<20, 21, 22, 23>>,
    %% Flag octet shared by the SUBACK and the delivered PUBLISH:
    %% Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"abc">>, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, SubMsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),
    ?assertEqual(<<11, ?SN_PUBLISH, Flags/binary, TopicId:16, (mid(0)):16, Payload/binary>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% QoS 0 publish/deliver round trip through predefined topic id
%% ?PREDEF_TOPIC_ID1; the delivered PUBLISH is expected to be flagged with
%% ?SN_PREDEFINED_TOPIC.
t_publish_qos0_case02(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    PredefId = ?PREDEF_TOPIC_ID1,
    Payload = <<20, 21, 22, 23>>,
    %% Flag octets: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    SubAckFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,
    PublishFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_PREDEFINED_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, PredefId, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, SubAckFlags/binary, PredefId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, PubMsgId, PredefId, Payload),
    timer:sleep(100),
    ?assertEqual(<<11, ?SN_PUBLISH, PublishFlags/binary, PredefId:16, (mid(0)):16,
                   Payload/binary>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Subscribes to the normal topic <<"/a/b/c">>, then publishes to the returned
%% topic id through send_publish_msg_predefined_topic/5 and expects delivery
%% as a PUBLISH flagged ?SN_NORMAL_TOPIC.
t_publish_qos0_case3(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Payload = <<20, 21, 22, 23>>,
    %% Flag octet shared by the SUBACK and the delivered PUBLISH:
    %% Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"/a/b/c">>, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, SubMsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),
    ?assertEqual(<<11, ?SN_PUBLISH, Flags/binary, TopicId:16, (mid(0)):16, Payload/binary>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Subscribes to <<"#">>, publishes on the short topic <<"TR">> and expects
%% the message back as a PUBLISH flagged ?SN_SHORT_TOPIC whose topic field is
%% the two-byte topic name itself.
t_publish_qos0_case04(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 2,
    NoId = 0,
    ShortTopic = <<"TR">>,
    Payload = <<20, 21, 22, 23>>,
    %% Flag octets: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    SubAckFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,
    PublishFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_SHORT_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"#">>, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, SubAckFlags/binary, NoId:16, SubMsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_short_topic(Sock, QoS, PubMsgId, ShortTopic, Payload),
    timer:sleep(100),
    ?assertEqual(<<11, ?SN_PUBLISH, PublishFlags/binary, ShortTopic/binary, (mid(0)):16,
                   Payload/binary>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Subscribes with the short topic <<"/#">> and expects an accepted SUBACK
%% carrying topic id 0; nothing is published in this case.
t_publish_qos0_case05(_) ->
    QoS = 0,
    MsgId = 1,
    NoId = 0,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_short_topic(Sock, QoS, <<"/#">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, NoId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
|
|
%% QoS 0 publish/deliver round trip on the normal topic <<"abc">>: the payload
%% published to the subscribed topic id must come back as a PUBLISH frame.
t_publish_qos0_case06(_) ->
    QoS = 0,
    SubMsgId = 1,
    PubMsgId = 3,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Payload = <<20, 21, 22, 23>>,
    %% Flag octet shared by the SUBACK and the delivered PUBLISH:
    %% Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"abc">>, SubMsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, SubMsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, PubMsgId, TopicId, Payload),
    timer:sleep(100),
    ?assertEqual(<<11, ?SN_PUBLISH, Flags/binary, TopicId:16, (mid(0)):16, Payload/binary>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% QoS 1 publish on a normal topic: expects a PUBACK for the publish and then
%% the message delivered back as a QoS 1 PUBLISH. The reply to the final
%% DISCONNECT is not read.
t_publish_qos1_case01(_) ->
    QoS = 1,
    MsgId = 1,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Payload = <<20, 21, 22, 23>>,
    %% Flag octet shared by the SUBACK and the delivered PUBLISH:
    %% Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"abc">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, MsgId, TopicId, Payload),
    ?assertEqual(<<7, ?SN_PUBACK, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),
    timer:sleep(100),
    ?assertEqual(<<11, ?SN_PUBLISH, Flags/binary, TopicId:16, MsgId:16, Payload/binary>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    gen_udp:close(Sock).
|
|
|
|
%% QoS 1 publish on predefined topic id ?PREDEF_TOPIC_ID1: only the SUBACK and
%% the PUBACK are asserted; the reply to the final DISCONNECT is not read.
t_publish_qos1_case02(_) ->
    QoS = 1,
    MsgId = 1,
    PredefId = ?PREDEF_TOPIC_ID1,
    Payload = <<20, 21, 22, 23>>,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, PredefId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, PredefId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, MsgId, PredefId, Payload),
    ?assertEqual(<<7, ?SN_PUBACK, PredefId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    gen_udp:close(Sock).
|
|
|
|
%% Publishes at QoS 1 to predefined topic id 5, which is not among the ids
%% configured in ?CONF_DEFAULT, and expects a PUBACK with
%% ?SN_RC_INVALID_TOPIC_ID.
t_publish_qos1_case03(_) ->
    QoS = 1,
    MsgId = 1,
    UnregisteredId = 5,
    Payload = <<20, 21, 22, 23>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, MsgId, tid(UnregisteredId), Payload),
    PubAck = receive_response(Sock),
    ?assertEqual(<<7, ?SN_PUBACK, UnregisteredId:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
                 PubAck),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% QoS 1 publish on the short topic <<"ab">>: the PUBACK is expected to echo
%% the two topic-name bytes in its 16-bit topic id field. The reply to the
%% final DISCONNECT is not read.
t_publish_qos1_case04(_) ->
    QoS = 1,
    MsgId = 7,
    NoId = 0,
    ShortTopic = <<"ab">>,
    Payload = <<20, 21, 22, 23>>,
    %% The short topic name reinterpreted as a 16-bit integer.
    <<ShortAsId:16>> = ShortTopic,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_short_topic(Sock, QoS, <<"ab">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, NoId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_short_topic(Sock, QoS, MsgId, ShortTopic, Payload),
    ?assertEqual(<<7, ?SN_PUBACK, ShortAsId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    gen_udp:close(Sock).
|
|
|
|
%% Publishes at QoS 1 on the short topic <<"/#">> and expects the PUBACK to
%% come back with ?SN_RC_NOT_SUPPORTED.
t_publish_qos1_case05(_) ->
    QoS = 1,
    MsgId = 7,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    WildcardTopic = <<"/#">>,
    %% The short topic name reinterpreted as a 16-bit integer.
    <<WildcardAsId:16>> = WildcardTopic,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"ab">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_short_topic(Sock, QoS, MsgId, WildcardTopic, <<20, 21, 22, 23>>),
    ?assertEqual(<<7, ?SN_PUBACK, WildcardAsId:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Publishes at QoS 1 on the short topic <<"/+">> and expects the PUBACK to
%% come back with ?SN_RC_NOT_SUPPORTED.
t_publish_qos1_case06(_) ->
    QoS = 1,
    MsgId = 7,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    WildcardTopic = <<"/+">>,
    %% The short topic name reinterpreted as a 16-bit integer.
    <<WildcardAsId:16>> = WildcardTopic,
    %% SUBACK flag octet: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"ab">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, Flags/binary, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_short_topic(Sock, QoS, MsgId, WildcardTopic, <<20, 21, 22, 23>>),
    ?assertEqual(<<7, ?SN_PUBACK, WildcardAsId:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
                 receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% QoS 2 exchange on a normal topic: PUBLISH -> PUBREC, PUBREL -> delivered
%% PUBLISH (message id 1) followed by PUBCOMP.
t_publish_qos2_case01(_) ->
    QoS = 2,
    MsgId = 7,
    TopicId = ?MAX_PRED_TOPIC_ID + 1,
    Payload = <<20, 21, 22, 23>>,
    %% SUBACK flags: only the QoS bits are set, the remaining bits are ?FNU.
    SubAckFlags = <<?FNU:1, QoS:2, ?FNU:5>>,
    %% PUBLISH flags: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    PublishFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"/abc">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, SubAckFlags/binary, TopicId:16, MsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_normal_topic(Sock, QoS, MsgId, TopicId, Payload),
    ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Sock)),

    send_pubrel_msg(Sock, MsgId),
    ?assertEqual(<<11, ?SN_PUBLISH, PublishFlags/binary, TopicId:16, 1:16, Payload/binary>>,
                 receive_response(Sock)),
    ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% QoS 2 exchange on predefined topic id ?PREDEF_TOPIC_ID2: PUBLISH -> PUBREC,
%% PUBREL -> delivered PUBLISH (flagged ?SN_PREDEFINED_TOPIC, message id 1)
%% followed by PUBCOMP.
t_publish_qos2_case02(_) ->
    QoS = 2,
    MsgId = 7,
    PredefId = ?PREDEF_TOPIC_ID2,
    Payload = <<20, 21, 22, 23>>,
    %% SUBACK flags: only the QoS bits are set, the remaining bits are ?FNU.
    SubAckFlags = <<?FNU:1, QoS:2, ?FNU:5>>,
    %% PUBLISH flags: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    PublishFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_PREDEFINED_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_predefined_topic(Sock, QoS, PredefId, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, SubAckFlags/binary, PredefId:16, MsgId:16,
                   ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_predefined_topic(Sock, QoS, MsgId, PredefId, Payload),
    ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Sock)),

    send_pubrel_msg(Sock, MsgId),
    ?assertEqual(<<11, ?SN_PUBLISH, PublishFlags/binary, PredefId:16, 1:16, Payload/binary>>,
                 receive_response(Sock)),
    ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% QoS 2 exchange on the short topic <<"/a">> with a <<"/#">> subscription:
%% PUBLISH -> PUBREC, PUBREL -> delivered PUBLISH (flagged ?SN_SHORT_TOPIC,
%% message id 1) followed by PUBCOMP.
t_publish_qos2_case03(_) ->
    QoS = 2,
    MsgId = 7,
    NoId = 0,
    ShortTopic = <<"/a">>,
    Payload = <<20, 21, 22, 23>>,
    %% SUBACK flags: only the QoS bits are set, the remaining bits are ?FNU.
    SubAckFlags = <<?FNU:1, QoS:2, ?FNU:5>>,
    %% PUBLISH flags: Dup, QoS, Retain, Will, CleanSession, TopicIdType.
    PublishFlags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_SHORT_TOPIC:2>>,

    {ok, Sock} = gen_udp:open(0, [binary]),
    send_connect_msg(Sock, ?CLIENTID),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_subscribe_msg_normal_topic(Sock, QoS, <<"/#">>, MsgId),
    ?assertEqual(<<8, ?SN_SUBACK, SubAckFlags/binary, NoId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
                 receive_response(Sock)),

    send_publish_msg_short_topic(Sock, QoS, MsgId, ShortTopic, Payload),
    ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Sock)),

    send_pubrel_msg(Sock, MsgId),
    ?assertEqual(<<11, ?SN_PUBLISH, PublishFlags/binary, ShortTopic/binary, 1:16,
                   Payload/binary>>,
                 receive_response(Sock)),
    ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Sock)),
    timer:sleep(100),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Connects with a will (topic <<"abc">>, QoS 1), stops talking to the gateway
%% and expects the will message to be delivered to this test process, which
%% subscribed to the will topic beforehand.
t_will_case01(_) ->
    Duration = 1,
    WillQoS = 1,
    WillTopic = <<"abc">>,
    WillMsg = <<10, 11, 12, 13, 14>>,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    %% The {deliver, ...} message awaited below arrives via this subscription.
    ok = emqx_broker:subscribe(WillTopic),

    %% Will handshake: WILLTOPICREQ -> WILLTOPIC, WILLMSGREQ -> WILLMSG, CONNACK.
    send_connect_msg_with_will(Sock, Duration, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, WillTopic, WillQoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, WillMsg),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    %% wait udp client keepalive timeout
    timer:sleep(3000),

    receive
        {deliver, WillTopic, #message{payload = WillMsg}} -> ok
    after 1000 ->
        ct:fail(wait_willmsg_timeout)
    end,
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),

    %% A further DISCONNECT is expected to go unanswered.
    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),
    gen_udp:close(Sock).
|
|
|
|
%% Will handshake with a QoS 2 will on topic "goodbye".
%% After the handshake and one ping the client stays silent for 4 seconds,
%% reads and discards two datagrams, and then checks that a DISCONNECT it
%% sends gets no reply.
t_will_test2(_) ->
    WillQoS = 2,
    KeepAlive = 1,
    {ok, Sock} = gen_udp:open(0, [binary]),

    ClientId = ?CLIENTID,
    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, <<"goodbye">>, WillQoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, <<10, 11, 12, 13, 14>>),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),
    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    %% let the 1-second duration lapse
    timer:sleep(4000),

    %% two reads whose results are deliberately not checked
    _ = receive_response(Sock), % ignore PUBACK
    _ = receive_response(Sock), % ignore PUBCOMP

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),

    gen_udp:close(Sock).
|
|
|
|
%% Will handshake answered with an empty WILLTOPIC.
%% The gateway is expected to skip the WILLMSGREQ step and reply with CONNACK
%% directly. After 4 seconds of silence a DISCONNECT is expected on the
%% socket, and a DISCONNECT sent afterwards is expected to go unanswered.
t_will_test3(_) ->
    KeepAlive = 1,
    {ok, Sock} = gen_udp:open(0, [binary]),

    ClientId = ?CLIENTID,
    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),

    %% empty will topic -> CONNACK straight away
    send_willtopic_empty_msg(Sock),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),
    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    timer:sleep(4000),

    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),

    gen_udp:close(Sock).
|
|
|
|
%% Updating both will topic and will message after connecting.
%% Registers a will on "abc", then replaces the topic with "/XYZ"
%% (WILLTOPICUPD) and the message with "1A2B3C" (WILLMSGUPD); both updates
%% must be accepted. The client then goes silent for 4 seconds, discards one
%% datagram, and checks that a DISCONNECT it sends gets no reply.
t_will_test4(_) ->
    WillQoS = 1,
    KeepAlive = 1,
    {ok, Sock} = gen_udp:open(0, [binary]),

    ClientId = ?CLIENTID,
    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, <<"abc">>, WillQoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, <<10, 11, 12, 13, 14>>),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),
    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    %% replace the will topic, then the will message
    send_willtopicupd_msg(Sock, <<"/XYZ">>, ?QOS_1),
    ?assertEqual(<<3, ?SN_WILLTOPICRESP, ?SN_RC_ACCEPTED>>, receive_response(Sock)),
    send_willmsgupd_msg(Sock, <<"1A2B3C">>),
    ?assertEqual(<<3, ?SN_WILLMSGRESP, ?SN_RC_ACCEPTED>>, receive_response(Sock)),

    timer:sleep(4000),

    %% one read whose result is deliberately not checked
    _ = receive_response(Sock), % ignore PUBACK

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),

    gen_udp:close(Sock).
|
|
|
|
%% Clearing the will with an empty WILLTOPICUPD.
%% Uses a 5-second duration, so the 1-second pause in the middle is shorter
%% than the duration: nothing is expected on the socket during the pause, and
%% the final DISCONNECT is expected to be answered with a DISCONNECT.
t_will_test5(_) ->
    WillQoS = 1,
    KeepAlive = 5,
    {ok, Sock} = gen_udp:open(0, [binary]),

    ClientId = ?CLIENTID,
    send_connect_msg_with_will(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),
    send_willtopic_msg(Sock, <<"abc">>, WillQoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),
    send_willmsg_msg(Sock, <<10, 11, 12, 13, 14>>),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),
    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    %% empty WILLTOPICUPD must be accepted
    send_willtopicupd_empty_msg(Sock),
    ?assertEqual(<<3, ?SN_WILLTOPICRESP, ?SN_RC_ACCEPTED>>, receive_response(Sock)),

    timer:sleep(1000),

    %% no unsolicited traffic expected
    ?assertEqual(udp_receive_timeout, receive_response(Sock)),

    send_disconnect_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Sock)),

    gen_udp:close(Sock).
|
|
|
|
%% Will delivery for a connection opened via send_connect_msg_with_will1/3.
%% Same flow as the other will-delivery case: the test process subscribes to
%% the will topic, registers the will, goes silent and then waits for the
%% will message in its own mailbox.
%% NOTE(review): the second receive clause accepts ANY other message and only
%% prints it, so the test fails solely when nothing at all arrives within
%% one second.
t_will_case06(_) ->
    WillQoS = 1,
    KeepAlive = 1,
    Payload = <<10, 11, 12, 13, 14>>,
    Topic = <<"abc">>,
    {ok, Sock} = gen_udp:open(0, [binary]),
    ClientId = ?CLIENTID,

    %% subscribe from the test process so the will lands in our mailbox
    ok = emqx_broker:subscribe(Topic),

    send_connect_msg_with_will1(Sock, KeepAlive, ClientId),
    ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Sock)),

    send_willtopic_msg(Sock, Topic, WillQoS),
    ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Sock)),

    send_willmsg_msg(Sock, Payload),
    ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Sock)),

    send_pingreq_msg(Sock, undefined),
    ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Sock)),

    %% stay silent long enough for the keepalive to expire
    timer:sleep(3000),

    receive
        {deliver, Topic, #message{payload = Payload}} ->
            ok;
        Unexpected ->
            ct:print("received --- unex: ~p", [Unexpected])
    after 1000 ->
        ct:fail(wait_willmsg_timeout)
    end,
    send_disconnect_msg(Sock, undefined),

    gen_udp:close(Sock).
|
|
|
|
%t_asleep_test01_timeout(_) ->
|
|
% QoS = 1,
|
|
% Duration = 1,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
%
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% send_disconnect_msg(Socket, 1),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% %% asleep timer should get timeout, and device is lost
|
|
% timer:sleep(3000),
|
|
%
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_asleep_test02_to_awake_and_back(_) ->
|
|
% QoS = 1,
|
|
% Keepalive_Duration = 1,
|
|
% SleepDuration = 5,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
%
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(4500),
|
|
%
|
|
% % goto awake state and back
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(4500),
|
|
%
|
|
% % goto awake state and back
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
%
|
|
% %% during above procedure, mqtt keepalive timer should not terminate mqtt-sn process
|
|
%
|
|
% %% asleep timer should get timeout, and device should get lost
|
|
% timer:sleep(8000),
|
|
%
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_asleep_test03_to_awake_qos1_dl_msg(_) ->
|
|
% QoS = 1,
|
|
% Duration = 5,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% MsgId = 1000,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % subscribe
|
|
% TopicName1 = <<"abc">>,
|
|
% MsgId1 = 25,
|
|
% TopicId1 = ?MAX_PRED_TOPIC_ID + 1,
|
|
% WillBit = 0,
|
|
% Dup = 0,
|
|
% Retain = 0,
|
|
% CleanSession = 0,
|
|
% ReturnCode = 0,
|
|
% Payload1 = <<55, 66, 77, 88, 99>>,
|
|
% MsgId2 = 87,
|
|
%
|
|
% send_register_msg(Socket, TopicName1, MsgId1),
|
|
% ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)),
|
|
% send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId),
|
|
% ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>, receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% send_disconnect_msg(Socket, 1),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(300),
|
|
%
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% %% send downlink data in asleep state. This message should be sent to the device once it wakes up
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% send_publish_msg_predefined_topic(Socket, QoS, MsgId2, TopicId1, Payload1),
|
|
%
|
|
% {ok, C} = emqtt:start_link(),
|
|
% {ok, _} = emqtt:connect(C),
|
|
% {ok, _} = emqtt:publish(C, TopicName1, Payload1, QoS),
|
|
% timer:sleep(100),
|
|
% ok = emqtt:disconnect(C),
|
|
%
|
|
% timer:sleep(50),
|
|
%
|
|
% % goto awake state, receive downlink messages, and go back to asleep
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
%
|
|
% %% the broker should send dl msgs to the awake client before sending the pingresp
|
|
% UdpData = receive_response(Socket),
|
|
% MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData),
|
|
% send_puback_msg(Socket, TopicId1, MsgId_udp),
|
|
%
|
|
% %% check the pingresp is received at last
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
%
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_asleep_test04_to_awake_qos1_dl_msg(_) ->
|
|
% QoS = 1,
|
|
% Duration = 5,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % subscribe
|
|
% TopicName1 = <<"a/+/c">>,
|
|
% MsgId1 = 25,
|
|
% TopicId0 = 0,
|
|
% WillBit = 0,
|
|
% Dup = 0,
|
|
% Retain = 0,
|
|
% CleanSession = 0,
|
|
% ReturnCode = 0,
|
|
% send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
|
|
% ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
|
|
% receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% send_disconnect_msg(Socket, 1),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(300),
|
|
%
|
|
% %% send downlink data in asleep state. This message should be sent to the device once it wakes up
|
|
% Payload1 = <<55, 66, 77, 88, 99>>,
|
|
% Payload2 = <<55, 66, 77, 88, 100>>,
|
|
%
|
|
% {ok, C} = emqtt:start_link(),
|
|
% {ok, _} = emqtt:connect(C),
|
|
% {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload1, QoS),
|
|
% {ok, _} = emqtt:publish(C, <<"a/b/c">>, Payload2, QoS),
|
|
% timer:sleep(100),
|
|
% ok = emqtt:disconnect(C),
|
|
%
|
|
% timer:sleep(300),
|
|
%
|
|
% % goto awake state, receive downlink messages, and go back to asleep
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
%
|
|
% %% 1. get REGISTER first, since this topic has never been registered
|
|
% UdpData1 = receive_response(Socket),
|
|
% {TopicIdNew, MsgId3} = check_register_msg_on_udp(<<"a/b/c">>, UdpData1),
|
|
%
|
|
% %% 2. but before we reply the REGACK, the sn-gateway should not send any PUBLISH
|
|
% ?assertError(_, receive_publish(Socket)),
|
|
%
|
|
% send_regack_msg(Socket, TopicIdNew, MsgId3),
|
|
%
|
|
% UdpData2 = receive_response(Socket),
|
|
% MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId_udp2),
|
|
%
|
|
% UdpData3 = receive_response(Socket),
|
|
% MsgId_udp3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId_udp3),
|
|
%
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
%
|
|
% gen_udp:close(Socket).
|
|
|
|
%% Waits up to 1000 ms for a datagram and asserts, by pattern matching, that
%% it is at least 7 bytes long and that its second byte is ?SN_PUBLISH.
%% Returns the first 5 bytes of the datagram; any other reply (including the
%% value receive_response/2 yields on timeout) raises a badmatch error.
receive_publish(Socket) ->
    <<Header:5/binary, _:16, _/binary>> = receive_response(Socket, 1000),
    <<_Length:8, ?SN_PUBLISH, _/binary>> = Header.
|
|
|
|
%t_asleep_test05_to_awake_qos1_dl_msg(_) ->
|
|
% QoS = 1,
|
|
% Duration = 5,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % subscribe
|
|
% TopicName1 = <<"u/+/w">>,
|
|
% MsgId1 = 25,
|
|
% TopicId0 = 0,
|
|
% WillBit = 0,
|
|
% Dup = 0,
|
|
% Retain = 0,
|
|
% CleanSession = 0,
|
|
% ReturnCode = 0,
|
|
% send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
|
|
% ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
|
|
% receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% SleepDuration = 30,
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(300),
|
|
%
|
|
% %% send downlink data in asleep state. This message should be sent to the device once it wakes up
|
|
% Payload2 = <<55, 66, 77, 88, 99>>,
|
|
% Payload3 = <<61, 71, 81>>,
|
|
% Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>,
|
|
% TopicName_test5 = <<"u/v/w">>,
|
|
% {ok, C} = emqtt:start_link(),
|
|
% {ok, _} = emqtt:connect(C),
|
|
% {ok, _} = emqtt:publish(C, TopicName_test5, Payload2, QoS),
|
|
% timer:sleep(100),
|
|
% {ok, _} = emqtt:publish(C, TopicName_test5, Payload3, QoS),
|
|
% timer:sleep(100),
|
|
% {ok, _} = emqtt:publish(C, TopicName_test5, Payload4, QoS),
|
|
% timer:sleep(200),
|
|
% ok = emqtt:disconnect(C),
|
|
% timer:sleep(50),
|
|
%
|
|
% % goto awake state, receive downlink messages, and go back to asleep
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
%
|
|
% UdpData_reg = receive_response(Socket),
|
|
% {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg),
|
|
% send_regack_msg(Socket, TopicIdNew, MsgId_reg),
|
|
%
|
|
% UdpData2 = receive_response(Socket),
|
|
% MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId2),
|
|
% timer:sleep(50),
|
|
%
|
|
% UdpData3 = wrap_receive_response(Socket),
|
|
% MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId3),
|
|
% timer:sleep(50),
|
|
%
|
|
% case receive_response(Socket) of
|
|
% <<2,23>> -> ok;
|
|
% UdpData4 ->
|
|
% MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
|
|
% CleanSession, ?SN_NORMAL_TOPIC,
|
|
% TopicIdNew, Payload4}, UdpData4),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId4)
|
|
% end,
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_asleep_test06_to_awake_qos2_dl_msg(_) ->
|
|
% QoS = 2,
|
|
% Duration = 1,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % subscribe
|
|
% TopicName_tom = <<"tom">>,
|
|
% MsgId1 = 25,
|
|
% WillBit = 0,
|
|
% Dup = 0,
|
|
% Retain = 0,
|
|
% CleanSession = 0,
|
|
% ReturnCode = 0,
|
|
% send_register_msg(Socket, TopicName_tom, MsgId1),
|
|
% timer:sleep(50),
|
|
% TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)),
|
|
% send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1),
|
|
% ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1,
|
|
% ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>,
|
|
% receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% SleepDuration = 11,
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(100),
|
|
%
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% %% send downlink data in asleep state. This message should be sent to the device once it wakes up
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% Payload1 = <<55, 66, 77, 88, 99>>,
|
|
% {ok, C} = emqtt:start_link(),
|
|
% {ok, _} = emqtt:connect(C),
|
|
% {ok, _} = emqtt:publish(C, TopicName_tom, Payload1, QoS),
|
|
% timer:sleep(100),
|
|
% ok = emqtt:disconnect(C),
|
|
% timer:sleep(300),
|
|
%
|
|
% % goto awake state, receive downlink messages, and go back to asleep
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
%
|
|
% UdpData = wrap_receive_response(Socket),
|
|
% MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
|
|
% send_pubrec_msg(Socket, MsgId_udp),
|
|
% ?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)),
|
|
% send_pubcomp_msg(Socket, MsgId_udp),
|
|
%
|
|
% %% verify the pingresp is received after receiving all the buffered qos2 msgs
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_asleep_test07_to_connected(_) ->
|
|
% QoS = 1,
|
|
% Keepalive_Duration = 10,
|
|
% SleepDuration = 1,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % subscribe
|
|
% TopicName_tom = <<"tom">>,
|
|
% MsgId1 = 25,
|
|
% WillBit = 0,
|
|
% Dup = 0,
|
|
% Retain = 0,
|
|
% CleanSession = 0,
|
|
% ReturnCode = 0,
|
|
% send_register_msg(Socket, TopicName_tom, MsgId1),
|
|
% TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)),
|
|
% send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1),
|
|
% ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>,
|
|
% receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(100),
|
|
%
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% %% send connect message, and goto connected state
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% send_connect_msg(Socket, ClientId),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(1500),
|
|
% % asleep timer should get timeout, without any effect
|
|
%
|
|
% timer:sleep(4000),
|
|
% % keepalive timer should get timeout
|
|
%
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_asleep_test08_to_disconnected(_) ->
|
|
% QoS = 1,
|
|
% Keepalive_Duration = 3,
|
|
% SleepDuration = 1,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(100),
|
|
%
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% %% send disconnect message, and goto disconnected state
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% send_disconnect_msg(Socket, undefined),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(100),
|
|
% % it is a normal termination, without will message
|
|
%
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
|
|
% QoS = 1,
|
|
% Duration = 5,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % subscribe
|
|
% TopicName1 = <<"u/+/k">>,
|
|
% MsgId1 = 25,
|
|
% TopicId0 = 0,
|
|
% WillBit = 0,
|
|
% Dup = 0,
|
|
% Retain = 0,
|
|
% CleanSession = 0,
|
|
% ReturnCode = 0,
|
|
% send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
|
|
% ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
|
|
% receive_response(Socket)),
|
|
% % goto asleep state
|
|
% SleepDuration = 30,
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(1000),
|
|
%
|
|
% %% send downlink data in asleep state. This message should be sent to the device once it wakes up
|
|
% Payload2 = <<55, 66, 77, 88, 99>>,
|
|
% Payload3 = <<61, 71, 81>>,
|
|
% Payload4 = <<100, 101, 102, 103, 104, 105, 106, 107>>,
|
|
% TopicName_test9 = <<"u/v/k">>,
|
|
% {ok, C} = emqtt:start_link(),
|
|
% {ok, _} = emqtt:connect(C),
|
|
% {ok, _} = emqtt:publish(C, TopicName_test9, Payload2, QoS),
|
|
% timer:sleep(100),
|
|
% {ok, _} = emqtt:publish(C, TopicName_test9, Payload3, QoS),
|
|
% timer:sleep(100),
|
|
% {ok, _} = emqtt:publish(C, TopicName_test9, Payload4, QoS),
|
|
% timer:sleep(1000),
|
|
% ok = emqtt:disconnect(C),
|
|
%
|
|
% % goto awake state, receive downlink messages, and go back to asleep
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
%
|
|
% UdpData_reg = receive_response(Socket),
|
|
% {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg),
|
|
% send_regack_msg(Socket, TopicIdNew, MsgId_reg),
|
|
%
|
|
% case wrap_receive_response(Socket) of
|
|
% udp_receive_timeout ->
|
|
% ok;
|
|
% UdpData2 ->
|
|
% MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId2)
|
|
% end,
|
|
% timer:sleep(100),
|
|
%
|
|
% case wrap_receive_response(Socket) of
|
|
% udp_receive_timeout ->
|
|
% ok;
|
|
% UdpData3 ->
|
|
% MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId3)
|
|
% end,
|
|
% timer:sleep(100),
|
|
%
|
|
% case wrap_receive_response(Socket) of
|
|
% udp_receive_timeout ->
|
|
% ok;
|
|
% UdpData4 ->
|
|
% MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
|
|
% CleanSession, ?SN_NORMAL_TOPIC,
|
|
% TopicIdNew, Payload4}, UdpData4),
|
|
% send_puback_msg(Socket, TopicIdNew, MsgId4)
|
|
% end,
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
%
|
|
% %% send PINGREQ again to enter awake state
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
% %% PUBLISH messages buffered before the last awake period are not received again; only PINGRESP is expected here
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
%
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_awake_test01_to_connected(_) ->
|
|
% QoS = 1,
|
|
% Keepalive_Duration = 3,
|
|
% SleepDuration = 1,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
% % goto asleep state
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(100),
|
|
%
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% %% send connect message, and goto connected state
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% send_connect_msg(Socket, ClientId),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(1500),
|
|
% % asleep timer should get timeout
|
|
%
|
|
% timer:sleep(4000),
|
|
% % keepalive timer should get timeout
|
|
% gen_udp:close(Socket).
|
|
%
|
|
%t_awake_test02_to_disconnected(_) ->
|
|
% QoS = 1,
|
|
% Keepalive_Duration = 3,
|
|
% SleepDuration = 1,
|
|
% WillTopic = <<"dead">>,
|
|
% WillPayload = <<10, 11, 12, 13, 14>>,
|
|
% {ok, Socket} = gen_udp:open(0, [binary]),
|
|
% ClientId = ?CLIENTID,
|
|
% send_connect_msg_with_will(Socket, Keepalive_Duration, ClientId),
|
|
% ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
|
% send_willtopic_msg(Socket, WillTopic, QoS),
|
|
% ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
|
% send_willmsg_msg(Socket, WillPayload),
|
|
% ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
|
%
|
|
%
|
|
% % goto asleep state
|
|
% send_disconnect_msg(Socket, SleepDuration),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(100),
|
|
%
|
|
% % goto awake state
|
|
% send_pingreq_msg(Socket, ClientId),
|
|
% ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
|
%
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% %% send disconnect message, and goto disconnected state
|
|
% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
% send_disconnect_msg(Socket, undefined),
|
|
% ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
|
%
|
|
% timer:sleep(100),
|
|
% % it is a normal termination, no will message will be sent
|
|
%
|
|
% gen_udp:close(Socket).
|
|
|
|
%% Gateway discovery: a SEARCHGW request must be answered with
%% <<3, ?SN_GWINFO, 1>>.
t_broadcast_test1(_) ->
    {ok, Sock} = gen_udp:open(0, [binary]),
    send_searchgw_msg(Sock),
    ?assertEqual(<<3, ?SN_GWINFO, 1>>, receive_response(Sock)),
    %% short pause before tearing the socket down
    timer:sleep(600),
    gen_udp:close(Sock).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Helper funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Sends a 3-byte SEARCHGW packet (length, message type, radius = 0) to the
%% gateway at ?HOST:?PORT. Crashes with badmatch if the send fails.
send_searchgw_msg(Socket) ->
    Radius = 0,
    Packet = <<3:8, ?SN_SEARCHGW:8, Radius:8>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a CONNECT packet for ClientId (a binary) without a will.
%% Flags: only CleanSession is set. ProtocolId is 1 and the duration field
%% is fixed at 10. Crashes with badmatch if the send fails.
send_connect_msg(Socket, ClientId) ->
    %% flag byte: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, 0:2, 0:1, 0:1, 1:1, 0:2>>,
    ProtocolId = 1,
    Duration = 10,
    %% 6 = length + msg type + flags + protocol id + 2-byte duration
    Length = 6 + byte_size(ClientId),
    Packet = <<Length:8, ?SN_CONNECT:8, Flags/binary,
               ProtocolId:8, Duration:16, ClientId/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a CONNECT packet with the Will flag set and CleanSession = 1.
%% Duration is written into the 16-bit duration field; ClientId is a binary.
send_connect_msg_with_will(Socket, Duration, ClientId) ->
    send_will_connect(Socket, Duration, ClientId, 1).

%% Same as send_connect_msg_with_will/3 but with CleanSession = 0.
send_connect_msg_with_will1(Socket, Duration, ClientId) ->
    send_will_connect(Socket, Duration, ClientId, 0).

%% Shared implementation of the two will-enabled CONNECT senders above; the
%% only thing that varies between them is the CleanSession bit (0 or 1).
%% Crashes with badmatch if the send fails.
send_will_connect(Socket, Duration, ClientId, CleanSession) ->
    %% 6 = length + msg type + flags + protocol id + 2-byte duration
    Length = 6 + byte_size(ClientId),
    Will = 1,
    ProtocolId = 1,
    %% flag byte: 4 unused bits, Will, CleanSession, 2 unused bits
    ConnectPacket = <<Length:8, ?SN_CONNECT:8, ?FNU:4, Will:1, CleanSession:1,
                      ?FNU:2, ProtocolId:8, Duration:16, ClientId/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).
|
|
|
|
%% Sends a WILLTOPIC packet carrying Topic (a binary) with the given QoS and
%% Retain = 0. Crashes with badmatch if the send fails.
send_willtopic_msg(Socket, Topic, QoS) ->
    Retain = 0,
    %% flag byte: unused:1, QoS:2, Retain:1, unused:4
    Flags = <<?FNU:1, QoS:2, Retain:1, ?FNU:4>>,
    %% 3 = length + msg type + flags
    Length = 3 + byte_size(Topic),
    Packet = <<Length:8, ?SN_WILLTOPIC:8, Flags/binary, Topic/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends an empty WILLTOPIC packet: just the length byte (2) and the message
%% type, with no flags and no topic. Crashes with badmatch if the send fails.
send_willtopic_empty_msg(Socket) ->
    Packet = <<2:8, ?SN_WILLTOPIC:8>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a WILLMSG packet whose body is Msg (a binary).
%% Crashes with badmatch if the send fails.
send_willmsg_msg(Socket, Msg) ->
    %% 2 = length + msg type
    Size = 2 + byte_size(Msg),
    Packet = <<Size:8, ?SN_WILLMSG:8, Msg/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a WILLTOPICUPD packet carrying the new will Topic (a binary) with
%% the given QoS and Retain = 0. Crashes with badmatch if the send fails.
send_willtopicupd_msg(Socket, Topic, QoS) ->
    Retain = 0,
    %% flag byte: unused:1, QoS:2, Retain:1, unused:4
    Flags = <<?FNU:1, QoS:2, Retain:1, ?FNU:4>>,
    %% 3 = length + msg type + flags
    Length = 3 + byte_size(Topic),
    Packet = <<Length:8, ?SN_WILLTOPICUPD:8, Flags/binary, Topic/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends an empty WILLTOPICUPD packet: just the length byte (2) and the
%% message type, with no flags and no topic.
%% Crashes with badmatch if the send fails.
send_willtopicupd_empty_msg(Socket) ->
    Packet = <<2:8, ?SN_WILLTOPICUPD:8>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a WILLMSGUPD packet whose body is the new will message Msg (a
%% binary). Crashes with badmatch if the send fails.
send_willmsgupd_msg(Socket, Msg) ->
    %% 2 = length + msg type
    Size = 2 + byte_size(Msg),
    Packet = <<Size:8, ?SN_WILLMSGUPD:8, Msg/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a REGISTER packet for TopicName (a binary) tagged with MsgId.
%% The topic-id field is always sent as 0.
%% Crashes with badmatch if the send fails.
send_register_msg(Socket, TopicName, MsgId) ->
    UnassignedTopicId = 0,
    %% 6 = length + msg type + 2-byte topic id + 2-byte msg id
    Size = 6 + byte_size(TopicName),
    Packet = <<Size:8, ?SN_REGISTER:8, UnassignedTopicId:16, MsgId:16, TopicName/binary>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a 7-byte REGACK packet for TopicId / MsgId with return code
%% ?SN_RC_ACCEPTED. Crashes with badmatch if the send fails.
send_regack_msg(Socket, TopicId, MsgId) ->
    RegAck = <<7:8, ?SN_REGACK:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
    ok = gen_udp:send(Socket, ?HOST, ?PORT, RegAck).
|
|
|
|
%% Sends a PUBLISH packet addressed by a normal (registered) 16-bit TopicId.
%% Dup, Retain, Will and CleanSession are all 0; Data is the binary payload.
%% The send is logged through ?LOG first. Crashes with badmatch if the send
%% fails.
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId, Data) ->
    %% flag byte: Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, TopicIdType:2
    Flags = <<0:1, QoS:2, 0:1, 0:1, 0:1, ?SN_NORMAL_TOPIC:2>>,
    %% 7 = length + msg type + flags + 2-byte topic id + 2-byte msg id
    Size = 7 + byte_size(Data),
    Packet = <<Size:8, ?SN_PUBLISH:8, Flags/binary, TopicId:16, MsgId:16, Data/binary>>,
    ?LOG("send_publish_msg_normal_topic TopicId=~p, Data=~p", [TopicId, Data]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
|
|
|
%% Sends a PUBLISH packet addressed by a 16-bit TopicId with topic id type
%% ?SN_PREDEFINED_TOPIC to ?HOST:?PORT over Socket.
%% DUP, Retain, Will and CleanSession bits are all sent as 0.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_publish_msg_predefined_topic(Socket, QoS, MsgId, TopicId, Data) ->
    %% 7 = length(1) + type(1) + flags(1) + topic id(2) + message id(2).
    Len = 7 + byte_size(Data),
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, QoS:2, 0:3, ?SN_PREDEFINED_TOPIC:2>>,
    Frame = <<Len:8, ?SN_PUBLISH:8, Flags/binary, TopicId:16, MsgId:16, Data/binary>>,
    ?LOG("send_publish_msg_predefined_topic TopicId=~p, Data=~p", [TopicId, Data]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a PUBLISH packet whose topic field is the binary TopicName embedded
%% directly in the frame, to ?HOST:?PORT over Socket.
%% DUP, Retain, Will and CleanSession bits are all sent as 0.
%% The length byte is computed as 7 + byte_size(Data), i.e. it is only
%% accurate when TopicName is exactly 2 bytes long.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) ->
    Len = 7 + byte_size(Data),
    %% Topic id type is the literal 2 here.
    %% NOTE(review): presumably equal to ?SN_SHORT_TOPIC - confirm before swapping.
    Flags = <<0:1, QoS:2, 0:3, 2:2>>,
    Frame = <<Len:8, ?SN_PUBLISH:8, Flags/binary, TopicName/binary, MsgId:16, Data/binary>>,
    ?LOG("send_publish_msg_short_topic TopicName=~p, Data=~p", [TopicName, Data]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a PUBACK packet for TopicId/MsgId with return code
%% ?SN_RC_ACCEPTED to ?HOST:?PORT over Socket.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_puback_msg(Socket, TopicId, MsgId) ->
    %% Fixed 7-byte frame: length, type, topic id(2), message id(2), return code.
    Frame = <<7:8, ?SN_PUBACK:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED:8>>,
    ?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a 4-byte PUBREC packet for MsgId to ?HOST:?PORT over Socket.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_pubrec_msg(Socket, MsgId) ->
    Frame = <<4:8, ?SN_PUBREC:8, MsgId:16>>,
    ?LOG("send_pubrec_msg MsgId=~p", [MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a 4-byte PUBREL packet for MsgId to ?HOST:?PORT over Socket.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_pubrel_msg(Socket, MsgId) ->
    Frame = <<4:8, ?SN_PUBREL:8, MsgId:16>>,
    ?LOG("send_pubrel_msg MsgId=~p", [MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a 4-byte PUBCOMP packet for MsgId to ?HOST:?PORT over Socket.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_pubcomp_msg(Socket, MsgId) ->
    Frame = <<4:8, ?SN_PUBCOMP:8, MsgId:16>>,
    ?LOG("send_pubcomp_msg MsgId=~p", [MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a SUBSCRIBE packet for the topic name Topic (binary) with topic id
%% type ?SN_NORMAL_TOPIC to ?HOST:?PORT over Socket.
%% DUP, Retain, Will and CleanSession bits are all sent as 0.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId) ->
    %% 5 = length(1) + type(1) + flags(1) + message id(2).
    Len = byte_size(Topic) + 5,
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, QoS:2, 0:3, ?SN_NORMAL_TOPIC:2>>,
    Frame = <<Len:8, ?SN_SUBSCRIBE:8, Flags/binary, MsgId:16, Topic/binary>>,
    ?LOG("send_subscribe_msg_normal_topic Topic=~p, MsgId=~p", [Topic, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a fixed 7-byte SUBSCRIBE packet for the 16-bit TopicId with topic id
%% type ?SN_PREDEFINED_TOPIC to ?HOST:?PORT over Socket.
%% DUP, Retain, Will and CleanSession bits are all sent as 0.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId, MsgId) ->
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, QoS:2, 0:3, ?SN_PREDEFINED_TOPIC:2>>,
    Frame = <<7:8, ?SN_SUBSCRIBE:8, Flags/binary, MsgId:16, TopicId:16>>,
    ?LOG("send_subscribe_msg_predefined_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a SUBSCRIBE packet whose topic field is the binary Topic with topic
%% id type ?SN_SHORT_TOPIC to ?HOST:?PORT over Socket.
%% DUP, Retain, Will and CleanSession bits are all sent as 0.
%% The length byte is hard-coded to 7, i.e. it is only accurate when Topic
%% is exactly 2 bytes long.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_subscribe_msg_short_topic(Socket, QoS, Topic, MsgId) ->
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, QoS:2, 0:3, ?SN_SHORT_TOPIC:2>>,
    Frame = <<7:8, ?SN_SUBSCRIBE:8, Flags/binary, MsgId:16, Topic/binary>>,
    ?LOG("send_subscribe_msg_short_topic Topic=~p, MsgId=~p", [Topic, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a fixed 7-byte SUBSCRIBE packet for the 16-bit TopicId with topic id
%% type ?SN_RESERVED_TOPIC to ?HOST:?PORT over Socket.
%% DUP, Retain, Will and CleanSession bits are all sent as 0.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_subscribe_msg_reserved_topic(Socket, QoS, TopicId, MsgId) ->
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, QoS:2, 0:3, ?SN_RESERVED_TOPIC:2>>,
    Frame = <<7:8, ?SN_SUBSCRIBE:8, Flags/binary, MsgId:16, TopicId:16>>,
    ?LOG("send_subscribe_msg_reserved_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a fixed 7-byte UNSUBSCRIBE packet for the 16-bit TopicId with topic
%% id type ?SN_PREDEFINED_TOPIC to ?HOST:?PORT over Socket.
%% Every flag bit other than the topic id type (including QoS) is sent as 0.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_unsubscribe_msg_predefined_topic(Socket, TopicId, MsgId) ->
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, 0:2, 0:3, ?SN_PREDEFINED_TOPIC:2>>,
    Frame = <<7:8, ?SN_UNSUBSCRIBE:8, Flags/binary, MsgId:16, TopicId:16>>,
    ?LOG("send_unsubscribe_msg_predefined_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends an UNSUBSCRIBE packet for the topic name TopicName (binary) with
%% topic id type ?SN_NORMAL_TOPIC to ?HOST:?PORT over Socket.
%% Every flag bit other than the topic id type (including QoS) is sent as 0.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_unsubscribe_msg_normal_topic(Socket, TopicName, MsgId) ->
    %% 5 = length(1) + type(1) + flags(1) + message id(2).
    Len = 5 + byte_size(TopicName),
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, 0:2, 0:3, ?SN_NORMAL_TOPIC:2>>,
    Frame = <<Len:8, ?SN_UNSUBSCRIBE:8, Flags/binary, MsgId:16, TopicName/binary>>,
    ?LOG("send_unsubscribe_msg_normal_topic TopicName=~p, MsgId=~p", [TopicName, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends an UNSUBSCRIBE packet with topic id type ?SN_SHORT_TOPIC to
%% ?HOST:?PORT over Socket. Despite its name, TopicId is spliced into the
%% frame as a binary. The length byte is hard-coded to 7, i.e. it is only
%% accurate when TopicId is exactly 2 bytes long.
%% DUP, Retain, Will and CleanSession bits are sent as 0 and QoS as ?QOS_0.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_unsubscribe_msg_short_topic(Socket, TopicId, MsgId) ->
    %% Flags octet: DUP(1) | QoS(2) | Retain, Will, CleanSession (3 bits) | TopicIdType(2).
    Flags = <<0:1, ?QOS_0:2, 0:3, ?SN_SHORT_TOPIC:2>>,
    Frame = <<7:8, ?SN_UNSUBSCRIBE:8, Flags/binary, MsgId:16, TopicId/binary>>,
    ?LOG("send_unsubscribe_msg_short_topic TopicId=~p, MsgId=~p", [TopicId, MsgId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a PINGREQ packet to ?HOST:?PORT over Socket.
%% With ClientId = undefined the packet is the bare 2-byte header; otherwise
%% ClientId (a binary) is appended and the length byte grows accordingly.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_pingreq_msg(Socket, ClientId) ->
    Frame =
        case ClientId of
            undefined ->
                <<2:8, ?SN_PINGREQ:8>>;
            _ ->
                <<(byte_size(ClientId) + 2):8, ?SN_PINGREQ:8, ClientId/binary>>
        end,
    ?LOG("send_pingreq_msg ClientId=~p", [ClientId]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Sends a DISCONNECT packet to ?HOST:?PORT over Socket.
%% With Duration = undefined the packet is the bare 2-byte header; otherwise
%% Duration is appended as a 16-bit integer, giving a 4-byte packet.
%% Crashes with badmatch unless gen_udp:send/4 returns ok.
send_disconnect_msg(Socket, Duration) ->
    Frame =
        case Duration of
            undefined ->
                <<2:8, ?SN_DISCONNECT:8>>;
            _ ->
                <<4:8, ?SN_DISCONNECT:8, Duration:16>>
        end,
    ?LOG("send_disconnect_msg Duration=~p", [Duration]),
    ok = gen_udp:send(Socket, ?HOST, ?PORT, Frame).
|
|
|
|
%% Identity helper: returns its argument unchanged. Used to label a value as
%% a message id at call sites.
mid(MsgId) ->
    MsgId.
|
|
%% Identity helper: returns its argument unchanged. Used to label a value as
%% a topic id at call sites.
tid(TopicId) ->
    TopicId.
|
|
|
|
%% Returns the next result of receive_response/1 that is not the 2-byte
%% PINGRESP packet <<2, 23>>. PINGRESP packets are logged and skipped;
%% anything else (including a non-binary result of receive_response/1) is
%% logged and returned as is.
wrap_receive_response(Socket) ->
    Response = receive_response(Socket),
    case Response =:= <<2, 23>> of
        true ->
            ct:log("PingResp"),
            wrap_receive_response(Socket);
        false ->
            ct:log("Other: ~p", [Response]),
            Response
    end.
|
|
%% Waits for the next UDP datagram delivered to the calling process for
%% Socket and returns its payload binary.
%%
%% receive_response/1 uses a 2000 ms timeout; receive_response/2 takes the
%% timeout (as accepted by `receive ... after') explicitly.
%%
%% Any other message in the mailbox ({mqttc, _, _} or anything else) is
%% logged, dropped, and the wait is restarted with the SAME timeout.
%% Returns the atom udp_receive_timeout when nothing arrives within the
%% timeout window.
receive_response(Socket) ->
    receive_response(Socket, 2000).

receive_response(Socket, Timeout) ->
    receive
        {udp, Socket, _, _, Bin} ->
            ?LOG("receive_response Bin=~p~n", [Bin]),
            Bin;
        {mqttc, From, Data2} ->
            ?LOG("receive_response() ignore mqttc From=~p, Data2=~p~n", [From, Data2]),
            %% Recurse with the caller's Timeout; going through
            %% receive_response/1 would silently reset it to 2000 ms.
            receive_response(Socket, Timeout);
        Other ->
            ?LOG("receive_response() Other message: ~p", [{unexpected_udp_data, Other}]),
            receive_response(Socket, Timeout)
    after Timeout ->
        udp_receive_timeout
    end.
|
|
|
|
%% Takes the next message from the calling process' mailbox and classifies it:
%%   {mqttc, _, Data}      -> Data
%%   {publish, Topic, Pl}  -> returned unchanged
%%   anything else         -> {unexpected_emqttc_data, Msg}
%% Returns emqttc_receive_timeout if no message arrives within 2000 ms.
receive_emqttc_response() ->
    receive
        {mqttc, _Sender, Data} ->
            Data;
        {publish, _Topic, _Payload} = Publish ->
            Publish;
        Unexpected ->
            {unexpected_emqttc_data, Unexpected}
    after 2000 ->
        emqttc_receive_timeout
    end.
|
|
|
|
%% Receives one packet via receive_response/1 and asserts (by pattern match)
%% that it is a PUBLISH with the given Dup, QoS, Retain, TopicIdType, TopicId
%% and Payload; a mismatch crashes with badmatch.
%% The packet's message id is then used to answer according to QoS:
%%   0 -> nothing is sent
%%   1 -> a PUBACK is sent
%%   2 -> a PUBREL is sent and a matching PUBCOMP is expected back
%% Any other QoS crashes with case_clause. Returns ok.
check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket) ->
    PubMsg = receive_response(Socket),
    Length = 7 + byte_size(Payload),
    ?LOG("check_dispatched_message ~p~n", [PubMsg]),
    ExpectedHeader =
        <<Length, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16>>,
    ?LOG("expected ~p xx ~p~n", [ExpectedHeader, Payload]),
    %% All variables except MsgId are already bound, so this match is the
    %% actual assertion on the received packet.
    <<Length, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16,
      MsgId:16, Payload/binary>> = PubMsg,
    case QoS of
        0 ->
            ok;
        1 ->
            send_puback_msg(Socket, TopicId, MsgId);
        2 ->
            send_pubrel_msg(Socket, MsgId),
            ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket))
    end,
    ok.
|
|
|
|
%% Returns the broadcast address as a string: "255.255.255.255".
get_udp_broadcast_address() -> "255.255.255.255".
|
|
|
|
%% Asserts that UdpData is a PUBLISH packet matching the expected flag bits,
%% topic id and payload given in the first argument, and returns the message
%% id found in the packet.
%% UdpData is split as: 5-byte header, 16-bit message id, remaining payload.
%% Crashes with badmatch if UdpData is too short, and with an assertEqual
%% error if the header or the payload differ from what is expected.
check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, TopicType, TopicId, Payload},
                         UdpData) ->
    <<HeaderUdp:5/binary, MsgId:16, PayloadIn/binary>> = UdpData,
    ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]),
    %% 7 = length(1) + type(1) + flags(1) + topic id(2) + message id(2).
    ExpectedLen = byte_size(Payload) + 7,
    ExpectedHeader = <<ExpectedLen:8, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, WillBit:1,
                       CleanSession:1, TopicType:2, TopicId:16>>,
    %% The MQTT-SN header must be identical.
    ?assertEqual(ExpectedHeader, HeaderUdp),
    %% The payload must be identical.
    ?assertEqual(Payload, PayloadIn),
    MsgId.
|
|
|
|
%% Asserts that UdpData is a REGISTER packet for TopicName and returns
%% {TopicId, MsgId} as found in the packet.
%% UdpData is split as: 2-byte header (length, type), 16-bit topic id,
%% 16-bit message id, remaining bytes (the topic name).
%% Crashes with badmatch if UdpData is too short, and with an assertEqual
%% error if the header or the topic name differ from what is expected.
check_register_msg_on_udp(TopicName, UdpData) ->
    ct:log("UdpData: ~p~n", [UdpData]),
    <<HeaderUdp:2/binary, TopicId:16, MsgId:16, PayloadIn/binary>> = UdpData,
    %% 6 = length(1) + type(1) + topic id(2) + message id(2).
    ExpectedLen = byte_size(TopicName) + 6,
    ExpectedHeader = <<ExpectedLen:8, ?SN_REGISTER>>,
    ?assertEqual(ExpectedHeader, HeaderUdp),
    ?assertEqual(TopicName, PayloadIn),
    {TopicId, MsgId}.
|
|
|
|
%% Asserts (by pattern match) that UdpData is a 7-byte REGACK packet carrying
%% the given MsgId and return code 0, and returns the topic id it contains.
%% Crashes with badmatch on any mismatch.
check_regack_msg_on_udp(MsgId, UdpData) ->
    %% MsgId is already bound, so it is compared rather than extracted.
    <<7:8, ?SN_REGACK:8, TopicId:16, MsgId:16, 0:8>> = UdpData,
    TopicId.
|
|
|
|
%% Drains the calling process' mailbox without blocking.
%% flush/0 returns every queued message in arrival order.
%% flush/1 takes an accumulator in reverse order: the drained messages are
%% consed onto it and the whole list is reversed once the mailbox is empty.
flush() ->
    flush([]).

flush(Acc) ->
    receive
        Msg ->
            flush([Msg | Acc])
    after 0 ->
        lists:reverse(Acc)
    end.
|