pubsub_queue, assert

This commit is contained in:
Feng 2016-04-16 14:28:21 +08:00
parent a0319d9d26
commit e1ae9a7068
1 changed files with 36 additions and 7 deletions

View File

@ -20,6 +20,8 @@
-include("emqttd.hrl").
-include_lib("eunit/include/eunit.hrl").
all() ->
[{group, pubsub},
{group, router},
@ -38,7 +40,8 @@ groups() ->
create_subscription,
subscribe_unsubscribe,
publish, pubsub,
'pubsub#', 'pubsub+']},
'pubsub#', 'pubsub+',
pubsub_queue]},
{router, [sequence],
[router_add_del,
router_print,
@ -99,7 +102,7 @@ create_subscription(_) ->
[#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}]
= emqttd_backend:lookup_subscriptions(<<"clientId">>),
ok = emqttd_backend:del_subscriptions(<<"clientId">>),
[] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
?assertEqual([], emqttd_backend:lookup_subscriptions(<<"clientId">>)).
subscribe_unsubscribe(_) ->
ok = emqttd:subscribe(<<"topic/subunsub">>),
@ -114,7 +117,7 @@ publish(_) ->
ok = emqttd:subscribe(<<"test/+">>),
timer:sleep(10),
emqttd:publish(Msg),
true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end.
?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
pubsub(_) ->
Self = self(),
@ -124,7 +127,7 @@ pubsub(_) ->
[{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self),
[{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end,
?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end),
spawn(fun() ->
emqttd:subscribe(<<"a/b/c">>),
emqttd:subscribe(<<"c/d/e">>),
@ -138,16 +141,42 @@ pubsub(_) ->
emqttd:subscribe(<<"a/#">>),
timer:sleep(10),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end,
?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end),
emqttd:unsubscribe(<<"a/#">>).
'pubsub+'(_) ->
emqttd:subscribe(<<"a/+/+">>),
timer:sleep(10),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end,
?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end),
emqttd:unsubscribe(<<"a/+/+">>).
pubsub_queue(_) ->
Self = self(), Q = <<"$queue/abc">>,
SubFun = fun() ->
emqttd:subscribe(Q),
{ok, Msgs} = loop_recv(Q, 10),
Self ! {recv, self(), Msgs}
end,
Sub1 = spawn(SubFun), Sub2 = spawn(SubFun),
timer:sleep(5),
emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)),
emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)),
emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)),
?assert(receive {recv, Sub1, Msgs1} -> length(Msgs1) < 3 end),
?assert(receive {recv, Sub2, Msgs2} -> length(Msgs2) < 3 end).
loop_recv(Topic, Timeout) ->
loop_recv(Topic, Timeout, []).
loop_recv(Topic, Timeout, Acc) ->
receive
{dispatch, Topic, Msg} ->
loop_recv(Topic, Timeout, [Msg|Acc])
after
Timeout -> {ok, Acc}
end.
%%--------------------------------------------------------------------
%% Router Test
%%--------------------------------------------------------------------
@ -293,7 +322,7 @@ dispatch_retained_messages(_) ->
payload = <<"payload">>},
emqttd_retainer:retain(Msg),
emqttd_retainer:dispatch(<<"a/b/+">>, self()),
true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end,
?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end),
emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}),
[] = emqttd_backend:read_messages(<<"a/b/c">>).