diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index badc975eb..8e9366503 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -20,6 +20,8 @@ -include("emqttd.hrl"). +-include_lib("eunit/include/eunit.hrl"). + all() -> [{group, pubsub}, {group, router}, @@ -38,7 +40,8 @@ groups() -> create_subscription, subscribe_unsubscribe, publish, pubsub, - 'pubsub#', 'pubsub+']}, + 'pubsub#', 'pubsub+', + pubsub_queue]}, {router, [sequence], [router_add_del, router_print, @@ -99,7 +102,7 @@ create_subscription(_) -> [#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}] = emqttd_backend:lookup_subscriptions(<<"clientId">>), ok = emqttd_backend:del_subscriptions(<<"clientId">>), - [] = emqttd_backend:lookup_subscriptions(<<"clientId">>). + ?assertEqual([], emqttd_backend:lookup_subscriptions(<<"clientId">>)). subscribe_unsubscribe(_) -> ok = emqttd:subscribe(<<"topic/subunsub">>), @@ -114,7 +117,7 @@ publish(_) -> ok = emqttd:subscribe(<<"test/+">>), timer:sleep(10), emqttd:publish(Msg), - true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end. + ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). pubsub(_) -> Self = self(), @@ -124,7 +127,7 @@ pubsub(_) -> [{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self), [{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), - true = receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end, + ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), spawn(fun() -> emqttd:subscribe(<<"a/b/c">>), emqttd:subscribe(<<"c/d/e">>), @@ -138,16 +141,42 @@ pubsub(_) -> emqttd:subscribe(<<"a/#">>), timer:sleep(10), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), - true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end, + ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end), emqttd:unsubscribe(<<"a/#">>). 'pubsub+'(_) -> emqttd:subscribe(<<"a/+/+">>), timer:sleep(10), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), - true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end, + ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end), emqttd:unsubscribe(<<"a/+/+">>). +pubsub_queue(_) -> + Self = self(), Q = <<"$queue/abc">>, + SubFun = fun() -> + emqttd:subscribe(Q), + {ok, Msgs} = loop_recv(Q, 10), + Self ! {recv, self(), Msgs} + end, + Sub1 = spawn(SubFun), Sub2 = spawn(SubFun), + timer:sleep(5), + emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)), + emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)), + emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)), + ?assert(receive {recv, Sub1, Msgs1} -> length(Msgs1) < 3 end), + ?assert(receive {recv, Sub2, Msgs2} -> length(Msgs2) < 3 end). + +loop_recv(Topic, Timeout) -> + loop_recv(Topic, Timeout, []). + +loop_recv(Topic, Timeout, Acc) -> + receive + {dispatch, Topic, Msg} -> + loop_recv(Topic, Timeout, [Msg|Acc]) + after + Timeout -> {ok, Acc} + end. + %%-------------------------------------------------------------------- %% Router Test %%-------------------------------------------------------------------- @@ -293,7 +322,7 @@ dispatch_retained_messages(_) -> payload = <<"payload">>}, emqttd_retainer:retain(Msg), emqttd_retainer:dispatch(<<"a/b/+">>, self()), - true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end, + ?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end), emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}), [] = emqttd_backend:read_messages(<<"a/b/c">>).