From 9a4c44913e3b240f6e4fbc16aa99c220a4b65e63 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 12 Aug 2016 12:59:03 +0800 Subject: [PATCH] commen tests --- Makefile | 2 +- src/emqttd.erl | 40 ++++++++++++----- src/emqttd_cli.erl | 14 ++---- src/emqttd_pubsub_sup.erl | 24 ++++++---- src/emqttd_server.erl | 18 +++++--- src/emqttd_session.erl | 9 ++-- src/emqttd_topic.erl | 49 +++++++++++++++------ test/emqttd_SUITE.erl | 82 ++++++++--------------------------- test/emqttd_backend_SUITE.erl | 45 ------------------- test/emqttd_topic_SUITE.erl | 30 ++++++++----- 10 files changed, 134 insertions(+), 179 deletions(-) delete mode 100644 test/emqttd_backend_SUITE.erl diff --git a/Makefile b/Makefile index e471d50ed..49642f180 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose # EUNIT_ERL_OPTS = -CT_SUITES = emqttd emqttd_access emqttd_backend emqttd_lib emqttd_mod emqttd_net \ +CT_SUITES = emqttd emqttd_access emqttd_lib emqttd_mod emqttd_net \ emqttd_mqueue emqttd_protocol emqttd_topic emqttd_trie CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqttd_ct@127.0.0.1 diff --git a/src/emqttd.erl b/src/emqttd.erl index 2518d39b3..521251efc 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -14,9 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqttd). +%% Facade Module for The EMQTT Broker --author("Feng Lee "). +-module(emqttd). -include("emqttd.hrl"). @@ -29,7 +29,8 @@ unsubscribe/1, unsubscribe/2]). %% PubSub Management API --export([topics/0, subscribers/1, subscriptions/1]). +-export([setqos/3, topics/0, subscriptions/1, subscribers/1, + is_subscribed/2, subscriber_down/1]). %% Hooks API -export([hook/4, hook/3, unhook/2, run_hooks/3]). @@ -37,7 +38,7 @@ %% Debug API -export([dump/0]). --type(subscriber() :: pid() | binary() | function()). +-type(subscriber() :: pid() | binary()). -type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). @@ -81,7 +82,7 @@ is_running(Node) -> end. %%-------------------------------------------------------------------- -%% PubSub APIs that wrap emqttd_pubsub +%% PubSub APIs %%-------------------------------------------------------------------- %% @doc Subscribe @@ -95,11 +96,12 @@ subscribe(Topic, Subscriber) -> -spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | pubsub_error()). subscribe(Topic, Subscriber, Options) -> - emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options). + with_pubsub(fun(PS) -> PS:subscribe(iolist_to_binary(Topic), Subscriber, Options) end). %% @doc Publish MQTT Message -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). -publish(Msg) -> emqttd_server:publish(Msg). +publish(Msg) -> + with_pubsub(fun(PS) -> PS:publish(Msg) end). %% @doc Unsubscribe -spec(unsubscribe(iodata()) -> ok | pubsub_error()). @@ -108,18 +110,32 @@ unsubscribe(Topic) -> -spec(unsubscribe(iodata(), subscriber()) -> ok | pubsub_error()). unsubscribe(Topic, Subscriber) -> - emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber). + with_pubsub(fun(PS) -> PS:unsubscribe(iolist_to_binary(Topic), Subscriber) end). + +-spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok). +setqos(Topic, Subscriber, Qos) -> + with_pubsub(fun(PS) -> PS:setqos(iolist_to_binary(Topic), Subscriber, Qos) end). -spec(topics() -> [binary()]). topics() -> emqttd_router:topics(). -spec(subscribers(iodata()) -> list(subscriber())). subscribers(Topic) -> - emqttd_pubsub:subscribers(iolist_to_binary(Topic)). + with_pubsub(fun(PS) -> PS:subscribers(iolist_to_binary(Topic)) end). -spec(subscriptions(subscriber()) -> [{binary(), suboption()}]). subscriptions(Subscriber) -> - emqttd_server:get_subscriptions(Subscriber). + with_pubsub(fun(PS) -> PS:subscriptions(Subscriber) end). + +-spec(is_subscribed(iodata(), subscriber()) -> boolean()). +is_subscribed(Topic, Subscriber) -> + with_pubsub(fun(PS) -> PS:is_subscribed(iolist_to_binary(Topic), Subscriber) end). + +-spec(subscriber_down(subscriber()) -> ok). +subscriber_down(Subscriber) -> + with_pubsub(fun(PS) -> PS:subscriber_down(Subscriber) end). + +with_pubsub(Fun) -> Fun(env(pubsub_server, emqttd_server)). %%-------------------------------------------------------------------- %% Hooks API @@ -141,9 +157,9 @@ unhook(Hook, Function) -> run_hooks(Hook, Args, Acc) -> emqttd_hook:run(Hook, Args, Acc). - %%-------------------------------------------------------------------- %% Debug %%-------------------------------------------------------------------- -dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]). +dump() -> with_pubsub(fun(PS) -> lists:append([PS:dump(), emqttd_router:dump()]) end). + diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index a64f086ed..6c1907f2a 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -170,7 +170,7 @@ if_client(ClientId, Fun) -> %%-------------------------------------------------------------------- %% @doc Sessions Command sessions(["list"]) -> - [sessions(["list", Type]) || Type <- ["persistent", "transient"]]; + dump(mqtt_local_session); sessions(["list", "persistent"]) -> dump(mqtt_persistent_session); @@ -179,15 +179,9 @@ sessions(["list", "transient"]) -> dump(mqtt_transient_session); sessions(["show", ClientId]) -> - MP = {{bin(ClientId), '_'}, '_'}, - case {ets:match_object(mqtt_transient_session, MP), - ets:match_object(mqtt_persistent_session, MP)} of - {[], []} -> - ?PRINT_MSG("Not Found.~n"); - {[SessInfo], _} -> - print(SessInfo); - {_, [SessInfo]} -> - print(SessInfo) + case ets:lookup(mqtt_local_session, bin(ClientId)) of + [] -> ?PRINT_MSG("Not Found.~n"); + [SessInfo] -> print(SessInfo) end; sessions(_) -> diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 1b406e751..58143cc83 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -45,24 +45,30 @@ init([Env]) -> %% Create ETS Tables [create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]], - %% PubSub Pool - PubSubMFA = {emqttd_pubsub, start_link, [Env]}, - PubSubPool = pool_sup(pubsub, Env, PubSubMFA), + {ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }. - %% Server Pool - ServerMFA = {emqttd_server, start_link, [Env]}, - ServerPool = pool_sup(server, Env, ServerMFA), - - {ok, { {one_for_all, 10, 3600}, [PubSubPool, ServerPool]} }. +%%-------------------------------------------------------------------- +%% Pool +%%-------------------------------------------------------------------- pool_size(Env) -> Schedulers = erlang:system_info(schedulers), proplists:get_value(pool_size, Env, Schedulers). -pool_sup(Name, Env, MFA) -> +pool_sup(Name, Env) -> Pool = list_to_atom(atom_to_list(Name) ++ "_pool"), + MFA = {adapter(Name), start_link, [Env]}, emqttd_pool_sup:spec(Pool, [Name, hash, pool_size(Env), MFA]). +%%-------------------------------------------------------------------- +%% Adapter +%%-------------------------------------------------------------------- + +adapter(server) -> + emqttd:env(pubsub_server, emqttd_server); +adapter(pubsub) -> + emqttd:env(pubsub_adapter, emqttd_pubsub). + %%-------------------------------------------------------------------- %% Create PubSub Tables %%-------------------------------------------------------------------- diff --git a/src/emqttd_server.erl b/src/emqttd_server.erl index c56ed2309..567d49388 100644 --- a/src/emqttd_server.erl +++ b/src/emqttd_server.erl @@ -37,7 +37,8 @@ async_unsubscribe/1, async_unsubscribe/2]). %% Management API. --export([setqos/3, is_subscribed/2, get_subscriptions/1, subscriber_down/1]). +-export([setqos/3, subscriptions/1, subscribers/1, is_subscribed/2, + subscriber_down/1]). %% Debug API -export([dump/0]). @@ -131,12 +132,8 @@ async_unsubscribe(Topic, Subscriber) when is_binary(Topic) -> setqos(Topic, Subscriber, Qos) when is_binary(Topic) -> call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}). --spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()). -is_subscribed(Topic, Subscriber) when is_binary(Topic) -> - ets:member(mqtt_subproperty, {Topic, Subscriber}). - --spec(get_subscriptions(emqttd:subscriber()) -> [{binary(), list()}]). -get_subscriptions(Subscriber) -> +-spec(subscriptions(emqttd:subscriber()) -> [{binary(), list(emqttd:suboption())}]). +subscriptions(Subscriber) -> lists:map(fun({_, Topic}) -> subscription(Topic, Subscriber) end, ets:lookup(mqtt_subscription, Subscriber)). @@ -144,6 +141,13 @@ get_subscriptions(Subscriber) -> subscription(Topic, Subscriber) -> {Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}. +subscribers(Topic) -> emqttd_pubsub:subscribers(Topic). + +-spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()). +is_subscribed(Topic, Subscriber) when is_binary(Topic) -> + ets:member(mqtt_subproperty, {Topic, Subscriber}). + +-spec(subscriber_down(emqttd:subscriber()) -> ok). subscriber_down(Subscriber) -> cast(pick(Subscriber), {subscriber_down, Subscriber}). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f27ba605b..2035ce3f6 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -297,7 +297,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session), SubDict; {ok, OldQos} -> - emqttd_server:setqos(Topic, ClientId, Qos), + emqttd:setqos(Topic, ClientId, Qos), ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session), dict:store(Topic, Qos, SubDict); error -> @@ -328,8 +328,8 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, Subscriptions1 = lists:foldl( fun(Topic, SubDict) -> case dict:find(Topic, SubDict) of - {ok, Qos} -> - emqttd:unsubscribe(ClientId, Topic, Qos), + {ok, _Qos} -> + emqttd:unsubscribe(ClientId, Topic), dict:erase(Topic, SubDict); error -> SubDict @@ -532,8 +532,7 @@ handle_info(Info, Session) -> ?UNEXPECTED_INFO(Info, Session). terminate(_Reason, #session{client_id = ClientId}) -> - %%TODO: ... - emqttd_server:subscriber_down(ClientId), + emqttd:subscriber_down(ClientId), emqttd_sm:unregister_session(ClientId). code_change(_OldVsn, Session, _Extra) -> diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 5e91ffa26..5ece2255b 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -16,17 +16,20 @@ -module(emqttd_topic). +-import(lists, [reverse/1]). -export([match/2, validate/1, triples/1, words/1, wildcard/1]). --export([join/1, feed_var/3, is_queue/1, systop/1]). +-export([join/1, feed_var/3, systop/1]). --type topic() :: binary(). +-export([strip/1, strip/2]). --type word() :: '' | '+' | '#' | binary(). +-type(topic() :: binary()). --type words() :: list(word()). +-type(word() :: '' | '+' | '#' | binary()). --type triple() :: {root | binary(), word(), binary()}. +-type(words() :: list(word())). + +-type(triple() :: {root | binary(), word(), binary()}). -export_type([topic/0, word/0, triple/0]). @@ -111,7 +114,7 @@ triples(Topic) when is_binary(Topic) -> triples(words(Topic), root, []). triples([], _Parent, Acc) -> - lists:reverse(Acc); + reverse(Acc); triples([W|Words], Parent, Acc) -> Node = join(Parent, W), @@ -137,13 +140,6 @@ word(<<"+">>) -> '+'; word(<<"#">>) -> '#'; word(Bin) -> Bin. -%% @doc Queue is a special topic name that starts with "$queue/" --spec(is_queue(topic()) -> boolean()). -is_queue(<<"$queue/", _Queue/binary>>) -> - true; -is_queue(_) -> - false. - %% @doc '$SYS' Topic. systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])); @@ -155,7 +151,7 @@ systop(Name) when is_binary(Name) -> feed_var(Var, Val, Topic) -> feed_var(Var, Val, words(Topic), []). feed_var(_Var, _Val, [], Acc) -> - join(lists:reverse(Acc)); + join(reverse(Acc)); feed_var(Var, Val, [Var|Words], Acc) -> feed_var(Var, Val, Words, [Val|Acc]); feed_var(Var, Val, [W|Words], Acc) -> @@ -175,3 +171,28 @@ join(Words) -> end, {true, <<>>}, [bin(W) || W <- Words]), Bin. +-spec(strip(topic()) -> {topic(), [local | {share, binary()}]}). +strip(Topic) when is_binary(Topic) -> + strip(Topic, []). + +strip(Topic = <<"$local/", Topic1/binary>>, Options) -> + case lists:member(local, Options) of + true -> error({invalid_topic, Topic}); + false -> strip(Topic1, [local | Options]) + end; + +strip(Topic = <<"$queue/", Topic1/binary>>, Options) -> + case lists:keyfind(share, 1, Options) of + {share, _} -> error({invalid_topic, Topic}); + false -> strip(Topic1, [{share, '$queue'} | Options]) + end; + +strip(Topic = <<"$share/", Topic1/binary>>, Options) -> + case lists:keyfind(share, 1, Options) of + {share, _} -> error({invalid_topic, Topic}); + false -> [Share, Topic2] = binary:split(Topic1, <<"/">>), + {Topic2, [{share, Share} | Options]} + end; + +strip(Topic, Options) -> {Topic, Options}. + diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index afd5b28d1..93b1b380b 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -32,19 +32,16 @@ all() -> {group, metrics}, {group, stats}, {group, hook}, - {group, backend}, + %%{group, backend}, {group, cli}]. groups() -> [{protocol, [sequence], [mqtt_connect]}, {pubsub, [sequence], - [create_topic, - create_subscription, - subscribe_unsubscribe, + [subscribe_unsubscribe, publish, pubsub, - 'pubsub#', 'pubsub+', - pubsub_queue]}, + 'pubsub#', 'pubsub+']}, {router, [sequence], [router_add_del, router_print, @@ -65,7 +62,7 @@ groups() -> dispatch_retained_messages, expire_retained_messages]}, {backend, [sequence], - [backend_subscription]}, + []}, {cli, [sequence], [ctl_register_cmd, cli_status, @@ -115,26 +112,13 @@ connect_broker_(Packet, RecvSize) -> %% PubSub Test %%-------------------------------------------------------------------- -create_topic(_) -> - ok = emqttd:create(topic, <<"topic/create">>), - ok = emqttd:create(topic, <<"topic/create2">>), - [#mqtt_topic{topic = <<"topic/create">>, flags = [static]}] - = emqttd:lookup(topic, <<"topic/create">>). - -create_subscription(_) -> - ok = emqttd:create(subscription, {<<"clientId">>, <<"topic/sub">>, qos2}), - [#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}] - = emqttd_backend:lookup_subscriptions(<<"clientId">>), - ok = emqttd_backend:del_subscriptions(<<"clientId">>), - ?assertEqual([], emqttd_backend:lookup_subscriptions(<<"clientId">>)). - subscribe_unsubscribe(_) -> - ok = emqttd:subscribe(<<"topic/subunsub">>), - ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub1">>, 1), - ok = emqttd:subscribe(<<"clientId">>, <<"topic/subunsub2">>, 2), - ok = emqttd:unsubscribe(<<"topic/subunsub">>), - ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub1">>, 1), - ok = emqttd:unsubscribe(<<"clientId">>, <<"topic/subunsub2">>, 2). + ok = emqttd:subscribe(<<"topic">>, <<"clientId">>), + ok = emqttd:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]), + ok = emqttd:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]), + ok = emqttd:unsubscribe(<<"topic">>, <<"clientId">>), + ok = emqttd:unsubscribe(<<"topic/1">>, <<"clientId">>), + ok = emqttd:unsubscribe(<<"topic/2">>, <<"clientId">>). publish(_) -> Msg = emqttd_message:make(ct, <<"test/pubsub">>, <<"hello">>), @@ -145,11 +129,11 @@ publish(_) -> pubsub(_) -> Self = self(), - emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 1}), - emqttd:subscribe({<<"clientId">>, <<"a/b/c">>, 2}), + ok = emqttd:subscribe(<<"a/b/c">>, Self, [{qos, 1}]), + ?assertMatch({error, _}, emqttd:subscribe(<<"a/b/c">>, Self, [{qos, 2}])), timer:sleep(10), - [{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self), - [{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>), + [{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self), + [{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), spawn(fun() -> @@ -175,22 +159,6 @@ pubsub(_) -> ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end), emqttd:unsubscribe(<<"a/+/+">>). -pubsub_queue(_) -> - Self = self(), Q = <<"$queue/abc">>, - SubFun = fun() -> - emqttd:subscribe(Q), - timer:sleep(10), - {ok, Msgs} = loop_recv(Q, 10), - Self ! {recv, self(), Msgs} - end, - Sub1 = spawn(SubFun), Sub2 = spawn(SubFun), - timer:sleep(5), - emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)), - emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)), - emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)), - ?assert(receive {recv, Sub1, Msgs1} -> length(Msgs1) < 3 end), - ?assert(receive {recv, Sub2, Msgs2} -> length(Msgs2) < 3 end). - loop_recv(Topic, Timeout) -> loop_recv(Topic, Timeout, []). @@ -215,15 +183,15 @@ router_add_del(_) -> #mqtt_route{topic = <<"#">>, node = node()}, #mqtt_route{topic = <<"+/#">>, node = node()}, #mqtt_route{topic = <<"a/b/c">>, node = node()}], - Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), %% Batch Add emqttd_router:add_routes(Routes), - Routes = lists:sort(emqttd_router:lookup(<<"a/b/c">>)), + Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)), %% Del emqttd_router:del_route(<<"a/b/c">>), - [R1, R2] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)), + [R1, R2] = lists:sort(emqttd_router:match(<<"a/b/c">>)), {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]), %% Batch Del @@ -231,7 +199,7 @@ router_add_del(_) -> emqttd_router:add_route(R3), emqttd_router:del_routes([R1, R2]), emqttd_router:del_route(R3), - [] = lists:sort(emqttd_router:lookup(<<"a/b/c">>)). + [] = lists:sort(emqttd_router:match(<<"a/b/c">>)). router_print(_) -> Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()}, @@ -360,20 +328,6 @@ expire_retained_messages(_) -> emqttd_backend:expire_messages(emqttd_time:now_to_secs()), 0 = emqttd_backend:retained_count(). -%%-------------------------------------------------------------------- -%% Backend Test -%%-------------------------------------------------------------------- - -backend_subscription(_) -> - Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2}, - Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"#">>, qos = 2}, - emqttd_backend:add_subscription(Sub1), - emqttd_backend:add_subscription(Sub2), - [Sub1, Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>), - emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>), - [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>), - emqttd_backend:del_subscriptions(<<"clientId">>), - [] = emqttd_backend:lookup_subscriptions(<<"clientId">>). %%-------------------------------------------------------------------- %% CLI Group diff --git a/test/emqttd_backend_SUITE.erl b/test/emqttd_backend_SUITE.erl deleted file mode 100644 index ceb1b3d6e..000000000 --- a/test/emqttd_backend_SUITE.erl +++ /dev/null @@ -1,45 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_backend_SUITE). - --include("emqttd.hrl"). - --compile(export_all). - -all() -> [{group, subscription}]. - -groups() -> [{subscription, [], [add_del_subscription]}]. - -init_per_suite(Config) -> - ok = emqttd_mnesia:ensure_started(), - emqttd_backend:mnesia(boot), - emqttd_backend:mnesia(copy), - Config. - -end_per_suite(_Config) -> - emqttd_mnesia:ensure_stopped(). - -add_del_subscription(_) -> - Sub1 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 2}, - Sub2 = #mqtt_subscription{subid = <<"clientId">>, topic = <<"topic">>, qos = 1}, - ok = emqttd_backend:add_subscription(Sub1), - {error, already_existed} = emqttd_backend:add_subscription(Sub1), - ok = emqttd_backend:add_subscription(Sub2), - [Sub2] = emqttd_backend:lookup_subscriptions(<<"clientId">>), - emqttd_backend:del_subscription(<<"clientId">>, <<"topic">>), - [] = emqttd_backend:lookup_subscriptions(<<"clientId">>). - diff --git a/test/emqttd_topic_SUITE.erl b/test/emqttd_topic_SUITE.erl index abcf50cf9..5e9608e00 100644 --- a/test/emqttd_topic_SUITE.erl +++ b/test/emqttd_topic_SUITE.erl @@ -16,18 +16,20 @@ -module(emqttd_topic_SUITE). +-include_lib("eunit/include/eunit.hrl"). + %% CT -compile(export_all). -import(emqttd_topic, [wildcard/1, match/2, validate/1, triples/1, join/1, - words/1, systop/1, is_queue/1, feed_var/3]). + words/1, systop/1, feed_var/3, strip/1, strip/2]). -define(N, 10000). all() -> [t_wildcard, t_match, t_match2, t_validate, t_triples, t_join, - t_words, t_systop, t_is_queue, t_feed_var, t_sys_match, 't_#_match', + t_words, t_systop, t_feed_var, t_sys_match, 't_#_match', t_sigle_level_validate, t_sigle_level_match, t_match_perf, - t_triples_perf]. + t_triples_perf, t_strip]. t_wildcard(_) -> true = wildcard(<<"a/b/#">>), @@ -155,21 +157,25 @@ t_join(_) -> <<"/ab/cd/ef/">> = join(words(<<"/ab/cd/ef/">>)), <<"ab/+/#">> = join(words(<<"ab/+/#">>)). -t_is_queue(_) -> - true = is_queue(<<"$queue/queue">>), - false = is_queue(<<"xyz/queue">>). - t_systop(_) -> SysTop1 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/xyz"]), - SysTop1 = systop('xyz'), + ?assertEqual(SysTop1, systop('xyz')), SysTop2 = iolist_to_binary(["$SYS/brokers/", atom_to_list(node()), "/abc"]), - SysTop2 = systop(<<"abc">>). + ?assertEqual(SysTop2,systop(<<"abc">>)). t_feed_var(_) -> - <<"$queue/client/clientId">> = feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>), - <<"username/test/client/x">> = feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>), - <<"username/test/client/clientId">> = feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>). + ?assertEqual(<<"$queue/client/clientId">>, feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)), + ?assertEqual(<<"username/test/client/x">>, feed_var(<<"%u">>, <<"test">>, <<"username/%u/client/x">>)), + ?assertEqual(<<"username/test/client/clientId">>, feed_var(<<"%c">>, <<"clientId">>, <<"username/test/client/%c">>)). long_topic() -> iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]). +t_strip(_) -> + ?assertEqual({<<"a/b/+/#">>, []}, strip(<<"a/b/+/#">>)), + ?assertEqual({<<"topic">>, [{share, '$queue'}]}, strip(<<"$queue/topic">>)), + ?assertEqual({<<"topic">>, [{share, <<"group">>}]}, strip(<<"$share/group/topic">>)), + ?assertEqual({<<"topic">>, [local]}, strip(<<"$local/topic">>)), + ?assertEqual({<<"topic">>, [{share, '$queue'}, local]}, strip(<<"$local/$queue/topic">>)), + ?assertEqual({<<"/a/b/c">>, [{share, <<"group">>}, local]}, strip(<<"$local/$share/group//a/b/c">>)). +