test cases for server, pubsub and router

This commit is contained in:
Feng 2016-03-11 23:19:17 +08:00
parent 32635af084
commit faf05eb85a
7 changed files with 152 additions and 93 deletions

View File

@ -68,7 +68,7 @@ start_link(Pool, Id, Env) ->
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
%% @doc Create a Topic.
-spec create_topic(emqttd_topic:topic()) -> ok | {error, any()}.
-spec create_topic(binary()) -> ok | {error, any()}.
create_topic(Topic) when is_binary(Topic) ->
case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of
{atomic, ok} -> ok;
@ -76,7 +76,7 @@ create_topic(Topic) when is_binary(Topic) ->
end.
%% @doc Lookup a Topic.
-spec lookup_topic(emqttd_topic:topic()) -> list(mqtt_topic()).
-spec lookup_topic(binary()) -> list(mqtt_topic()).
lookup_topic(Topic) when is_binary(Topic) ->
mnesia:dirty_read(topic, Topic).
@ -106,7 +106,7 @@ publish(Topic, Msg) ->
%% @doc Dispatch Message to Subscribers
-spec dispatch(binary(), mqtt_message()) -> ok.
dispatch(Queue = <<"$Q/", _Q>>, Msg) ->
dispatch(Queue = <<"$queue/", _T>>, Msg) ->
case subscribers(Queue) of
[] ->
dropped(Queue);
@ -163,7 +163,8 @@ call(PubSub, Req) when is_pid(PubSub) ->
cast(PubSub, Msg) when is_pid(PubSub) ->
gen_server2:cast(PubSub, Msg).
pick(Topic) -> gproc_pool:pick_worker(pubsub, Topic).
pick(Topic) ->
gproc_pool:pick_worker(pubsub, Topic).
%%--------------------------------------------------------------------
%% gen_server Callbacks

View File

@ -139,11 +139,9 @@ word(<<"+">>) -> '+';
word(<<"#">>) -> '#';
word(Bin) -> Bin.
%% @doc Queue is a special topic name that starts with "$Q/"
%% @doc Queue is a special topic name that starts with "$queue/"
-spec is_queue(topic()) -> boolean().
is_queue(<<"$Q/", _Queue/binary>>) ->
true;
is_queue(<<"$q/", _Queue/binary>>) ->
is_queue(<<"$queue/", _Queue/binary>>) ->
true;
is_queue(_) ->
false.

View File

