diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 458c301fc..73fd6d613 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -126,7 +126,7 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> mqueue()). -in(#message{flags = #{qos := ?QOS_0}}, MQ = #mqueue{qos0 = false}) -> +in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; diff --git a/test/emqx_keepalive_SUITE.erl b/test/emqx_keepalive_SUITE.erl index e07c96ffe..c4dbd80f2 100644 --- a/test/emqx_keepalive_SUITE.erl +++ b/test/emqx_keepalive_SUITE.erl @@ -15,6 +15,7 @@ -module(emqx_keepalive_SUITE). -compile(export_all). +-compile(nowarn_export_all). all() -> [{group, keepalive}]. diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl new file mode 100644 index 000000000..da39ef882 --- /dev/null +++ b/test/emqx_message_SUITE.erl @@ -0,0 +1,65 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_message_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + message_make, + message_flag, + message_header, + message_format + ]. + +message_make(_) -> + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + ?assertEqual(0, Msg#message.qos), + Msg1 = emqx_message:make(<<"clientid">>, ?QOS2, <<"topic">>, <<"payload">>), + ?assert(is_binary(Msg1#message.id)), + ?assertEqual(2, Msg1#message.qos). + +message_flag(_) -> + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + Msg2 = emqx_message:set_flag(retain, false, Msg), + Msg3 = emqx_message:set_flag(dup, Msg2), + ?assert(emqx_message:get_flag(dup, Msg3)), + ?assertNot(emqx_message:get_flag(retain, Msg3)), + Msg4 = emqx_message:unset_flag(dup, Msg3), + Msg5 = emqx_message:unset_flag(retain, Msg4), + ?assertEqual(undefined, emqx_message:get_flag(dup, Msg5, undefined)), + ?assertEqual(undefined, emqx_message:get_flag(retain, Msg5, undefined)), + Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5), + ?assert(emqx_message:get_flag(dup, Msg6)), + ?assert(emqx_message:get_flag(retain, Msg6)). + +message_header(_) -> + Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), + Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), + Msg2 = emqx_message:set_header(c, 3, Msg1), + ?assertEqual(1, emqx_message:get_header(a, Msg2)), + ?assertEqual(4, emqx_message:get_header(d, Msg2, 4)). + +message_format(_) -> + io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]). + + diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index f2f50a296..8b840b91c 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -37,7 +37,11 @@ t_get_set_caps(_) -> mqtt_shared_subscription => true, mqtt_wildcard_subscription => true }, - Caps = emqx_mqtt_caps:get_caps(zone), + Caps2 = Caps#{max_packet_size => 1048576}, + case emqx_mqtt_caps:get_caps(zone) of + Caps -> ok; + Caps2 -> ok + end, PubCaps = #{ max_qos_allowed => ?QOS_2, mqtt_retain_available => true diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 0cff6a627..575478f90 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -24,14 +24,12 @@ -define(Q, emqx_mqueue). -all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_priority_mqueue, - t_priority_mqueue2, t_infinity_priority_mqueue, - t_infinity_simple_mqueue]. +all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_infinity_simple_mqueue, + t_priority_mqueue, t_infinity_priority_mqueue]. t_in(_) -> - Opts = [{max_length, 5}, - {store_qos0, true}], - Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), + Opts = #{type => simple, max_len => 5, store_qos0 => true}, + Q = ?Q:new(<<"testQ">>, Opts), ?assert(?Q:is_empty(Q)), Q1 = ?Q:in(#message{}, Q), ?assertEqual(1, ?Q:len(Q1)), @@ -43,18 +41,16 @@ t_in(_) -> ?assertEqual(5, ?Q:len(Q5)). t_in_qos0(_) -> - Opts = [{max_length, 5}, - {store_qos0, false}], - Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), - Q1 = ?Q:in(#message{}, Q), + Opts = #{type => simple, max_len => 5, store_qos0 => false}, + Q = ?Q:new(<<"testQ">>, Opts), + Q1 = ?Q:in(#message{qos = 0}, Q), ?assert(?Q:is_empty(Q1)), Q2 = ?Q:in(#message{qos = 0}, Q1), ?assert(?Q:is_empty(Q2)). t_out(_) -> - Opts = [{max_length, 5}, - {store_qos0, true}], - Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()), + Opts = #{type => simple, max_len => 5, store_qos0 => true}, + Q = ?Q:new(<<"testQ">>, Opts), {empty, Q} = ?Q:out(Q), Q1 = ?Q:in(#message{}, Q), {Value, Q2} = ?Q:out(Q1), @@ -62,12 +58,8 @@ t_out(_) -> ?assertEqual({value, #message{}}, Value). t_simple_mqueue(_) -> - Opts = [{type, simple}, - {max_length, 3}, - {low_watermark, 0.2}, - {high_watermark, 0.6}, - {store_qos0, false}], - Q = ?Q:new("simple_queue", Opts, alarm_fun()), + Opts = #{type => simple, max_len => 3, store_qos0 => false}, + Q = ?Q:new("simple_queue", Opts), ?assertEqual(simple, ?Q:type(Q)), ?assertEqual(3, ?Q:max_len(Q)), ?assertEqual(<<"simple_queue">>, ?Q:name(Q)), @@ -82,12 +74,8 @@ t_simple_mqueue(_) -> ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)). t_infinity_simple_mqueue(_) -> - Opts = [{type, simple}, - {max_length, 0}, - {low_watermark, 0.2}, - {high_watermark, 0.6}, - {store_qos0, false}], - Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()), + Opts = #{type => simple, max_len => 0, store_qos0 => false}, + Q = ?Q:new("infinity_simple_queue", Opts), ?assert(?Q:is_empty(Q)), ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> @@ -99,37 +87,29 @@ t_infinity_simple_mqueue(_) -> ?assertEqual(<<1>>, V#message.payload). t_priority_mqueue(_) -> - Opts = [{type, priority}, - {priority, [{<<"t">>, 10}]}, - {max_length, 3}, - {low_watermark, 0.2}, - {high_watermark, 0.6}, - {store_qos0, false}], - Q = ?Q:new("priority_queue", Opts, alarm_fun()), + Opts = #{type => priority, max_len => 3, store_qos0 => false}, + Q = ?Q:new("priority_queue", Opts), ?assertEqual(priority, ?Q:type(Q)), ?assertEqual(3, ?Q:max_len(Q)), ?assertEqual(<<"priority_queue">>, ?Q:name(Q)), - ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q), - Q2 = ?Q:in(#message{qos = 1, topic = <<"t">>}, Q1), - Q3 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q2), + Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q), + Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1), + Q3 = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2), ?assertEqual(3, ?Q:len(Q3)), - Q4 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q3), + Q4 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3), ?assertEqual(4, ?Q:len(Q4)), - Q5 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q4), + Q5 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4), ?assertEqual(5, ?Q:len(Q5)), - Q6 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q5), + Q6 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), ?assertEqual(5, ?Q:len(Q6)), - {{value, Msg}, _Q7} = ?Q:out(Q6), - ?assertEqual(<<"t">>, Msg#message.topic). + {{value, Msg}, Q7} = ?Q:out(Q6), + ?assertEqual(4, ?Q:len(Q7)), + ?assertEqual(<<"t1">>, Msg#message.topic). t_infinity_priority_mqueue(_) -> - Opts = [{type, priority}, - {priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]}, - {max_length, 0}, - {store_qos0, false}], - Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()), + Opts = #{type => priority, max_len => 0, store_qos0 => false}, + Q = ?Q:new("infinity_priority_queue", Opts), ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> AccQ1 = @@ -137,23 +117,4 @@ t_infinity_priority_mqueue(_) -> ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) end, Q, lists:seq(1, 255)), ?assertEqual(510, ?Q:len(Qx)), - ?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)). - -t_priority_mqueue2(_) -> - Opts = [{type, priority}, - {max_length, 2}, - {low_watermark, 0.2}, - {high_watermark, 0.6}, - {store_qos0, false}], - Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()), - ?assertEqual(2, ?Q:max_len(Q)), - Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), - Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), - Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), - Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), - ?assertEqual(4, ?Q:len(Q4)), - {{value, _Val}, Q5} = ?Q:out(Q4), - ?assertEqual(3, ?Q:len(Q5)). - -alarm_fun() -> fun(_, _) -> alarm_fun() end. - + ?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)). \ No newline at end of file diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl new file mode 100644 index 000000000..bd1b0cb4a --- /dev/null +++ b/test/emqx_packet_SUITE.erl @@ -0,0 +1,93 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + + +-module(emqx_packet_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx.hrl"). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + packet_proto_name, + packet_type_name, + packet_validate, + packet_message, + packet_format, + packet_will_msg + ]. + +packet_proto_name(_) -> + ?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), + ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)), + ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(5)). + +packet_type_name(_) -> + ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), + ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). + +packet_validate(_) -> + ?assertEqual(true, emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))), + ?assertEqual(true, emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), + ?assertEqual(true, emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))). + +packet_message(_) -> + Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = ?QOS0, + retain = false, + dup = false}, + variable = #mqtt_packet_publish{topic_name = <<"topic">>, + packet_id = 10, + properties = #{}}, + payload = <<"payload">>}, + Msg = emqx_message:make(<<"clientid">>, ?QOS0, <<"topic">>, <<"payload">>), + Msg2 = emqx_message:set_flag(retain, false, Msg), + Pkt = emqx_packet:from_message(10, Msg2), + Msg3 = emqx_message:set_header(username, "test", Msg2), + Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, username => "test"}, Pkt), + Msg5 = Msg4#message{timestamp = Msg3#message.timestamp}, + Msg5 = Msg3. + +packet_format(_) -> + io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), + io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), + io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), + io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), + io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), + io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), + io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), + io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). + +packet_will_msg(_) -> + Pkt = #mqtt_packet_connect{ will_flag = true, + client_id = <<"clientid">>, + username = "test", + will_retain = true, + will_qos = ?QOS2, + will_topic = <<"topic">>, + will_props = #{}, + will_payload = <<"payload">>}, + Msg = emqx_packet:will_msg(Pkt), + ?assertEqual(<<"clientid">>, Msg#message.from), + ?assertEqual(<<"topic">>, Msg#message.topic). + + diff --git a/test/emqx_stats_SUITE.erl b/test/emqx_stats_SUITE.erl index d7fc294b1..5c7254468 100644 --- a/test/emqx_stats_SUITE.erl +++ b/test/emqx_stats_SUITE.erl @@ -22,7 +22,7 @@ all() -> [t_set_get_state, t_update_interval]. t_set_get_state(_) -> - {ok, _} = emqx_stats:start_link(), + emqx_stats:start_link(), SetConnsCount = emqx_stats:statsfun('connections/count'), SetConnsCount(1), 1 = emqx_stats:getstat('connections/count'), @@ -46,8 +46,9 @@ t_set_get_state(_) -> 4 = proplists:get_value('connections/max', Conns). t_update_interval(_) -> - {ok, _} = emqx_stats:start_link(), - ok = emqx_stats:update_interval(cm_stats, fun update_stats/0), + emqx_stats:start_link(), + emqx_stats:cancel_update(cm_stats), + ok = emqx_stats:update_interval(stats_test, fun update_stats/0), timer:sleep(2500), 1 = emqx_stats:getstat('connections/count'). diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 15c449ae9..282acc3e5 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -22,7 +22,7 @@ all() -> [t_set_get_env]. t_set_get_env(_) -> - {ok, _} = emqx_zone:start_link(), + emqx_zone:start_link(), ok = emqx_zone:set_env(china, language, chinese), timer:sleep(100), % make sure set_env/3 is okay chinese = emqx_zone:get_env(china, language),