merge zenmq_trie, zenmq_mnesia, zenmq_boot

This commit is contained in:
Feng 2016-02-13 14:26:40 +08:00
parent bd1e618cc8
commit 3c53cf2b09
4 changed files with 132 additions and 33 deletions

View File

@ -18,6 +18,10 @@
-export([apply_module_attributes/1, all_module_attributes/1]).
-ifdef(TEST).
-compile(export_all).
-endif.
%% only {F, Args}...
apply_module_attributes(Name) ->
[{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} ||
@ -43,7 +47,7 @@ all_module_attributes(Name) ->
%% Copy from rabbit_misc.erl
module_attributes(Module) ->
case catch Module:module_info(attributes) of
{'EXIT', {undef, [{Module, module_info, _} | _]}} ->
{'EXIT', {undef, [{Module, module_info, [attributes], []} | _]}} ->
[];
{'EXIT', Reason} ->
exit(Reason);

View File

@ -30,6 +30,9 @@
%% Schema and tables
-export([copy_schema/1, delete_schema/0, del_schema_copy/1,
create_table/2, copy_table/1, copy_table/2]).
-ifdef(TEST).
-compile(export_all).
-endif.
%%--------------------------------------------------------------------
%% Start and init mnesia
@ -187,8 +190,8 @@ remove_from_cluster(Node) when Node =/= node() ->
ensure_ok(del_schema_copy(Node)),
ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
{true, false} ->
ensure_ok(zenmq_mnesia:del_schema_copy(Node)),
ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [Node]));
ensure_ok(del_schema_copy(Node)),
ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
{false, _} ->
{error, node_not_in_cluster}
end.
@ -228,14 +231,14 @@ ensure_tab({aborted, {already_exists, _Name}}) -> ok;
ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok;
ensure_tab({aborted, Error}) -> Error.
%% @doc Wait for mnesia to start, stop or copy tables.
%% @doc Wait for mnesia to start, stop or tables ready.
-spec wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}.
wait_for(start) ->
case mnesia:system_info(is_running) of
yes -> ok;
no -> {error, mnesia_unexpectedly_stopped};
stopping -> {error, mnesia_unexpectedly_stopping};
starting -> timer:sleep(500), wait_for(start)
starting -> timer:sleep(1000), wait_for(start)
end;
wait_for(stop) ->
@ -243,7 +246,7 @@ wait_for(stop) ->
no -> ok;
yes -> {error, mnesia_unexpectedly_running};
starting -> {error, mnesia_unexpectedly_starting};
stopping -> timer:sleep(500), wait_for(stop)
stopping -> timer:sleep(1000), wait_for(stop)
end;
wait_for(tables) ->

View File