@ -28,6 +28,7 @@ all() ->
{group, broker},
{group, metrics},
{group, stats},
{group, hook},
{group, cli}].
groups() ->
@ -35,11 +36,12 @@ groups() ->
[create_topic,
create_subscription,
subscribe_unsubscribe,
publish_message]},
publish, pubsub,
'pubsub#', 'pubsub+']},
{router, [sequence],
[add_delete_routes,
add_delete_route,
route_message]},
[router_add_del,
router_print,
router_unused]},
{session, [sequence],
[start_session]},
{retainer, [sequence],
@ -50,12 +52,16 @@ groups() ->
[inc_dec_metric]},
{stats, [sequence],
[set_get_stat]},
{hook, [sequence],
[add_delete_hook,
run_hooks]},
{cli, [sequence],
[ctl_register_cmd,
cli_status,
cli_broker,
cli_clients,
cli_sessions,
cli_routes,
cli_topics,
cli_subscriptions,
cli_bridges,
@ -74,98 +80,107 @@ end_per_suite(_Config) ->
emqttd_mnesia:ensure_stopped().
%%--------------------------------------------------------------------
%% PubSub Group
%% PubSub Test
%%--------------------------------------------------------------------
create_topic(_) ->
Node = node(),
ok = emqttd_pubsub:create(topic, <<"topic/create">>),
ok = emqttd_pubsub:create(topic, <<"topic/create2">>),
[#mqtt_topic{topic = <<"topic/create">>, node = Node}]
= emqttd_pubsub:lookup(topic, <<"topic/create">>).
ok = emqttd:create(topic, <<"topic/create">>),
ok = emqttd:create(topic, <<"topic/create2">>),
[#mqtt_topic{topic = <<"topic/create">>, flags = [static]}]
= emqttd:lookup(topic, <<"topic/create">>).
create_subscription(_) ->
ok = emqttd_pubsub:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}),
ok = emqttd:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}),
[#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}]
= emqttd_pubsub:lookup(subscription, <<"clientId">>),
ok = emqttd_pubsub:delete(subscription, <<"clientId">>),
[] = emqttd_pubsub:lookup(subscription, <<"clientId">>).
= emqttd_backend:lookup_subscriptions(<<"clientId">>),
ok = emqttd_backend:del_subscriptions(<<"clientId">>),
[] = emqttd_backend:lookup_subscriptions(<<"clientId">>).
subscribe_unsubscribe(_) ->
{ok, [1]} = emqttd_pubsub:subscribe({<<"topic/subunsub">>, 1}),
{ok, [1, 2]} = emqttd_pubsub:subscribe([{<<"topic/subunsub1">>, 1}, {<<"topic/subunsub2">>, 2}]),
ok = emqttd_pubsub:unsubscribe(<<"topic/subunsub">>),
ok = emqttd_pubsub:unsubscribe([<<"topic/subunsub1">>, <<"topic/subunsub2">>]),
ok = emqttd:subscribe(<<"topic/subunsub">>),
ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub1">>, 1),
ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub2">>, 2),
ok = emqttd:unsubscribe(<<"topic/subunsub">>),
ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub1">>, 1),
ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub2">>, 2).
{ok, [1]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, {<<"topic/subunsub">>, 1}),
{ok, [1,2]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, [{<<"topic/subunsub1">>, 1},
{<<"topic/subunsub2">>, 2}]),
ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, <<"topic/subunsub">>),
ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, [<<"topic/subunsub1">>,
<<"topic/subunsub2">>]).
publish_message(_) ->
publish(_) ->
Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>),
{ok, [1]} = emqttd_pubsub:subscribe({<<"test/+">>, qos1}),
emqttd_pubsub:publish(Msg),
ok = emqttd:subscribe(<<"test/+">>),
emqttd:publish(Msg),
true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end.
pubsub(_) ->
Self = self(),
emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}),
emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}),
[{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self),
[{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end,
spawn(fun() ->
emqttd:subscribe(<<"a/b/c">>),
emqttd:subscribe(<<"c/d/e">>),
timer:sleep(10),
emqttd:unsubscribe(<<"a/b/c">>)
end),
timer:sleep(20),
emqttd:unsubscribe(<<"a/b/c">>).
'pubsub#'(_) ->
emqttd:subscribe(<<"a/#">>),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end,
emqttd:unsubscribe(<<"a/#">>).
'pubsub+'(_) ->
emqttd:subscribe(<<"a/+/+">>),
emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)),
true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end,
emqttd:unsubscribe(<<"a/+/+">>).
%%--------------------------------------------------------------------
%% Route Group
%% Router Test
%%--------------------------------------------------------------------
add_delete_route(_) ->
Self = self(),
emqttd_router:add_route(<<"topic1">>, Self),
true = emqttd_router:has_route(<<"topic1">>),
emqttd_router:add_route(<<"topic2">>, Self),
true = emqttd_router:has_route(<<"topic2">>),
[Self] = emqttd_router:lookup_routes(<<"topic1">>),
[Self] = emqttd_router:lookup_routes(<<"topic2">>),
%% Del topic1
emqttd_router:delete_route(<<"topic1">>, Self),
erlang:yield(),
timer:sleep(10),
false = emqttd_router:has_route(<<"topic1">>),
%% Del topic2
emqttd_router:delete_route(<<"topic2">>, Self),
erlang:yield(),
timer:sleep(10),
false = emqttd_router:has_route(<<"topic2">>).
router_add_del(_) ->
%% Add
emqttd_router:add_route(<<"#">>),
emqttd_router:add_route(<<"a/b/c">>),
emqttd_router:add_route(<<"+/#">>, node()),
Routes = [R1, R2 | _] = [
#mqtt_route{topic = <<"#">>, node = node()},
#mqtt_route{topic = <<"+/#">>, node = node()},
#mqtt_route{topic = <<"a/b/c">>, node = node()}],
Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
add_delete_routes(_) ->
Self = self(),
emqttd_router:add_routes([], Self),
emqttd_router:add_routes([<<"t0">>], Self),
emqttd_router:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self),
true = emqttd_router:has_route(<<"t1">>),
[Self] = emqttd_router:lookup_routes(<<"t1">>),
[Self] = emqttd_router:lookup_routes(<<"t2">>),
[Self] = emqttd_router:lookup_routes(<<"t3">>),
%% Batch Add
emqttd_router:add_routes(Routes),
Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
emqttd_router:delete_routes([<<"t3">>], Self),
emqttd_router:delete_routes([<<"t0">>, <<"t1">>], Self),
erlang:yield(),
timer:sleep(10),
false = emqttd_router:has_route(<<"t0">>),
false = emqttd_router:has_route(<<"t1">>),
true = emqttd_router:has_route(<<"t2">>),
false = emqttd_router:has_route(<<"t3">>).
%% Del
emqttd_router:del_route(<<"a/b/c">>),
[R1, R2] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)),
{atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]),
route_message(_) ->
Self = self(),
Pid = spawn_link(fun() -> timer:sleep(1000) end),
emqttd_router:add_routes([<<"$Q/1">>,<<"t/2">>,<<"t/3">>], Self),
emqttd_router:add_routes([<<"t/2">>], Pid),
Msg1 = #mqtt_message{topic = <<"$Q/1">>, payload = <<"q">>},
Msg2 = #mqtt_message{topic = <<"t/2">>, payload = <<"t2">>},
Msg3 = #mqtt_message{topic = <<"t/3">>, payload = <<"t3">>},
emqttd_router:route(<<"$Q/1">>, Msg1),
emqttd_router:route(<<"t/2">>, Msg2),
emqttd_router:route(<<"t/3">>, Msg3),
[Msg1, Msg2, Msg3] = recv_loop([]),
emqttd_router:add_route(<<"$Q/1">>, Self),
emqttd_router:route(<<"$Q/1">>, Msg1).
%% Batch Del
R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'},
emqttd_router:add_route(R3),
emqttd_router:del_routes([R1, R2]),
emqttd_router:del_route(R3),
[] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)).
router_print(_) ->
Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()},
#mqtt_route{topic = <<"#">>, node = node()},
#mqtt_route{topic = <<"+/#">>, node = node()}],
emqttd_router:add_routes(Routes),
emqttd_router:print(<<"a/b/c">>).
router_unused(_) ->
gen_server:call(emqttd_router, bad_call),
gen_server:cast(emqttd_router, bad_msg),
emqttd_router ! bad_info.
recv_loop(Msgs) ->
receive
@ -225,6 +240,46 @@ set_get_stat(_) ->
emqttd_stats:setstat('retained/max', 99),
99 = emqttd_stats:getstat('retained/max').
%%--------------------------------------------------------------------
%% Hook Test
%%--------------------------------------------------------------------
add_delete_hook(_) ->
emqttd:hook(test_hook, fun ?MODULE:hook_fun1/1, []),
emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []),
{error, already_hooked} = emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []),
Callbacks = [{callback, fun ?MODULE:hook_fun1/1, [], 0},
{callback, fun ?MODULE:hook_fun2/1, [], 0}],
Callbacks = emqttd_hook:lookup(test_hook),
emqttd:unhook(test_hook, fun ?MODULE:hook_fun1/1),
emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1),
ok = emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1),
{error, not_found} = emqttd:unhook(test_hook1, fun ?MODULE:hook_fun2/1),
[] = emqttd_hook:lookup(test_hook),
emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun1/1, [], 9),
emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun2/1, [], 8),
Callbacks2 = [{callback, fun ?MODULE:hook_fun2/1, [], 8},
{callback, fun ?MODULE:hook_fun1/1, [], 9}],
Callbacks2 = emqttd_hook:lookup(emqttd_hook),
emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun1/1),
emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun2/1),
[] = emqttd_hook:lookup(emqttd_hook).
run_hooks(_) ->
emqttd:hook(test_hook, fun ?MODULE:hook_fun3/4, [init]),
emqttd:hook(test_hook, fun ?MODULE:hook_fun4/4, [init]),
emqttd:hook(test_hook, fun ?MODULE:hook_fun5/4, [init]),
{stop, [r3, r2]} = emqttd:run_hooks(test_hook, [arg1, arg2], []),
{ok, []} = emqttd:run_hooks(unknown_hook, [], []).
hook_fun1([]) -> ok.
hook_fun2([]) -> {ok, []}.
hook_fun3(arg1, arg2, _Acc, init) -> ok.
hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}.
hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}.
%%--------------------------------------------------------------------
%% CLI Group
%%--------------------------------------------------------------------
@ -263,9 +318,17 @@ cli_sessions(_) ->
emqttd_cli:sessions(["list", "transient"]),
emqttd_cli:sessions(["show", "clientId"]).
cli_routes(_) ->
emqttd:subscribe(<<"topic/route">>),
emqttd_cli:routes(["list"]),
emqttd_cli:routes(["show", "topic/route"]),
emqttd:unsubscribe(<<"topic/route">>).
cli_topics(_) ->
emqttd:subscribe(<<"topic">>),
emqttd_cli:topics(["list"]),
emqttd_cli:topics(["show", "topic"]).
emqttd_cli:topics(["show", "topic"]),
emqttd:unsubscribe(<<"topic">>).
cli_subscriptions(_) ->
emqttd_cli:subscriptions(["list"]),

