diff --git a/src/emqttd_boot.erl b/src/emqttd_boot.erl index 75302365e..054b57bde 100644 --- a/src/emqttd_boot.erl +++ b/src/emqttd_boot.erl @@ -18,6 +18,10 @@ -export([apply_module_attributes/1, all_module_attributes/1]). +-ifdef(TEST). +-compile(export_all). +-endif. + %% only {F, Args}... apply_module_attributes(Name) -> [{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} || @@ -43,7 +47,7 @@ all_module_attributes(Name) -> %% Copy from rabbit_misc.erl module_attributes(Module) -> case catch Module:module_info(attributes) of - {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + {'EXIT', {undef, [{Module, module_info, [attributes], []} | _]}} -> []; {'EXIT', Reason} -> exit(Reason); diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl index 9f62e6a74..e1589d07c 100644 --- a/src/emqttd_mnesia.erl +++ b/src/emqttd_mnesia.erl @@ -30,6 +30,9 @@ %% Schema and tables -export([copy_schema/1, delete_schema/0, del_schema_copy/1, create_table/2, copy_table/1, copy_table/2]). +-ifdef(TEST). +-compile(export_all). +-endif. %%-------------------------------------------------------------------- %% Start and init mnesia @@ -187,8 +190,8 @@ remove_from_cluster(Node) when Node =/= node() -> ensure_ok(del_schema_copy(Node)), ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); {true, false} -> - ensure_ok(zenmq_mnesia:del_schema_copy(Node)), - ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [Node])); + ensure_ok(del_schema_copy(Node)), + ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); {false, _} -> {error, node_not_in_cluster} end. @@ -228,14 +231,14 @@ ensure_tab({aborted, {already_exists, _Name}}) -> ok; ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok; ensure_tab({aborted, Error}) -> Error. -%% @doc Wait for mnesia to start, stop or copy tables. +%% @doc Wait for mnesia to start, stop or tables ready. -spec wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}. wait_for(start) -> case mnesia:system_info(is_running) of yes -> ok; no -> {error, mnesia_unexpectedly_stopped}; stopping -> {error, mnesia_unexpectedly_stopping}; - starting -> timer:sleep(500), wait_for(start) + starting -> timer:sleep(1000), wait_for(start) end; wait_for(stop) -> @@ -243,7 +246,7 @@ wait_for(stop) -> no -> ok; yes -> {error, mnesia_unexpectedly_running}; starting -> {error, mnesia_unexpectedly_starting}; - stopping -> timer:sleep(500), wait_for(stop) + stopping -> timer:sleep(1000), wait_for(stop) end; wait_for(tables) -> diff --git a/src/emqttd_trie.erl b/src/emqttd_trie.erl index a6885ea12..f021888d7 100644 --- a/src/emqttd_trie.erl +++ b/src/emqttd_trie.erl @@ -14,9 +14,13 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Topic Trie: [Trie](http://en.wikipedia.org/wiki/Trie) +%% @doc MQTT Topic Trie: +%% [Trie](http://en.wikipedia.org/wiki/Trie) +%% @end -module(emqttd_trie). +-include("emqttd_trie.hrl"). + %% Mnesia Callbacks -export([mnesia/1]). @@ -26,24 +30,6 @@ %% Trie API -export([insert/1, match/1, delete/1, lookup/1]). --type node_id() :: binary() | atom(). - --record(trie_node, { - node_id :: node_id(), - edge_count = 0 :: non_neg_integer(), - topic :: binary() | undefined -}). - --record(trie_edge, { - node_id :: node_id(), - word :: binary() | atom() -}). - --record(trie, { - edge :: #trie_edge{}, - node_id :: node_id() -}). - %%-------------------------------------------------------------------- %% Mnesia Callbacks %%-------------------------------------------------------------------- @@ -72,7 +58,7 @@ mnesia(copy) -> %% Trie API %%-------------------------------------------------------------------- -%% @doc Insert topic to trie tree. +%% @doc Insert topic to trie -spec insert(Topic :: binary()) -> ok. insert(Topic) when is_binary(Topic) -> case mnesia:read(trie_node, Topic) of @@ -112,7 +98,7 @@ delete(Topic) when is_binary(Topic) -> end. %%-------------------------------------------------------------------- -%% Internal functions +%% Internal Functions %%-------------------------------------------------------------------- %% @private diff --git a/test/emqttd_trie_tests.erl b/test/emqttd_trie_tests.erl index 9953e2293..56e8d3f6a 100644 --- a/test/emqttd_trie_tests.erl +++ b/test/emqttd_trie_tests.erl @@ -4,9 +4,9 @@ %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at -%% +%% %% http://www.apache.org/licenses/LICENSE-2.0 -%% +%% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,15 +16,74 @@ -module(emqttd_trie_tests). +-ifdef(TEST). + -include("emqttd.hrl"). --ifdef(TEST). +-include("emqttd_trie.hrl"). -include_lib("eunit/include/eunit.hrl"). -match_test() -> - mnesia:start(), - emqttd_trie:mnesia(boot), +-define(TRIE, emqttd_trie). + +trie_test_() -> + {setup, fun setup/0, fun teardown/1, + [{foreach, fun() -> ok end, fun(_) -> clear() end, + [?_test(t_insert()), + ?_test(t_match()), + ?_test(t_match2()), + ?_test(t_match3()), + ?_test(t_delete()), + ?_test(t_delete2()), + ?_test(t_delete3())] + }]}. + +setup() -> + emqttd_mnesia:ensure_started(), + ?TRIE:mnesia(boot), + ?TRIE:mnesia(copy). + +teardown(_) -> + emqttd_mnesia:ensure_stopped(), + emqttd_mnesia:delete_schema(). + +t_insert() -> + TN = #trie_node{node_id = <<"sensor">>, + edge_count = 3, + topic = <<"sensor">>, + flags = undefined}, + ?assertEqual({atomic, [TN]}, mnesia:transaction( + fun() -> + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/+/#">>), + ?TRIE:insert(<<"sensor/#">>), + ?TRIE:insert(<<"sensor">>), + ?TRIE:insert(<<"sensor">>), + ?TRIE:lookup(<<"sensor">>) + end)). + +t_match() -> + Machted = [<<"sensor/+/#">>, <<"sensor/#">>], + ?assertEqual({atomic, Machted}, mnesia:transaction( + fun() -> + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/+/#">>), + ?TRIE:insert(<<"sensor/#">>), + ?TRIE:match(<<"sensor/1">>) + end)). + +t_match2() -> + Matched = {[<<"+/+/#">>, <<"+/#">>, <<"#">>], []}, + ?assertEqual({atomic, Matched}, mnesia:transaction( + fun() -> + ?TRIE:insert(<<"#">>), + ?TRIE:insert(<<"+/#">>), + ?TRIE:insert(<<"+/+/#">>), + {?TRIE:match(<<"a/b/c">>), + ?TRIE:match(<<"$SYS/broker/zenmq">>)} + end)). + +t_match3() -> Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], mnesia:transaction(fun() -> [emqttd_trie:insert(Topic) || Topic <- Topics] end), Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [<<"a/b/c">>]), @@ -32,4 +91,51 @@ match_test() -> SysMatched = mnesia:async_dirty(fun emqttd_trie:match/1, [<<"$SYS/a/b/c">>]), ?assertEqual([<<"$SYS/#">>], SysMatched). +t_delete() -> + TN = #trie_node{node_id = <<"sensor/1">>, + edge_count = 2, + topic = undefined, + flags = undefined}, + ?assertEqual({atomic, [TN]}, mnesia:transaction( + fun() -> + ?TRIE:insert(<<"sensor/1/#">>), + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/1/metric/3">>), + ?TRIE:delete(<<"sensor/1/metric/2">>), + ?TRIE:delete(<<"sensor/1/metric">>), + ?TRIE:delete(<<"sensor/1/metric">>), + ?TRIE:lookup(<<"sensor/1">>) + end)). + +t_delete2() -> + ?assertEqual({atomic, {[], []}}, mnesia:transaction( + fun() -> + ?TRIE:insert(<<"sensor">>), + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/1/metric/3">>), + ?TRIE:delete(<<"sensor">>), + ?TRIE:delete(<<"sensor/1/metric/2">>), + ?TRIE:delete(<<"sensor/1/metric/3">>), + {?TRIE:lookup(<<"sensor">>), + ?TRIE:lookup(<<"sensor/1">>)} + end)). + +t_delete3() -> + ?assertEqual({atomic, {[], []}}, mnesia:transaction( + fun() -> + ?TRIE:insert(<<"sensor/+">>), + ?TRIE:insert(<<"sensor/+/metric/2">>), + ?TRIE:insert(<<"sensor/+/metric/3">>), + ?TRIE:delete(<<"sensor/+/metric/2">>), + ?TRIE:delete(<<"sensor/+/metric/3">>), + ?TRIE:delete(<<"sensor">>), + ?TRIE:delete(<<"sensor/+">>), + ?TRIE:delete(<<"sensor/+/unknown">>), + {?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)} + end)). + +clear() -> + mnesia:clear_table(trie), + mnesia:clear_table(trie_node). + -endif.