From faf05eb85a9d5718a908ba67cb6d4de2975d87ea Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 11 Mar 2016 23:19:17 +0800 Subject: [PATCH] test cases for server, pubsub and router --- rel/files/{test.config => emqttd.test.config} | 0 src/emqttd_pubsub.erl | 9 +- src/emqttd_topic.erl | 6 +- test/emqttd_SUITE.erl | 221 +++++++++++------- test/emqttd_access_SUITE.erl | 1 - test/emqttd_protocol_SUITE.erl | 3 +- test/emqttd_topic_SUITE.erl | 5 +- 7 files changed, 152 insertions(+), 93 deletions(-) rename rel/files/{test.config => emqttd.test.config} (100%) diff --git a/rel/files/test.config b/rel/files/emqttd.test.config similarity index 100% rename from rel/files/test.config rename to rel/files/emqttd.test.config diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index de1cf245c..03ca4907f 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -68,7 +68,7 @@ start_link(Pool, Id, Env) -> gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []). %% @doc Create a Topic. --spec create_topic(emqttd_topic:topic()) -> ok | {error, any()}. +-spec create_topic(binary()) -> ok | {error, any()}. create_topic(Topic) when is_binary(Topic) -> case mnesia:transaction(fun add_topic_/2, [Topic, [static]]) of {atomic, ok} -> ok; @@ -76,7 +76,7 @@ create_topic(Topic) when is_binary(Topic) -> end. %% @doc Lookup a Topic. --spec lookup_topic(emqttd_topic:topic()) -> list(mqtt_topic()). +-spec lookup_topic(binary()) -> list(mqtt_topic()). lookup_topic(Topic) when is_binary(Topic) -> mnesia:dirty_read(topic, Topic). @@ -106,7 +106,7 @@ publish(Topic, Msg) -> %% @doc Dispatch Message to Subscribers -spec dispatch(binary(), mqtt_message()) -> ok. -dispatch(Queue = <<"$Q/", _Q>>, Msg) -> +dispatch(Queue = <<"$queue/", _T>>, Msg) -> case subscribers(Queue) of [] -> dropped(Queue); @@ -163,7 +163,8 @@ call(PubSub, Req) when is_pid(PubSub) -> cast(PubSub, Msg) when is_pid(PubSub) -> gen_server2:cast(PubSub, Msg). -pick(Topic) -> gproc_pool:pick_worker(pubsub, Topic). +pick(Topic) -> + gproc_pool:pick_worker(pubsub, Topic). %%-------------------------------------------------------------------- %% gen_server Callbacks diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index f3c1f1a33..7e0d0b6ac 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -139,11 +139,9 @@ word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; word(Bin) -> Bin. -%% @doc Queue is a special topic name that starts with "$Q/" +%% @doc Queue is a special topic name that starts with "$queue/" -spec is_queue(topic()) -> boolean(). -is_queue(<<"$Q/", _Queue/binary>>) -> - true; -is_queue(<<"$q/", _Queue/binary>>) -> +is_queue(<<"$queue/", _Queue/binary>>) -> true; is_queue(_) -> false. diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 6eafdd2dd..bd6913a69 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -28,6 +28,7 @@ all() -> {group, broker}, {group, metrics}, {group, stats}, + {group, hook}, {group, cli}]. groups() -> @@ -35,11 +36,12 @@ groups() -> [create_topic, create_subscription, subscribe_unsubscribe, - publish_message]}, + publish, pubsub, + 'pubsub#', 'pubsub+']}, {router, [sequence], - [add_delete_routes, - add_delete_route, - route_message]}, + [router_add_del, + router_print, + router_unused]}, {session, [sequence], [start_session]}, {retainer, [sequence], @@ -50,12 +52,16 @@ groups() -> [inc_dec_metric]}, {stats, [sequence], [set_get_stat]}, + {hook, [sequence], + [add_delete_hook, + run_hooks]}, {cli, [sequence], [ctl_register_cmd, cli_status, cli_broker, cli_clients, cli_sessions, + cli_routes, cli_topics, cli_subscriptions, cli_bridges, @@ -74,98 +80,107 @@ end_per_suite(_Config) -> emqttd_mnesia:ensure_stopped(). %%-------------------------------------------------------------------- -%% PubSub Group +%% PubSub Test %%-------------------------------------------------------------------- create_topic(_) -> - Node = node(), - ok = emqttd_pubsub:create(topic, <<"topic/create">>), - ok = emqttd_pubsub:create(topic, <<"topic/create2">>), - [#mqtt_topic{topic = <<"topic/create">>, node = Node}] - = emqttd_pubsub:lookup(topic, <<"topic/create">>). + ok = emqttd:create(topic, <<"topic/create">>), + ok = emqttd:create(topic, <<"topic/create2">>), + [#mqtt_topic{topic = <<"topic/create">>, flags = [static]}] + = emqttd:lookup(topic, <<"topic/create">>). create_subscription(_) -> - ok = emqttd_pubsub:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}), + ok = emqttd:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}), [#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}] - = emqttd_pubsub:lookup(subscription, <<"clientId">>), - ok = emqttd_pubsub:delete(subscription, <<"clientId">>), - [] = emqttd_pubsub:lookup(subscription, <<"clientId">>). + = emqttd_backend:lookup_subscriptions(<<"clientId">>), + ok = emqttd_backend:del_subscriptions(<<"clientId">>), + [] = emqttd_backend:lookup_subscriptions(<<"clientId">>). subscribe_unsubscribe(_) -> - {ok, [1]} = emqttd_pubsub:subscribe({<<"topic/subunsub">>, 1}), - {ok, [1, 2]} = emqttd_pubsub:subscribe([{<<"topic/subunsub1">>, 1}, {<<"topic/subunsub2">>, 2}]), - ok = emqttd_pubsub:unsubscribe(<<"topic/subunsub">>), - ok = emqttd_pubsub:unsubscribe([<<"topic/subunsub1">>, <<"topic/subunsub2">>]), + ok = emqttd:subscribe(<<"topic/subunsub">>), + ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub1">>, 1), + ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub2">>, 2), + ok = emqttd:unsubscribe(<<"topic/subunsub">>), + ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub1">>, 1), + ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub2">>, 2). - {ok, [1]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, {<<"topic/subunsub">>, 1}), - {ok, [1,2]} = emqttd_pubsub:subscribe(<<"client_subunsub">>, [{<<"topic/subunsub1">>, 1}, - {<<"topic/subunsub2">>, 2}]), - ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, <<"topic/subunsub">>), - ok = emqttd_pubsub:unsubscribe(<<"client_subunsub">>, [<<"topic/subunsub1">>, - <<"topic/subunsub2">>]). - -publish_message(_) -> +publish(_) -> Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>), - {ok, [1]} = emqttd_pubsub:subscribe({<<"test/+">>, qos1}), - emqttd_pubsub:publish(Msg), + ok = emqttd:subscribe(<<"test/+">>), + emqttd:publish(Msg), true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end. +pubsub(_) -> + Self = self(), + emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}), + emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}), + [{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self), + [{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>), + emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), + true = receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end, + spawn(fun() -> + emqttd:subscribe(<<"a/b/c">>), + emqttd:subscribe(<<"c/d/e">>), + timer:sleep(10), + emqttd:unsubscribe(<<"a/b/c">>) + end), + timer:sleep(20), + emqttd:unsubscribe(<<"a/b/c">>). + +'pubsub#'(_) -> + emqttd:subscribe(<<"a/#">>), + emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), + true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end, + emqttd:unsubscribe(<<"a/#">>). + +'pubsub+'(_) -> + emqttd:subscribe(<<"a/+/+">>), + emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), + true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end, + emqttd:unsubscribe(<<"a/+/+">>). + %%-------------------------------------------------------------------- -%% Route Group +%% Router Test %%-------------------------------------------------------------------- -add_delete_route(_) -> - Self = self(), - emqttd_router:add_route(<<"topic1">>, Self), - true = emqttd_router:has_route(<<"topic1">>), - emqttd_router:add_route(<<"topic2">>, Self), - true = emqttd_router:has_route(<<"topic2">>), - [Self] = emqttd_router:lookup_routes(<<"topic1">>), - [Self] = emqttd_router:lookup_routes(<<"topic2">>), - %% Del topic1 - emqttd_router:delete_route(<<"topic1">>, Self), - erlang:yield(), - timer:sleep(10), - false = emqttd_router:has_route(<<"topic1">>), - %% Del topic2 - emqttd_router:delete_route(<<"topic2">>, Self), - erlang:yield(), - timer:sleep(10), - false = emqttd_router:has_route(<<"topic2">>). +router_add_del(_) -> + %% Add + emqttd_router:add_route(<<"#">>), + emqttd_router:add_route(<<"a/b/c">>), + emqttd_router:add_route(<<"+/#">>, node()), + Routes = [R1, R2 | _] = [ + #mqtt_route{topic = <<"#">>, node = node()}, + #mqtt_route{topic = <<"+/#">>, node = node()}, + #mqtt_route{topic = <<"a/b/c">>, node = node()}], + Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)), -add_delete_routes(_) -> - Self = self(), - emqttd_router:add_routes([], Self), - emqttd_router:add_routes([<<"t0">>], Self), - emqttd_router:add_routes([<<"t1">>,<<"t2">>,<<"t3">>], Self), - true = emqttd_router:has_route(<<"t1">>), - [Self] = emqttd_router:lookup_routes(<<"t1">>), - [Self] = emqttd_router:lookup_routes(<<"t2">>), - [Self] = emqttd_router:lookup_routes(<<"t3">>), + %% Batch Add + emqttd_router:add_routes(Routes), + Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)), - emqttd_router:delete_routes([<<"t3">>], Self), - emqttd_router:delete_routes([<<"t0">>, <<"t1">>], Self), - erlang:yield(), - timer:sleep(10), - false = emqttd_router:has_route(<<"t0">>), - false = emqttd_router:has_route(<<"t1">>), - true = emqttd_router:has_route(<<"t2">>), - false = emqttd_router:has_route(<<"t3">>). + %% Del + emqttd_router:del_route(<<"a/b/c">>), + [R1, R2] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)), + {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]), -route_message(_) -> - Self = self(), - Pid = spawn_link(fun() -> timer:sleep(1000) end), - emqttd_router:add_routes([<<"$Q/1">>,<<"t/2">>,<<"t/3">>], Self), - emqttd_router:add_routes([<<"t/2">>], Pid), - Msg1 = #mqtt_message{topic = <<"$Q/1">>, payload = <<"q">>}, - Msg2 = #mqtt_message{topic = <<"t/2">>, payload = <<"t2">>}, - Msg3 = #mqtt_message{topic = <<"t/3">>, payload = <<"t3">>}, - emqttd_router:route(<<"$Q/1">>, Msg1), - emqttd_router:route(<<"t/2">>, Msg2), - emqttd_router:route(<<"t/3">>, Msg3), - [Msg1, Msg2, Msg3] = recv_loop([]), - emqttd_router:add_route(<<"$Q/1">>, Self), - emqttd_router:route(<<"$Q/1">>, Msg1). + %% Batch Del + R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'}, + emqttd_router:add_route(R3), + emqttd_router:del_routes([R1, R2]), + emqttd_router:del_route(R3), + [] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)). + +router_print(_) -> + Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()}, + #mqtt_route{topic = <<"#">>, node = node()}, + #mqtt_route{topic = <<"+/#">>, node = node()}], + emqttd_router:add_routes(Routes), + emqttd_router:print(<<"a/b/c">>). + +router_unused(_) -> + gen_server:call(emqttd_router, bad_call), + gen_server:cast(emqttd_router, bad_msg), + emqttd_router ! bad_info. recv_loop(Msgs) -> receive @@ -225,6 +240,46 @@ set_get_stat(_) -> emqttd_stats:setstat('retained/max', 99), 99 = emqttd_stats:getstat('retained/max'). +%%-------------------------------------------------------------------- +%% Hook Test +%%-------------------------------------------------------------------- + +add_delete_hook(_) -> + emqttd:hook(test_hook, fun ?MODULE:hook_fun1/1, []), + emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []), + {error, already_hooked} = emqttd:hook(test_hook, fun ?MODULE:hook_fun2/1, []), + Callbacks = [{callback, fun ?MODULE:hook_fun1/1, [], 0}, + {callback, fun ?MODULE:hook_fun2/1, [], 0}], + Callbacks = emqttd_hook:lookup(test_hook), + emqttd:unhook(test_hook, fun ?MODULE:hook_fun1/1), + emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1), + ok = emqttd:unhook(test_hook, fun ?MODULE:hook_fun2/1), + {error, not_found} = emqttd:unhook(test_hook1, fun ?MODULE:hook_fun2/1), + [] = emqttd_hook:lookup(test_hook), + + emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun1/1, [], 9), + emqttd:hook(emqttd_hook, fun ?MODULE:hook_fun2/1, [], 8), + Callbacks2 = [{callback, fun ?MODULE:hook_fun2/1, [], 8}, + {callback, fun ?MODULE:hook_fun1/1, [], 9}], + Callbacks2 = emqttd_hook:lookup(emqttd_hook), + emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun1/1), + emqttd:unhook(emqttd_hook, fun ?MODULE:hook_fun2/1), + [] = emqttd_hook:lookup(emqttd_hook). + +run_hooks(_) -> + emqttd:hook(test_hook, fun ?MODULE:hook_fun3/4, [init]), + emqttd:hook(test_hook, fun ?MODULE:hook_fun4/4, [init]), + emqttd:hook(test_hook, fun ?MODULE:hook_fun5/4, [init]), + {stop, [r3, r2]} = emqttd:run_hooks(test_hook, [arg1, arg2], []), + {ok, []} = emqttd:run_hooks(unknown_hook, [], []). + +hook_fun1([]) -> ok. +hook_fun2([]) -> {ok, []}. + +hook_fun3(arg1, arg2, _Acc, init) -> ok. +hook_fun4(arg1, arg2, Acc, init) -> {ok, [r2 | Acc]}. +hook_fun5(arg1, arg2, Acc, init) -> {stop, [r3 | Acc]}. + %%-------------------------------------------------------------------- %% CLI Group %%-------------------------------------------------------------------- @@ -263,9 +318,17 @@ cli_sessions(_) -> emqttd_cli:sessions(["list", "transient"]), emqttd_cli:sessions(["show", "clientId"]). +cli_routes(_) -> + emqttd:subscribe(<<"topic/route">>), + emqttd_cli:routes(["list"]), + emqttd_cli:routes(["show", "topic/route"]), + emqttd:unsubscribe(<<"topic/route">>). + cli_topics(_) -> + emqttd:subscribe(<<"topic">>), emqttd_cli:topics(["list"]), - emqttd_cli:topics(["show", "topic"]). + emqttd_cli:topics(["show", "topic"]), + emqttd:unsubscribe(<<"topic">>). cli_subscriptions(_) -> emqttd_cli:subscriptions(["list"]), diff --git a/test/emqttd_access_SUITE.erl b/test/emqttd_access_SUITE.erl index 3a58d031c..c12fd00bd 100644 --- a/test/emqttd_access_SUITE.erl +++ b/test/emqttd_access_SUITE.erl @@ -74,7 +74,6 @@ end_per_testcase(_TestCase, _Config) -> %%-------------------------------------------------------------------- reload_acl(_) -> - ct:print("~p~n", [whereis(?AC)]), [ok] = ?AC:reload_acl(). register_mod(_) -> diff --git a/test/emqttd_protocol_SUITE.erl b/test/emqttd_protocol_SUITE.erl index f5f63a185..18984591d 100644 --- a/test/emqttd_protocol_SUITE.erl +++ b/test/emqttd_protocol_SUITE.erl @@ -132,9 +132,8 @@ parse_bridge(_) -> %% CONNECT(Q0, R0, D0, ClientId=C_00:0C:29:2B:77:52, ProtoName=MQIsdp, ProtoVsn=131, CleanSess=false, KeepAlive=60, %% Username=undefined, Password=undefined, Will(Q1, R1, Topic=$SYS/broker/connection/C_00:0C:29:2B:77:52/state, Msg=0)) {ok, #mqtt_packet{variable = Variable}, <<>>} = Parser(Data), - ct:print("~p", [Variable]), #mqtt_packet_connect{client_id = <<"C_00:0C:29:2B:77:52">>, - proto_ver = 16#83, + proto_ver = 16#03, proto_name = <<"MQIsdp">>, will_retain = true, will_qos = 1, diff --git a/test/emqttd_topic_SUITE.erl b/test/emqttd_topic_SUITE.erl index aa79fffa1..abcf50cf9 100644 --- a/test/emqttd_topic_SUITE.erl +++ b/test/emqttd_topic_SUITE.erl @@ -156,8 +156,7 @@ t_join(_) -> <<"ab/+/#">> = join(words(<<"ab/+/#">>)). t_is_queue(_) -> - true = is_queue(<<"$Q/queue">>), - true = is_queue(<<"$q/queue">>), + true = is_queue(<<"$queue/queue">>), false = is_queue(<<"xyz/queue">>). t_systop(_) -> @@ -167,7 +166,7 @@ t_systop(_) -> SysTop2 = systop(<<"abc">>). t_feed_var(_) -> - <<"$Q/client/clientId">> = feed_var(<<"$c">>, <<"clientId">>, <<"$Q/client/$c">>), + <<"$queue/client/clientId">> = feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>), <<"username/test/client/x">> = feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>), <<"username/test/client/clientId">> = feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>).