View File

@ -74,7 +74,6 @@ end_per_testcase(_TestCase, _Config) ->
%%--------------------------------------------------------------------
reload_acl(_) ->
ct:print("~p~n", [whereis(?AC)]),
[ok] = ?AC:reload_acl().
register_mod(_) ->

View File

@ -132,9 +132,8 @@ parse_bridge(_) ->
%% CONNECT(Q0, R0, D0, ClientId=C_00:0C:29:2B:77:52, ProtoName=MQIsdp, ProtoVsn=131, CleanSess=false, KeepAlive=60,
%% Username=undefined, Password=undefined, Will(Q1, R1, Topic=$SYS/broker/connection/C_00:0C:29:2B:77:52/state, Msg=0))
{ok, #mqtt_packet{variable = Variable}, <<>>} = Parser(Data),
ct:print("~p", [Variable]),
#mqtt_packet_connect{client_id = <<"C_00:0C:29:2B:77:52">>,
proto_ver = 16#83,
proto_ver = 16#03,
proto_name = <<"MQIsdp">>,
will_retain = true,
will_qos = 1,

View File

@ -156,8 +156,7 @@ t_join(_) ->
<<"ab/+/#">> = join(words(<<"ab/+/#">>)).
t_is_queue(_) ->
true = is_queue(<<"$Q/queue">>),
true = is_queue(<<"$q/queue">>),
true = is_queue(<<"$queue/queue">>),
false = is_queue(<<"xyz/queue">>).
t_systop(_) ->
@ -167,7 +166,7 @@ t_systop(_) ->
SysTop2 = systop(<<"abc">>).
t_feed_var(_) ->
<<"$Q/client/clientId">> = feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>),
<<"$queue/client/clientId">> = feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>),
<<"username/test/client/x">> = feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>),
<<"username/test/client/clientId">> = feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>).