diff --git a/test/emqttd_mqueue_tests.erl b/test/emqttd_mqueue_tests.erl index bbb9e7daa..6d20b3cbf 100644 --- a/test/emqttd_mqueue_tests.erl +++ b/test/emqttd_mqueue_tests.erl @@ -70,7 +70,9 @@ simple_mqueue_test() -> {high_watermark, 0.6}, {queue_qos0, false}], Q = ?Q:new("simple_queue", Opts, alarm_fun()), - + ?assertEqual(simple, ?Q:type(Q)), + ?assertEqual(3, ?Q:max_len(Q)), + ?assertEqual(<<"simple_queue">>, ?Q:name(Q)), ?assert(?Q:is_empty(Q)), Q1 = ?Q:in(#mqtt_message{qos = 1, payload = <<"1">>}, Q), Q2 = ?Q:in(#mqtt_message{qos = 1, payload = <<"2">>}, Q1), @@ -78,7 +80,25 @@ simple_mqueue_test() -> Q4 = ?Q:in(#mqtt_message{qos = 1, payload = <<"4">>}, Q3), ?assertEqual(3, ?Q:len(Q4)), {{value, Msg}, Q5} = ?Q:out(Q4), - ?assertMatch(<<"2">>, Msg#mqtt_message.payload). + ?assertMatch(<<"2">>, Msg#mqtt_message.payload), + ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)). + +infinity_simple_mqueue_test() -> + Opts = [{type, simple}, + {max_length, infinity}, + {low_watermark, 0.2}, + {high_watermark, 0.6}, + {queue_qos0, false}], + Q = ?Q:new("infinity_simple_queue", Opts, alarm_fun()), + ?assert(?Q:is_empty(Q)), + ?assertEqual(infinity, ?Q:max_len(Q)), + Qx = lists:foldl(fun(I, AccQ) -> + ?Q:in(#mqtt_message{qos = 1, payload = iolist_to_binary([I])}, AccQ) + end, Q, lists:seq(1, 255)), + ?assertEqual(255, ?Q:len(Qx)), + ?assertEqual([{len, 255}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)), + {{value, V}, Qy} = ?Q:out(Qx), + ?assertEqual(<<1>>, V#mqtt_message.payload). priority_mqueue_test() -> Opts = [{type, priority}, @@ -88,6 +108,9 @@ priority_mqueue_test() -> {high_watermark, 0.6}, {queue_qos0, false}], Q = ?Q:new("priority_queue", Opts, alarm_fun()), + ?assertEqual(priority, ?Q:type(Q)), + ?assertEqual(3, ?Q:max_len(Q)), + ?assertEqual(<<"priority_queue">>, ?Q:name(Q)), ?assert(?Q:is_empty(Q)), Q1 = ?Q:in(#mqtt_message{qos = 1, topic = <<"t1">>}, Q), @@ -102,6 +125,21 @@ priority_mqueue_test() -> ?assertEqual(5, ?Q:len(Q6)), {{value, Msg}, Q7} = ?Q:out(Q6), ?assertMatch(<<"t">>, Msg#mqtt_message.topic). + +infinity_priority_mqueue_test() -> + Opts = [{type, priority}, + {priority, [{<<"t1">>, 10}, {<<"t2">>, 8}]}, + {max_length, infinity}, + {queue_qos0, false}], + Q = ?Q:new("infinity_priority_queue", Opts, alarm_fun()), + ?assertEqual(infinity, ?Q:max_len(Q)), + Qx = lists:foldl(fun(I, AccQ) -> + AccQ1 = + ?Q:in(#mqtt_message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), + ?Q:in(#mqtt_message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) + end, Q, lists:seq(1, 255)), + ?assertEqual(510, ?Q:len(Qx)), + ?assertEqual([{len, 510}, {max_len, infinity}, {dropped, 0}], ?Q:stats(Qx)). alarm_fun() -> fun(_, _) -> alarm_fun() end.