@ -14,9 +14,13 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT Topic Trie: [Trie](http://en.wikipedia.org/wiki/Trie)
%% @doc MQTT Topic Trie:
%% [Trie](http://en.wikipedia.org/wiki/Trie)
%% @end
-module(emqttd_trie).
-include("emqttd_trie.hrl").
%% Mnesia Callbacks
-export([mnesia/1]).
@ -26,24 +30,6 @@
%% Trie API
-export([insert/1, match/1, delete/1, lookup/1]).
-type node_id() :: binary() | atom().
-record(trie_node, {
node_id :: node_id(),
edge_count = 0 :: non_neg_integer(),
topic :: binary() | undefined
}).
-record(trie_edge, {
node_id :: node_id(),
word :: binary() | atom()
}).
-record(trie, {
edge :: #trie_edge{},
node_id :: node_id()
}).
%%--------------------------------------------------------------------
%% Mnesia Callbacks
%%--------------------------------------------------------------------
@ -72,7 +58,7 @@ mnesia(copy) ->
%% Trie API
%%--------------------------------------------------------------------
%% @doc Insert topic to trie tree.
%% @doc Insert topic to trie
-spec insert(Topic :: binary()) -> ok.
insert(Topic) when is_binary(Topic) ->
case mnesia:read(trie_node, Topic) of
@ -112,7 +98,7 @@ delete(Topic) when is_binary(Topic) ->
end.
%%--------------------------------------------------------------------
%% Internal functions
%% Internal Functions
%%--------------------------------------------------------------------
%% @private

View File

@ -4,9 +4,9 @@
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -16,15 +16,74 @@
-module(emqttd_trie_tests).
-ifdef(TEST).
-include("emqttd.hrl").
-ifdef(TEST).
-include("emqttd_trie.hrl").
-include_lib("eunit/include/eunit.hrl").
match_test() ->
mnesia:start(),
emqttd_trie:mnesia(boot),
-define(TRIE, emqttd_trie).
trie_test_() ->
{setup, fun setup/0, fun teardown/1,
[{foreach, fun() -> ok end, fun(_) -> clear() end,
[?_test(t_insert()),
?_test(t_match()),
?_test(t_match2()),
?_test(t_match3()),
?_test(t_delete()),
?_test(t_delete2()),
?_test(t_delete3())]
}]}.
setup() ->
emqttd_mnesia:ensure_started(),
?TRIE:mnesia(boot),
?TRIE:mnesia(copy).
teardown(_) ->
emqttd_mnesia:ensure_stopped(),
emqttd_mnesia:delete_schema().
t_insert() ->
TN = #trie_node{node_id = <<"sensor">>,
edge_count = 3,
topic = <<"sensor">>,
flags = undefined},
?assertEqual({atomic, [TN]}, mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/1/metric/2">>),
?TRIE:insert(<<"sensor/+/#">>),
?TRIE:insert(<<"sensor/#">>),
?TRIE:insert(<<"sensor">>),
?TRIE:insert(<<"sensor">>),
?TRIE:lookup(<<"sensor">>)
end)).
t_match() ->
Machted = [<<"sensor/+/#">>, <<"sensor/#">>],
?assertEqual({atomic, Machted}, mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/1/metric/2">>),
?TRIE:insert(<<"sensor/+/#">>),
?TRIE:insert(<<"sensor/#">>),
?TRIE:match(<<"sensor/1">>)
end)).
t_match2() ->
Matched = {[<<"+/+/#">>, <<"+/#">>, <<"#">>], []},
?assertEqual({atomic, Matched}, mnesia:transaction(
fun() ->
?TRIE:insert(<<"#">>),
?TRIE:insert(<<"+/#">>),
?TRIE:insert(<<"+/+/#">>),
{?TRIE:match(<<"a/b/c">>),
?TRIE:match(<<"$SYS/broker/zenmq">>)}
end)).
t_match3() ->
Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
mnesia:transaction(fun() -> [emqttd_trie:insert(Topic) || Topic <- Topics] end),
Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [<<"a/b/c">>]),
@ -32,4 +91,51 @@ match_test() ->
SysMatched = mnesia:async_dirty(fun emqttd_trie:match/1, [<<"$SYS/a/b/c">>]),
?assertEqual([<<"$SYS/#">>], SysMatched).
t_delete() ->
TN = #trie_node{node_id = <<"sensor/1">>,
edge_count = 2,
topic = undefined,
flags = undefined},
?assertEqual({atomic, [TN]}, mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/1/#">>),
?TRIE:insert(<<"sensor/1/metric/2">>),
?TRIE:insert(<<"sensor/1/metric/3">>),
?TRIE:delete(<<"sensor/1/metric/2">>),
?TRIE:delete(<<"sensor/1/metric">>),
?TRIE:delete(<<"sensor/1/metric">>),
?TRIE:lookup(<<"sensor/1">>)
end)).
t_delete2() ->
?assertEqual({atomic, {[], []}}, mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor">>),
?TRIE:insert(<<"sensor/1/metric/2">>),
?TRIE:insert(<<"sensor/1/metric/3">>),
?TRIE:delete(<<"sensor">>),
?TRIE:delete(<<"sensor/1/metric/2">>),
?TRIE:delete(<<"sensor/1/metric/3">>),
{?TRIE:lookup(<<"sensor">>),
?TRIE:lookup(<<"sensor/1">>)}
end)).
t_delete3() ->
?assertEqual({atomic, {[], []}}, mnesia:transaction(
fun() ->
?TRIE:insert(<<"sensor/+">>),
?TRIE:insert(<<"sensor/+/metric/2">>),
?TRIE:insert(<<"sensor/+/metric/3">>),
?TRIE:delete(<<"sensor/+/metric/2">>),
?TRIE:delete(<<"sensor/+/metric/3">>),
?TRIE:delete(<<"sensor">>),
?TRIE:delete(<<"sensor/+">>),
?TRIE:delete(<<"sensor/+/unknown">>),
{?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)}
end)).
clear() ->
mnesia:clear_table(trie),
mnesia:clear_table(trie_node).
-endif.