Merge pull request #9024 from lafirest/test/mqtt_sn
test(mqtt_sn): improve test coverage to 90%
This commit is contained in:
commit
7fd760db2c
|
@ -12,7 +12,9 @@ File format:
|
||||||
|
|
||||||
## v4.3.22
|
## v4.3.22
|
||||||
|
|
||||||
### Minor changes
|
### Bug fixes
|
||||||
|
|
||||||
|
- Fix that after receiving publish in `idle mode` the emqx-sn gateway may panic. [#9024](https://github.com/emqx/emqx/pull/9024)
|
||||||
|
|
||||||
## v4.3.21
|
## v4.3.21
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_sn,
|
{application, emqx_sn,
|
||||||
[{description, "EMQ X MQTT-SN Plugin"},
|
[{description, "EMQ X MQTT-SN Plugin"},
|
||||||
{vsn, "4.3.7"}, % strict semver, bump manually!
|
{vsn, "4.3.8"}, % strict semver, bump manually!
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel,stdlib,esockd]},
|
{applications, [kernel,stdlib,esockd]},
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
{"4.3.7",[
|
||||||
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.6",[
|
{"4.3.6",[
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
|
@ -29,6 +32,9 @@
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
|
{"4.3.7",[
|
||||||
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
{"4.3.6",[
|
{"4.3.6",[
|
||||||
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
|
|
|
@ -208,7 +208,7 @@ idle(cast, {incoming, ?SN_DISCONNECT_MSG(_Duration)}, State) ->
|
||||||
|
|
||||||
idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) ->
|
idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) ->
|
||||||
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!"),
|
?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!"),
|
||||||
{keep_state, State#state.idle_timeout};
|
{keep_state_and_data, State#state.idle_timeout};
|
||||||
|
|
||||||
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
|
idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
|
||||||
topic_id_type = TopicIdType
|
topic_id_type = TopicIdType
|
||||||
|
@ -226,7 +226,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId]),
|
?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId]),
|
||||||
{keep_state, State#state.idle_timeout};
|
{keep_state_and_data, State#state.idle_timeout};
|
||||||
|
|
||||||
idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
|
idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) ->
|
||||||
handle_ping(PingReq, State);
|
handle_ping(PingReq, State);
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
|
|
||||||
-include_lib("emqx_sn/include/emqx_sn.hrl").
|
-include_lib("emqx_sn/include/emqx_sn.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-define(SHOW(X), ??X).
|
||||||
|
|
||||||
-import(emqx_sn_frame, [ parse/1
|
-import(emqx_sn_frame, [ parse/1
|
||||||
, serialize/1
|
, serialize/1
|
||||||
|
@ -67,6 +68,14 @@ t_willtopic(_) ->
|
||||||
Wt = #mqtt_sn_message{type = ?SN_WILLTOPIC, variable = {Flags, <<"WillTopic">>}},
|
Wt = #mqtt_sn_message{type = ?SN_WILLTOPIC, variable = {Flags, <<"WillTopic">>}},
|
||||||
?assertEqual({ok, Wt}, parse(serialize(Wt))).
|
?assertEqual({ok, Wt}, parse(serialize(Wt))).
|
||||||
|
|
||||||
|
t_undefined_willtopic(_) ->
|
||||||
|
Wt = #mqtt_sn_message{type = ?SN_WILLTOPIC},
|
||||||
|
?assertEqual({ok, Wt}, parse(serialize(Wt))).
|
||||||
|
|
||||||
|
t_willtopic_resp(_) ->
|
||||||
|
Wt = #mqtt_sn_message{type = ?SN_WILLTOPICRESP, variable = 0},
|
||||||
|
?assertEqual({ok, Wt}, parse(serialize(Wt))).
|
||||||
|
|
||||||
t_willmsgreq(_) ->
|
t_willmsgreq(_) ->
|
||||||
WmReq = #mqtt_sn_message{type = ?SN_WILLMSGREQ},
|
WmReq = #mqtt_sn_message{type = ?SN_WILLMSGREQ},
|
||||||
?assertEqual({ok, WmReq}, parse(serialize(WmReq))).
|
?assertEqual({ok, WmReq}, parse(serialize(WmReq))).
|
||||||
|
@ -88,6 +97,12 @@ t_publish(_) ->
|
||||||
PubMsg = #mqtt_sn_message{type = ?SN_PUBLISH, variable = {Flags, 1, 2, <<"Payload">>}},
|
PubMsg = #mqtt_sn_message{type = ?SN_PUBLISH, variable = {Flags, 1, 2, <<"Payload">>}},
|
||||||
?assertEqual({ok, PubMsg}, parse(serialize(PubMsg))).
|
?assertEqual({ok, PubMsg}, parse(serialize(PubMsg))).
|
||||||
|
|
||||||
|
t_publish_long_msg(_) ->
|
||||||
|
Flags = #mqtt_sn_flags{dup = false, qos = 1, retain = false, topic_id_type = 2#01},
|
||||||
|
Payload = generate_random_binary(256 + rand:uniform(256)),
|
||||||
|
PubMsg = #mqtt_sn_message{type = ?SN_PUBLISH, variable = {Flags, 1, 2, Payload}},
|
||||||
|
?assertEqual({ok, PubMsg}, parse(serialize(PubMsg))).
|
||||||
|
|
||||||
t_puback(_) ->
|
t_puback(_) ->
|
||||||
PubAck = #mqtt_sn_message{type = ?SN_PUBACK, variable = {1, 2, 0}},
|
PubAck = #mqtt_sn_message{type = ?SN_PUBACK, variable = {1, 2, 0}},
|
||||||
?assertEqual({ok, PubAck}, parse(serialize(PubAck))).
|
?assertEqual({ok, PubAck}, parse(serialize(PubAck))).
|
||||||
|
@ -105,9 +120,21 @@ t_pubcomp(_) ->
|
||||||
?assertEqual({ok, PubComp}, parse(serialize(PubComp))).
|
?assertEqual({ok, PubComp}, parse(serialize(PubComp))).
|
||||||
|
|
||||||
t_subscribe(_) ->
|
t_subscribe(_) ->
|
||||||
Flags = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = 16#01},
|
Flags = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_PREDEFINED_TOPIC},
|
||||||
SubMsg = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags, 16#4321, 16}},
|
SubMsg = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags, 16#4321, 16}},
|
||||||
?assertEqual({ok, SubMsg}, parse(serialize(SubMsg))).
|
?assertEqual({ok, SubMsg}, parse(serialize(SubMsg))),
|
||||||
|
|
||||||
|
Flags1 = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_NORMAL_TOPIC},
|
||||||
|
SubMsg1 = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags1, 16#4321, <<"t/+">>}},
|
||||||
|
?assertEqual({ok, SubMsg1}, parse(serialize(SubMsg1))),
|
||||||
|
|
||||||
|
Flags2 = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_SHORT_TOPIC},
|
||||||
|
SubMsg2 = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags2, 16#4321, <<"t/+">>}},
|
||||||
|
?assertEqual({ok, SubMsg2}, parse(serialize(SubMsg2))),
|
||||||
|
|
||||||
|
Flags3 = #mqtt_sn_flags{dup = false, qos = 1, topic_id_type = ?SN_RESERVED_TOPIC},
|
||||||
|
SubMsg3 = #mqtt_sn_message{type = ?SN_SUBSCRIBE, variable = {Flags3, 16#4321, <<"t/+">>}},
|
||||||
|
?assertEqual({ok, SubMsg3}, parse(serialize(SubMsg3))).
|
||||||
|
|
||||||
t_suback(_) ->
|
t_suback(_) ->
|
||||||
Flags = #mqtt_sn_flags{qos = 1},
|
Flags = #mqtt_sn_flags{qos = 1},
|
||||||
|
@ -137,6 +164,10 @@ t_disconnect(_) ->
|
||||||
Disconn = #mqtt_sn_message{type = ?SN_DISCONNECT},
|
Disconn = #mqtt_sn_message{type = ?SN_DISCONNECT},
|
||||||
?assertEqual({ok, Disconn}, parse(serialize(Disconn))).
|
?assertEqual({ok, Disconn}, parse(serialize(Disconn))).
|
||||||
|
|
||||||
|
t_disconnect_duration(_) ->
|
||||||
|
Disconn = #mqtt_sn_message{type = ?SN_DISCONNECT, variable = 120},
|
||||||
|
?assertEqual({ok, Disconn}, parse(serialize(Disconn))).
|
||||||
|
|
||||||
t_willtopicupd(_) ->
|
t_willtopicupd(_) ->
|
||||||
Flags = #mqtt_sn_flags{qos = 1, retain = true},
|
Flags = #mqtt_sn_flags{qos = 1, retain = true},
|
||||||
WtUpd = #mqtt_sn_message{type = ?SN_WILLTOPICUPD, variable = {Flags, <<"Topic">>}},
|
WtUpd = #mqtt_sn_message{type = ?SN_WILLTOPICUPD, variable = {Flags, <<"Topic">>}},
|
||||||
|
@ -150,6 +181,43 @@ t_willmsgresp(_) ->
|
||||||
UpdResp = #mqtt_sn_message{type = ?SN_WILLMSGRESP, variable = 0},
|
UpdResp = #mqtt_sn_message{type = ?SN_WILLMSGRESP, variable = 0},
|
||||||
?assertEqual({ok, UpdResp}, parse(serialize(UpdResp))).
|
?assertEqual({ok, UpdResp}, parse(serialize(UpdResp))).
|
||||||
|
|
||||||
|
t_invalid_inpacket(_) ->
|
||||||
|
Bin = <<2:8/big-integer, 16#F0:8/big-integer>>,
|
||||||
|
?assertMatch({'EXIT', {unkown_message_type, _Stack}}, catch parse(Bin)).
|
||||||
|
|
||||||
|
t_message_type(_) ->
|
||||||
|
TypeNames = [ {?SN_ADVERTISE, ?SHOW(SN_ADVERTISE)}
|
||||||
|
, {?SN_SEARCHGW, ?SHOW(SN_SEARCHGW)}
|
||||||
|
, {?SN_GWINFO, ?SHOW(SN_GWINFO)}
|
||||||
|
, {?SN_CONNECT, ?SHOW(SN_CONNECT)}
|
||||||
|
, {?SN_CONNACK, ?SHOW(SN_CONNACK)}
|
||||||
|
, {?SN_WILLTOPICREQ, ?SHOW(SN_WILLTOPICREQ)}
|
||||||
|
, {?SN_WILLTOPIC, ?SHOW(SN_WILLTOPIC)}
|
||||||
|
, {?SN_WILLMSGREQ, ?SHOW(SN_WILLMSGREQ)}
|
||||||
|
, {?SN_WILLMSG, ?SHOW(SN_WILLMSG)}
|
||||||
|
, {?SN_REGISTER, ?SHOW(SN_REGISTER)}
|
||||||
|
, {?SN_REGACK, ?SHOW(SN_REGACK)}
|
||||||
|
, {?SN_PUBLISH, ?SHOW(SN_PUBLISH)}
|
||||||
|
, {?SN_PUBACK, ?SHOW(SN_PUBACK)}
|
||||||
|
, {?SN_PUBCOMP, ?SHOW(SN_PUBCOMP)}
|
||||||
|
, {?SN_PUBREC, ?SHOW(SN_PUBREC)}
|
||||||
|
, {?SN_PUBREL, ?SHOW(SN_PUBREL)}
|
||||||
|
, {?SN_SUBSCRIBE, ?SHOW(SN_SUBSCRIBE)}
|
||||||
|
, {?SN_SUBACK, ?SHOW(SN_SUBACK)}
|
||||||
|
, {?SN_UNSUBSCRIBE, ?SHOW(SN_UNSUBSCRIBE)}
|
||||||
|
, {?SN_UNSUBACK, ?SHOW(SN_UNSUBACK)}
|
||||||
|
, {?SN_PINGREQ, ?SHOW(SN_PINGREQ)}
|
||||||
|
, {?SN_PINGRESP, ?SHOW(SN_PINGRESP)}
|
||||||
|
, {?SN_DISCONNECT, ?SHOW(SN_DISCONNECT)}
|
||||||
|
, {?SN_WILLTOPICUPD, ?SHOW(SN_WILLTOPICUPD)}
|
||||||
|
, {?SN_WILLTOPICRESP, ?SHOW(SN_WILLTOPICRESP)}
|
||||||
|
, {?SN_WILLMSGUPD, ?SHOW(SN_WILLMSGUPD)}
|
||||||
|
, {?SN_WILLMSGRESP, ?SHOW(SN_WILLMSGRESP)}
|
||||||
|
],
|
||||||
|
{Types, Names} = lists:unzip(TypeNames),
|
||||||
|
?assertEqual(Names, [emqx_sn_frame:message_type(Type) || Type <- Types]),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_random_test(_) ->
|
t_random_test(_) ->
|
||||||
random_test_body(),
|
random_test_body(),
|
||||||
random_test_body(),
|
random_test_body(),
|
||||||
|
@ -171,6 +239,9 @@ random_test_body() ->
|
||||||
generate_random_binary() ->
|
generate_random_binary() ->
|
||||||
% The min packet length is 2
|
% The min packet length is 2
|
||||||
Len = rand:uniform(299) + 1,
|
Len = rand:uniform(299) + 1,
|
||||||
|
generate_random_binary(Len).
|
||||||
|
|
||||||
|
generate_random_binary(Len) ->
|
||||||
gen_next(Len, <<>>).
|
gen_next(Len, <<>>).
|
||||||
|
|
||||||
gen_next(0, Acc) ->
|
gen_next(0, Acc) ->
|
||||||
|
@ -178,4 +249,3 @@ gen_next(0, Acc) ->
|
||||||
gen_next(N, Acc) ->
|
gen_next(N, Acc) ->
|
||||||
Byte = rand:uniform(256) - 1,
|
Byte = rand:uniform(256) - 1,
|
||||||
gen_next(N-1, <<Acc/binary, Byte:8>>).
|
gen_next(N-1, <<Acc/binary, Byte:8>>).
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_sn_misc_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Setups
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_ct_helpers:start_apps([emqx_sn]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_ct_helpers:stop_apps([emqx_sn]).
|
||||||
|
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
t_sn_app(_) ->
|
||||||
|
?assertMatch({'EXIT', {_, _}}, catch emqx_sn_app:start_listeners()),
|
||||||
|
?assertMatch({error, _}, emqx_sn_app:stop_listener({udp, 9999, []})),
|
||||||
|
?assertMatch({error, _}, emqx_sn_app:stop_listener({udp, {{0,0,0,0}, 9999}, []})),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_sn_broadcast(_) ->
|
||||||
|
?assertEqual(ignored, gen_server:call(emqx_sn_broadcast, ignored)),
|
||||||
|
?assertEqual(ok, gen_server:cast(emqx_sn_broadcast, ignored)),
|
||||||
|
?assertEqual(ignored, erlang:send(emqx_sn_broadcast, ignored)),
|
||||||
|
?assertEqual(broadcast_advertise, erlang:send(emqx_sn_broadcast, broadcast_advertise)),
|
||||||
|
?assertEqual(ok, emqx_sn_broadcast:stop()).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Helper funcs
|
||||||
|
%%--------------------------------------------------------------------
|
|
@ -85,6 +85,15 @@ restart_emqx_sn(#{subs_resume := Bool}) ->
|
||||||
_ = application:ensure_all_started(emqx_sn),
|
_ = application:ensure_all_started(emqx_sn),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
recoverable_restart_emqx_sn(Setup) ->
|
||||||
|
AppEnvs = application:get_all_env(emqx_sn),
|
||||||
|
Setup(),
|
||||||
|
_ = application:stop(emqx_sn),
|
||||||
|
_ = application:ensure_all_started(emqx_sn),
|
||||||
|
fun() ->
|
||||||
|
application:set_env([{emqx_sn, AppEnvs}])
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -100,6 +109,17 @@ t_connect(_) ->
|
||||||
send_connect_msg(Socket, <<"client_id_test1">>),
|
send_connect_msg(Socket, <<"client_id_test1">>),
|
||||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
%% unexpected advertise
|
||||||
|
Adv = ?SN_ADVERTISE_MSG(1, 100),
|
||||||
|
AdvPacket = emqx_sn_frame:serialize(Adv),
|
||||||
|
send_packet(Socket, AdvPacket),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
%% unexpected connect
|
||||||
|
ClientId = ?CLIENTID,
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
send_disconnect_msg(Socket, undefined),
|
send_disconnect_msg(Socket, undefined),
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
@ -377,6 +397,38 @@ t_subscribe_case08(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_subscribe_case09(_) ->
|
||||||
|
Dup = 0,
|
||||||
|
QoS = 0,
|
||||||
|
Retain = 0,
|
||||||
|
CleanSession = 0,
|
||||||
|
ReturnCode = 0,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
ClientId = ?CLIENTID,
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
TopicName1 = <<"t/+">>,
|
||||||
|
MsgId1 = 25,
|
||||||
|
TopicId0 = 0,
|
||||||
|
WillBit = 0,
|
||||||
|
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
|
||||||
|
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
|
||||||
|
receive_response(Socket)),
|
||||||
|
|
||||||
|
{ok, C} = emqtt:start_link(),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
ok = emqtt:publish(C, <<"t/1">>, <<"Hello">>, 0),
|
||||||
|
timer:sleep(100),
|
||||||
|
ok = emqtt:disconnect(C),
|
||||||
|
|
||||||
|
timer:sleep(50),
|
||||||
|
?assertError(_, receive_publish(Socket)),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_publish_negqos_case09(_) ->
|
t_publish_negqos_case09(_) ->
|
||||||
Dup = 0,
|
Dup = 0,
|
||||||
QoS = 0,
|
QoS = 0,
|
||||||
|
@ -393,7 +445,6 @@ t_publish_negqos_case09(_) ->
|
||||||
|
|
||||||
Topic = <<"abc">>,
|
Topic = <<"abc">>,
|
||||||
|
|
||||||
|
|
||||||
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
|
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
|
||||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||||
receive_response(Socket)),
|
receive_response(Socket)),
|
||||||
|
@ -412,6 +463,16 @@ t_publish_negqos_case09(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_publish_negqos_case10(_) ->
|
||||||
|
QoS = ?QOS_NEG1,
|
||||||
|
MsgId = 1,
|
||||||
|
TopicId1 = ?PREDEF_TOPIC_ID1,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
Payload1 = <<20, 21, 22, 23>>,
|
||||||
|
send_publish_msg_predefined_topic(Socket, QoS, MsgId, TopicId1, Payload1),
|
||||||
|
timer:sleep(100),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_publish_qos0_case01(_) ->
|
t_publish_qos0_case01(_) ->
|
||||||
Dup = 0,
|
Dup = 0,
|
||||||
QoS = 0,
|
QoS = 0,
|
||||||
|
@ -1075,6 +1136,60 @@ t_will_case06(_) ->
|
||||||
|
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_will_case07(_) ->
|
||||||
|
QoS = 1,
|
||||||
|
Duration = 1,
|
||||||
|
WillMsg = <<10, 11, 12, 13, 14>>,
|
||||||
|
WillTopic = <<"abc">>,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
ClientId = ?CLIENTID,
|
||||||
|
|
||||||
|
ok = emqx_broker:subscribe(WillTopic),
|
||||||
|
|
||||||
|
send_connect_msg_with_will(Socket, Duration, ClientId),
|
||||||
|
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
%% unexpected advertise
|
||||||
|
Adv = ?SN_ADVERTISE_MSG(1, 100),
|
||||||
|
AdvPacket = emqx_sn_frame:serialize(Adv),
|
||||||
|
send_packet(Socket, AdvPacket),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
%% unexpected connect
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
send_willtopic_msg(Socket, WillTopic, QoS),
|
||||||
|
?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
%% unexpected advertise
|
||||||
|
send_packet(Socket, AdvPacket),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
%% unexpected connect
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
send_willmsg_msg(Socket, WillMsg),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_pingreq_msg(Socket, undefined),
|
||||||
|
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
% wait udp client keepalive timeout
|
||||||
|
timer:sleep(2000),
|
||||||
|
|
||||||
|
receive
|
||||||
|
{deliver, WillTopic, #message{payload = WillMsg}} -> ok;
|
||||||
|
Msg -> ct:print("recevived --- unex: ~p", [Msg])
|
||||||
|
after
|
||||||
|
1000 -> ct:fail(wait_willmsg_timeout)
|
||||||
|
end,
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(Socket)),
|
||||||
|
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_asleep_test01_timeout(_) ->
|
t_asleep_test01_timeout(_) ->
|
||||||
QoS = 1,
|
QoS = 1,
|
||||||
Duration = 1,
|
Duration = 1,
|
||||||
|
@ -1258,6 +1373,7 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
|
||||||
?assertError(_, receive_publish(Socket)),
|
?assertError(_, receive_publish(Socket)),
|
||||||
|
|
||||||
send_regack_msg(Socket, TopicIdNew, MsgId3),
|
send_regack_msg(Socket, TopicIdNew, MsgId3),
|
||||||
|
send_regack_msg(Socket, TopicIdNew, MsgId3, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
|
||||||
UdpData2 = receive_response(Socket),
|
UdpData2 = receive_response(Socket),
|
||||||
MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2),
|
MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2),
|
||||||
|
@ -1587,6 +1703,33 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
|
||||||
|
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_asleep_unexpected(_) ->
|
||||||
|
SleepDuration = 3,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
ClientId = ?CLIENTID,
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
% goto asleep state
|
||||||
|
send_disconnect_msg(Socket, SleepDuration),
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
timer:sleep(100),
|
||||||
|
|
||||||
|
send_pingreq_msg(Socket, undefined),
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_puback_msg(Socket, 5, 5, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
send_pubrec_msg(Socket, 5),
|
||||||
|
|
||||||
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
timer:sleep(100),
|
||||||
|
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_awake_test01_to_connected(_) ->
|
t_awake_test01_to_connected(_) ->
|
||||||
QoS = 1,
|
QoS = 1,
|
||||||
Keepalive_Duration = 3,
|
Keepalive_Duration = 3,
|
||||||
|
@ -1960,6 +2103,7 @@ t_register_enqueue_delivering_messages(_) ->
|
||||||
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
|
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
|
||||||
|
|
||||||
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED),
|
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_ACCEPTED),
|
||||||
|
send_regack_msg(NSocket, TopicIdA, RegMsgIdA, ?SN_RC_INVALID_TOPIC_ID),
|
||||||
|
|
||||||
%% receive the queued messages
|
%% receive the queued messages
|
||||||
|
|
||||||
|
@ -1983,6 +2127,88 @@ t_register_enqueue_delivering_messages(_) ->
|
||||||
gen_udp:close(NSocket1),
|
gen_udp:close(NSocket1),
|
||||||
restart_emqx_sn(#{subs_resume => false}).
|
restart_emqx_sn(#{subs_resume => false}).
|
||||||
|
|
||||||
|
t_code_change(_) ->
|
||||||
|
Old = [state, gwid, socket, socketpid, socketstate, socketname, peername,
|
||||||
|
channel, clientid, username, password, will_msg, keepalive_interval,
|
||||||
|
connpkt, asleep_timer, enable_stats, stats_timer, enable_qos3,
|
||||||
|
has_pending_pingresp, pending_topic_ids],
|
||||||
|
New = Old ++ [false, [], undefined],
|
||||||
|
|
||||||
|
OldTulpe = erlang:list_to_tuple(Old),
|
||||||
|
NewTulpe = erlang:list_to_tuple(New),
|
||||||
|
|
||||||
|
?assertEqual({ok, name, NewTulpe},
|
||||||
|
emqx_sn_gateway:code_change(1, name, OldTulpe, ["4.3.2"])),
|
||||||
|
|
||||||
|
?assertEqual({ok, name, NewTulpe},
|
||||||
|
emqx_sn_gateway:code_change(1, name, NewTulpe, ["4.3.6"])),
|
||||||
|
|
||||||
|
?assertEqual({ok, name, NewTulpe},
|
||||||
|
emqx_sn_gateway:code_change({down, 1}, name, NewTulpe, ["4.3.6"])),
|
||||||
|
|
||||||
|
?assertEqual({ok, name, OldTulpe},
|
||||||
|
emqx_sn_gateway:code_change({down, 1}, name, NewTulpe, ["4.3.2"])).
|
||||||
|
|
||||||
|
t_topic_id_to_large(_) ->
|
||||||
|
Dup = 0,
|
||||||
|
QoS = 0,
|
||||||
|
Retain = 0,
|
||||||
|
Will = 0,
|
||||||
|
CleanSession = 0,
|
||||||
|
MsgId = 1,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
ClientId = ?CLIENTID,
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
mnesia:dirty_write(emqx_sn_registry, {emqx_sn_registry, {ClientId, next_topic_id}, 16#FFFF}),
|
||||||
|
|
||||||
|
TopicName1 = <<"abcD">>,
|
||||||
|
send_register_msg(Socket, TopicName1, MsgId),
|
||||||
|
?assertEqual(<<7, ?SN_REGACK, 0:16, MsgId:16, 3:8>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId),
|
||||||
|
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
|
||||||
|
CleanSession:1, ?SN_NORMAL_TOPIC:2, 0:16,
|
||||||
|
MsgId:16, 2>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_idle_timeout(_) ->
|
||||||
|
Backup = recoverable_restart_emqx_sn(fun() ->
|
||||||
|
application:set_env(emqx_sn, idle_timeout, 500),
|
||||||
|
application:set_env(emqx_sn, enable_qos3, false)
|
||||||
|
end),
|
||||||
|
timer:sleep(200),
|
||||||
|
QoS = ?QOS_NEG1,
|
||||||
|
MsgId = 1,
|
||||||
|
TopicId1 = ?PREDEF_TOPIC_ID1,
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
Payload1 = <<20, 21, 22, 23>>,
|
||||||
|
send_publish_msg_predefined_topic(Socket, QoS, MsgId, TopicId1, Payload1),
|
||||||
|
timer:sleep(1500),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket),
|
||||||
|
_ = recoverable_restart_emqx_sn(Backup),
|
||||||
|
timer:sleep(200),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_invalid_packet(_) ->
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, <<"client_id_test1">>),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
send_packet(Socket, emqx_sn_frame_SUITE:generate_random_binary()),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(udp_receive_timeout, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper funcs
|
%% Helper funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -2267,6 +2493,9 @@ send_disconnect_msg(Socket, Duration) ->
|
||||||
?LOG("send_disconnect_msg Duration=~p", [Duration]),
|
?LOG("send_disconnect_msg Duration=~p", [Duration]),
|
||||||
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket).
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, DisConnectPacket).
|
||||||
|
|
||||||
|
send_packet(Socket, Packet) ->
|
||||||
|
ok = gen_udp:send(Socket, ?HOST, ?PORT, Packet).
|
||||||
|
|
||||||
mid(Id) -> Id.
|
mid(Id) -> Id.
|
||||||
tid(Id) -> Id.
|
tid(Id) -> Id.
|
||||||
|
|
||||||
|
|
|
@ -108,6 +108,11 @@ t_deny_wildcard_topic(_Config) ->
|
||||||
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)),
|
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/TopicA/#">>)),
|
||||||
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)).
|
?assertEqual({error, wildcard_topic}, ?REGISTRY:register_topic(<<"ClientId">>, <<"/+/TopicB">>)).
|
||||||
|
|
||||||
|
t_gen_server(_) ->
|
||||||
|
?assertEqual(ignored, gen_server:call(emqx_sn_registry, ignored)),
|
||||||
|
?assertEqual(ok, gen_server:cast(emqx_sn_registry, ignored)),
|
||||||
|
?assertEqual(ignored, erlang:send(emqx_sn_registry, ignored)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper funcs
|
%% Helper funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue