Add some test suites and fix bugs
This commit is contained in:
parent
cd7f79ec04
commit
f4330b8af3
|
@ -126,7 +126,7 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped
|
|||
|
||||
%% @doc Enqueue a message.
|
||||
-spec(in(message(), mqueue()) -> mqueue()).
|
||||
in(#message{flags = #{qos := ?QOS_0}}, MQ = #mqueue{qos0 = false}) ->
|
||||
in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
|
||||
MQ;
|
||||
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
|
||||
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
-module(emqx_keepalive_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
all() -> [{group, keepalive}].
|
||||
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
|
||||
-module(emqx_message_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx.hrl").
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() ->
|
||||
[
|
||||
message_make,
|
||||
message_flag,
|
||||
message_header,
|
||||
message_format
|
||||
].
|
||||
|
||||
message_make(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
?assertEqual(0, Msg#message.qos),
|
||||
Msg1 = emqx_message:make(<<"clientid">>, ?QOS2, <<"topic">>, <<"payload">>),
|
||||
?assert(is_binary(Msg1#message.id)),
|
||||
?assertEqual(2, Msg1#message.qos).
|
||||
|
||||
message_flag(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
Msg2 = emqx_message:set_flag(retain, false, Msg),
|
||||
Msg3 = emqx_message:set_flag(dup, Msg2),
|
||||
?assert(emqx_message:get_flag(dup, Msg3)),
|
||||
?assertNot(emqx_message:get_flag(retain, Msg3)),
|
||||
Msg4 = emqx_message:unset_flag(dup, Msg3),
|
||||
Msg5 = emqx_message:unset_flag(retain, Msg4),
|
||||
?assertEqual(undefined, emqx_message:get_flag(dup, Msg5, undefined)),
|
||||
?assertEqual(undefined, emqx_message:get_flag(retain, Msg5, undefined)),
|
||||
Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5),
|
||||
?assert(emqx_message:get_flag(dup, Msg6)),
|
||||
?assert(emqx_message:get_flag(retain, Msg6)).
|
||||
|
||||
message_header(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg),
|
||||
Msg2 = emqx_message:set_header(c, 3, Msg1),
|
||||
?assertEqual(1, emqx_message:get_header(a, Msg2)),
|
||||
?assertEqual(4, emqx_message:get_header(d, Msg2, 4)).
|
||||
|
||||
message_format(_) ->
|
||||
io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]).
|
||||
|
||||
|
|
@ -37,7 +37,11 @@ t_get_set_caps(_) ->
|
|||
mqtt_shared_subscription => true,
|
||||
mqtt_wildcard_subscription => true
|
||||
},
|
||||
Caps = emqx_mqtt_caps:get_caps(zone),
|
||||
Caps2 = Caps#{max_packet_size => 1048576},
|
||||
case emqx_mqtt_caps:get_caps(zone) of
|
||||
Caps -> ok;
|
||||
Caps2 -> ok
|
||||
end,
|
||||
PubCaps = #{
|
||||
max_qos_allowed => ?QOS_2,
|
||||
mqtt_retain_available => true
|
||||
|
|
|
@ -24,14 +24,12 @@
|
|||
|
||||
-define(Q, emqx_mqueue).
|
||||
|
||||
all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_priority_mqueue,
|
||||
t_priority_mqueue2, t_infinity_priority_mqueue,
|
||||
t_infinity_simple_mqueue].
|
||||
all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_infinity_simple_mqueue,
|
||||
t_priority_mqueue, t_infinity_priority_mqueue].
|
||||
|
||||
t_in(_) ->
|
||||
Opts = [{max_length, 5},
|
||||
{store_qos0, true}],
|
||||
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
|
||||
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
||||
Q = ?Q:new(<<"testQ">>, Opts),
|
||||
?assert(?Q:is_empty(Q)),
|
||||
Q1 = ?Q:in(#message{}, Q),
|
||||
?assertEqual(1, ?Q:len(Q1)),
|
||||
|
@ -43,18 +41,16 @@ t_in(_) ->
|
|||
?assertEqual(5, ?Q:len(Q5)).
|
||||
|
||||
t_in_qos0(_) ->
|
||||
Opts = [{max_length, 5},
|
||||
{store_qos0, false}],
|
||||
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
|
||||
Q1 = ?Q:in(#message{}, Q),
|
||||
Opts = #{type => simple, max_len => 5, store_qos0 => false},
|
||||
Q = ?Q:new(<<"testQ">>, Opts),
|
||||
Q1 = ?Q:in(#message{qos = 0}, Q),
|
||||
?assert(?Q:is_empty(Q1)),
|
||||
Q2 = ?Q:in(#message{qos = 0}, Q1),
|
||||
?assert(?Q:is_empty(Q2)).
|
||||
|
||||
t_out(_) ->
|
||||
Opts = [{max_length, 5},
|
||||
{store_qos0, true}],
|
||||
Q = ?Q:new(<<"testQ">>, Opts, alarm_fun()),
|
||||
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
||||
Q = ?Q:new(<<"testQ">>, Opts),
|
||||
{empty, Q} = ?Q:out(Q),
|
||||
Q1 = ?Q:in(#message{}, Q),
|
||||
{Value, Q2} = ?Q:out(Q1),
|
||||
|
@ -62,12 +58,8 @@ t_out(_) ->
|
|||
?assertEqual({value, #message{}}, Value).
|
||||
|
||||
t_simple_mqueue(_) ->
|
||||
Opts = [{type, simple},
|
||||
{max_length, 3},
|
||||
{low_watermark, 0.2},
|
||||
{high_watermark, 0.6},
|
||||
{store_qos0, false}],
|
||||
Q = ?Q:new("simple_queue", Opts, alarm_fun()),
|
||||
Opts = #{type => simple, max_len => 3, store_qos0 => false},
|
||||
Q = ?Q:new("simple_queue", Opts),
|
||||
?assertEqual(simple, ?Q:type(Q)),
|
||||
?assertEqual(3, ?Q:max_len(Q)),
|
||||
?assertEqual(<<"simple_queue">>, ?Q:name(Q)),
|
||||
|
@ -82,12 +74,8 @@ t_simple_mqueue(_) ->
|
|||
?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)).
|
||||
|
||||
t_infinity_simple_mqueue(_) ->
|
||||
Opts = [{type, simple},
|
||||
{max_length, 0},
|
||||
{low_watermark, 0.2},
|
||||
{high_watermark, 0.6},
|
||||
{store_qos0, false}],
|
||||
Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()),
|
||||
Opts = #{type => simple, max_len => 0, store_qos0 => false},
|
||||
Q = ?Q:new("infinity_simple_queue", Opts),
|
||||
?assert(?Q:is_empty(Q)),
|
||||
?assertEqual(0, ?Q:max_len(Q)),
|
||||
Qx = lists:foldl(fun(I, AccQ) ->
|
||||
|
@ -99,37 +87,29 @@ t_infinity_simple_mqueue(_) ->
|
|||
?assertEqual(<<1>>, V#message.payload).
|
||||
|
||||
t_priority_mqueue(_) ->
|
||||
Opts = [{type, priority},
|
||||
{priority, [{<<"t">>, 10}]},
|
||||
{max_length, 3},
|
||||
{low_watermark, 0.2},
|
||||
{high_watermark, 0.6},
|
||||
{store_qos0, false}],
|
||||
Q = ?Q:new("priority_queue", Opts, alarm_fun()),
|
||||
Opts = #{type => priority, max_len => 3, store_qos0 => false},
|
||||
Q = ?Q:new("priority_queue", Opts),
|
||||
?assertEqual(priority, ?Q:type(Q)),
|
||||
?assertEqual(3, ?Q:max_len(Q)),
|
||||
?assertEqual(<<"priority_queue">>, ?Q:name(Q)),
|
||||
|
||||
?assert(?Q:is_empty(Q)),
|
||||
Q1 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q),
|
||||
Q2 = ?Q:in(#message{qos = 1, topic = <<"t">>}, Q1),
|
||||
Q3 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q2),
|
||||
Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q),
|
||||
Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1),
|
||||
Q3 = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2),
|
||||
?assertEqual(3, ?Q:len(Q3)),
|
||||
Q4 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q3),
|
||||
Q4 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3),
|
||||
?assertEqual(4, ?Q:len(Q4)),
|
||||
Q5 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q4),
|
||||
Q5 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4),
|
||||
?assertEqual(5, ?Q:len(Q5)),
|
||||
Q6 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q5),
|
||||
Q6 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
|
||||
?assertEqual(5, ?Q:len(Q6)),
|
||||
{{value, Msg}, _Q7} = ?Q:out(Q6),
|
||||
?assertEqual(<<"t">>, Msg#message.topic).
|
||||
{{value, Msg}, Q7} = ?Q:out(Q6),
|
||||
?assertEqual(4, ?Q:len(Q7)),
|
||||
?assertEqual(<<"t1">>, Msg#message.topic).
|
||||
|
||||
t_infinity_priority_mqueue(_) ->
|
||||
Opts = [{type, priority},
|
||||
{priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]},
|
||||
{max_length, 0},
|
||||
{store_qos0, false}],
|
||||
Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()),
|
||||
Opts = #{type => priority, max_len => 0, store_qos0 => false},
|
||||
Q = ?Q:new("infinity_priority_queue", Opts),
|
||||
?assertEqual(0, ?Q:max_len(Q)),
|
||||
Qx = lists:foldl(fun(I, AccQ) ->
|
||||
AccQ1 =
|
||||
|
@ -137,23 +117,4 @@ t_infinity_priority_mqueue(_) ->
|
|||
?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1)
|
||||
end, Q, lists:seq(1, 255)),
|
||||
?assertEqual(510, ?Q:len(Qx)),
|
||||
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
|
||||
|
||||
t_priority_mqueue2(_) ->
|
||||
Opts = [{type, priority},
|
||||
{max_length, 2},
|
||||
{low_watermark, 0.2},
|
||||
{high_watermark, 0.6},
|
||||
{store_qos0, false}],
|
||||
Q = ?Q:new("priority_queue2_test", Opts, alarm_fun()),
|
||||
?assertEqual(2, ?Q:max_len(Q)),
|
||||
Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
|
||||
Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
|
||||
Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
|
||||
Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
|
||||
?assertEqual(4, ?Q:len(Q4)),
|
||||
{{value, _Val}, Q5} = ?Q:out(Q4),
|
||||
?assertEqual(3, ?Q:len(Q5)).
|
||||
|
||||
alarm_fun() -> fun(_, _) -> alarm_fun() end.
|
||||
|
||||
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
|
|
@ -0,0 +1,93 @@
|
|||
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
|
||||
|
||||
-module(emqx_packet_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include("emqx.hrl").
|
||||
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() ->
|
||||
[
|
||||
packet_proto_name,
|
||||
packet_type_name,
|
||||
packet_validate,
|
||||
packet_message,
|
||||
packet_format,
|
||||
packet_will_msg
|
||||
].
|
||||
|
||||
packet_proto_name(_) ->
|
||||
?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)),
|
||||
?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)),
|
||||
?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(5)).
|
||||
|
||||
packet_type_name(_) ->
|
||||
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)),
|
||||
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)).
|
||||
|
||||
packet_validate(_) ->
|
||||
?assertEqual(true, emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))),
|
||||
?assertEqual(true, emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))),
|
||||
?assertEqual(true, emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))).
|
||||
|
||||
packet_message(_) ->
|
||||
Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||
qos = ?QOS0,
|
||||
retain = false,
|
||||
dup = false},
|
||||
variable = #mqtt_packet_publish{topic_name = <<"topic">>,
|
||||
packet_id = 10,
|
||||
properties = #{}},
|
||||
payload = <<"payload">>},
|
||||
Msg = emqx_message:make(<<"clientid">>, ?QOS0, <<"topic">>, <<"payload">>),
|
||||
Msg2 = emqx_message:set_flag(retain, false, Msg),
|
||||
Pkt = emqx_packet:from_message(10, Msg2),
|
||||
Msg3 = emqx_message:set_header(username, "test", Msg2),
|
||||
Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, username => "test"}, Pkt),
|
||||
Msg5 = Msg4#message{timestamp = Msg3#message.timestamp},
|
||||
Msg5 = Msg3.
|
||||
|
||||
packet_format(_) ->
|
||||
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
|
||||
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
|
||||
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
|
||||
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
|
||||
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
|
||||
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
|
||||
|
||||
packet_will_msg(_) ->
|
||||
Pkt = #mqtt_packet_connect{ will_flag = true,
|
||||
client_id = <<"clientid">>,
|
||||
username = "test",
|
||||
will_retain = true,
|
||||
will_qos = ?QOS2,
|
||||
will_topic = <<"topic">>,
|
||||
will_props = #{},
|
||||
will_payload = <<"payload">>},
|
||||
Msg = emqx_packet:will_msg(Pkt),
|
||||
?assertEqual(<<"clientid">>, Msg#message.from),
|
||||
?assertEqual(<<"topic">>, Msg#message.topic).
|
||||
|
||||
|
|
@ -22,7 +22,7 @@
|
|||
all() -> [t_set_get_state, t_update_interval].
|
||||
|
||||
t_set_get_state(_) ->
|
||||
{ok, _} = emqx_stats:start_link(),
|
||||
emqx_stats:start_link(),
|
||||
SetConnsCount = emqx_stats:statsfun('connections/count'),
|
||||
SetConnsCount(1),
|
||||
1 = emqx_stats:getstat('connections/count'),
|
||||
|
@ -46,8 +46,9 @@ t_set_get_state(_) ->
|
|||
4 = proplists:get_value('connections/max', Conns).
|
||||
|
||||
t_update_interval(_) ->
|
||||
{ok, _} = emqx_stats:start_link(),
|
||||
ok = emqx_stats:update_interval(cm_stats, fun update_stats/0),
|
||||
emqx_stats:start_link(),
|
||||
emqx_stats:cancel_update(cm_stats),
|
||||
ok = emqx_stats:update_interval(stats_test, fun update_stats/0),
|
||||
timer:sleep(2500),
|
||||
1 = emqx_stats:getstat('connections/count').
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
all() -> [t_set_get_env].
|
||||
|
||||
t_set_get_env(_) ->
|
||||
{ok, _} = emqx_zone:start_link(),
|
||||
emqx_zone:start_link(),
|
||||
ok = emqx_zone:set_env(china, language, chinese),
|
||||
timer:sleep(100), % make sure set_env/3 is okay
|
||||
chinese = emqx_zone:get_env(china, language),
|
||||
|
|
Loading…
Reference in New Issue