From cca5081e021c509dbdba07bb9037e9321f8cf546 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 10 Dec 2018 18:37:42 +0800 Subject: [PATCH] Improve the design of trie, router and broker modules 1. Add do_add_route/1 do_add_route/2, do_delete_route/1, do_delete_route/2 APIs in emqx_router module 2. Improve the code of emqx_trie module 3. Update the emqx_broker module to call the new APIs of emqx_router --- src/emqx_broker.erl | 30 ++++----- src/emqx_router.erl | 134 ++++++++++++++++++------------------- src/emqx_router_helper.erl | 45 +++++++------ src/emqx_shared_sub.erl | 7 +- src/emqx_trie.erl | 18 ++--- test/emqx_router_SUITE.erl | 103 ++++++++++------------------ test/emqx_trie_SUITE.erl | 113 ++++++++++++++++--------------- 7 files changed, 208 insertions(+), 242 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index a2ddf5b60..a00dc17b8 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -27,6 +27,7 @@ -export([subscriptions/1, subscribers/1, subscribed/2]). -export([get_subopts/2, set_subopts/2]). -export([topics/0]). + %% Stats fun -export([stats_fun/0]). @@ -52,9 +53,9 @@ -spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> - _ = create_tabs(), - Name = emqx_misc:proc_name(?BROKER, Id), - gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], []). + ok = create_tabs(), + gen_server:start_link({local, emqx_misc:proc_name(?BROKER, Id)}, + ?MODULE, [Pool, Id], []). %%------------------------------------------------------------------------------ %% Create tabs @@ -75,7 +76,7 @@ create_tabs() -> ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... - %% duplicate_bag: o(1) insert + %% bag: o(n) insert ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]). %%------------------------------------------------------------------------------ @@ -114,7 +115,7 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map call(pick({Topic, Shard}), {subscribe, Topic}); Group -> %% Shard subscription true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), - emqx_shard_sub:subscribe(Group, Topic, SubPid) + emqx_shared_sub:subscribe(Group, Topic, SubPid) end; true -> ok end. @@ -367,18 +368,17 @@ pick(Topic) -> %%------------------------------------------------------------------------------ init([Pool, Id]) -> - _ = emqx_router:set_mode(protected), true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. handle_call({subscribe, Topic}, _From, State) -> - case get(Topic) of - undefined -> - _ = put(Topic, true), - emqx_router:add_route(Topic); - true -> ok - end, - {reply, ok, State}; + Ok = case get(Topic) of + undefined -> + _ = put(Topic, true), + emqx_router:do_add_route(Topic); + true -> ok + end, + {reply, Ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), @@ -387,8 +387,8 @@ handle_call(Req, _From, State) -> handle_cast({unsubscribed, Topic}, State) -> case ets:member(?SUBSCRIBER, Topic) of false -> - _ = erase(Topic), - emqx_router:delete_route(Topic); + _ = erase(Topic), + emqx_router:do_delete_route(Topic); true -> ok end, {noreply, State}; diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 313adc475..c165b97ca 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -29,19 +29,19 @@ %% Route APIs -export([add_route/1, add_route/2]). --export([get_routes/1]). +-export([do_add_route/1, do_add_route/2]). +-export([match_routes/1, lookup_routes/1, has_routes/1]). -export([delete_route/1, delete_route/2]). --export([has_routes/1, match_routes/1, print_routes/1]). +-export([do_delete_route/1, do_delete_route/2]). +-export([print_routes/1]). -export([topics/0]). -%% Mode --export([set_mode/1, get_mode/0]). - %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --type(destination() :: node() | {binary(), node()}). +-type(group() :: binary()). +-type(destination() :: node() | {group(), node()}). -define(ROUTE, emqx_route). @@ -66,75 +66,76 @@ mnesia(copy) -> -spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> - Name = emqx_misc:proc_name(?MODULE, Id), - gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). %%------------------------------------------------------------------------------ %% Route APIs %%------------------------------------------------------------------------------ --spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}). +-spec(add_route(emqx_topic:topic()) -> ok | {error, term()}). add_route(Topic) when is_binary(Topic) -> - add_route(#route{topic = Topic, dest = node()}); -add_route(Route = #route{topic = Topic}) -> - case get_mode() of - protected -> do_add_route(Route); - undefined -> call(pick(Topic), {add_route, Route}) - end. + add_route(Topic, node()). -spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). add_route(Topic, Dest) when is_binary(Topic) -> - add_route(#route{topic = Topic, dest = Dest}). + call(pick(Topic), {add_route, Topic, Dest}). -%% @private -do_add_route(Route = #route{topic = Topic, dest = Dest}) -> - case lists:member(Route, get_routes(Topic)) of +-spec(do_add_route(emqx_topic:topic()) -> ok | {error, term()}). +do_add_route(Topic) when is_binary(Topic) -> + do_add_route(Topic, node()). + +-spec(do_add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +do_add_route(Topic, Dest) when is_binary(Topic) -> + Route = #route{topic = Topic, dest = Dest}, + case lists:member(Route, lookup_routes(Topic)) of true -> ok; false -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of - true -> trans(fun add_trie_route/1, [Route]); - false -> add_direct_route(Route) + true -> trans(fun insert_trie_route/1, [Route]); + false -> insert_direct_route(Route) end end. --spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]). -get_routes(Topic) -> +%% @doc Match routes +-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). +match_routes(Topic) when is_binary(Topic) -> + %% Optimize: routing table will be replicated to all router nodes. + Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]), + lists:append([lookup_routes(To) || To <- [Topic | Matched]]). + +-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]). +lookup_routes(Topic) -> ets:lookup(?ROUTE, Topic). --spec(delete_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}). -delete_route(Topic) when is_binary(Topic) -> - delete_route(#route{topic = Topic, dest = node()}); -delete_route(Route = #route{topic = Topic}) -> - case get_mode() of - protected -> do_delete_route(Route); - undefined -> call(pick(Topic), {delete_route, Route}) - end. - --spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). -delete_route(Topic, Dest) when is_binary(Topic) -> - delete_route(#route{topic = Topic, dest = Dest}). - -%% @private -do_delete_route(Route = #route{topic = Topic}) -> - case emqx_topic:wildcard(Topic) of - true -> trans(fun del_trie_route/1, [Route]); - false -> del_direct_route(Route) - end. - -spec(has_routes(emqx_topic:topic()) -> boolean()). has_routes(Topic) when is_binary(Topic) -> ets:member(?ROUTE, Topic). --spec(topics() -> list(emqx_topic:topic())). -topics() -> mnesia:dirty_all_keys(?ROUTE). +-spec(delete_route(emqx_topic:topic()) -> ok | {error, term()}). +delete_route(Topic) when is_binary(Topic) -> + delete_route(Topic, node()). -%% @doc Match routes -%% Optimize: routing table will be replicated to all router nodes. --spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). -match_routes(Topic) when is_binary(Topic) -> - Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]), - lists:append([get_routes(To) || To <- [Topic | Matched]]). +-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +delete_route(Topic, Dest) when is_binary(Topic) -> + call(pick(Topic), {delete_route, Topic, Dest}). + +-spec(do_delete_route(emqx_topic:topic()) -> ok | {error, term()}). +do_delete_route(Topic) when is_binary(Topic) -> + do_delete_route(Topic, node()). + +-spec(do_delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +do_delete_route(Topic, Dest) -> + Route = #route{topic = Topic, dest = Dest}, + case emqx_topic:wildcard(Topic) of + true -> trans(fun delete_trie_route/1, [Route]); + false -> delete_direct_route(Route) + end. + +-spec(topics() -> list(emqx_topic:topic())). +topics() -> + mnesia:dirty_all_keys(?ROUTE). %% @doc Print routes to a topic -spec(print_routes(emqx_topic:topic()) -> ok). @@ -143,13 +144,6 @@ print_routes(Topic) -> io:format("~s -> ~s~n", [To, Dest]) end, match_routes(Topic)). --spec(set_mode(protected | atom()) -> any()). -set_mode(Mode) when is_atom(Mode) -> - put('$router_mode', Mode). - --spec(get_mode() -> protected | undefined | atom()). -get_mode() -> get('$router_mode'). - call(Router, Msg) -> gen_server:call(Router, Msg, infinity). @@ -164,11 +158,13 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({add_route, Route}, _From, State) -> - {reply, do_add_route(Route), State}; +handle_call({add_route, Topic, Dest}, _From, State) -> + Ok = do_add_route(Topic, Dest), + {reply, Ok, State}; -handle_call({delete_route, Route}, _From, State) -> - {reply, do_delete_route(Route), State}; +handle_call({delete_route, Topic, Dest}, _From, State) -> + Ok = do_delete_route(Topic, Dest), + {reply, Ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Router] unexpected call: ~p", [Req]), @@ -192,23 +188,23 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -add_direct_route(Route) -> +insert_direct_route(Route) -> mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]). -add_trie_route(Route = #route{topic = Topic}) -> +insert_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE, Topic}) of [] -> emqx_trie:insert(Topic); _ -> ok end, mnesia:write(?ROUTE, Route, sticky_write). -del_direct_route(Route) -> +delete_direct_route(Route) -> mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]). -del_trie_route(Route = #route{topic = Topic}) -> +delete_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE, Topic}) of [Route] -> %% Remove route and trie - mnesia:delete_object(?ROUTE, Route, sticky_write), + ok = mnesia:delete_object(?ROUTE, Route, sticky_write), emqx_trie:delete(Topic); [_|_] -> %% Remove route only mnesia:delete_object(?ROUTE, Route, sticky_write); @@ -219,7 +215,7 @@ del_trie_route(Route = #route{topic = Topic}) -> -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> case mnesia:transaction(Fun, Args) of - {atomic, _} -> ok; - {aborted, Error} -> {error, Error} + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} end. diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index c24b10715..efeaabc74 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -31,15 +31,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% internal export +%% Internal export -export([stats_fun/0]). -record(routing_node, {name, const = unused}). --record(state, {nodes = []}). --compile({no_auto_import, [monitor/1]}). - --define(SERVER, ?MODULE). -define(ROUTE, emqx_route). -define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). @@ -64,9 +60,9 @@ mnesia(copy) -> %%------------------------------------------------------------------------------ %% @doc Starts the router helper --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). %% @doc Monitor routing node -spec(monitor(node() | {binary(), node()}) -> ok). @@ -84,18 +80,18 @@ monitor(Node) when is_atom(Node) -> %%------------------------------------------------------------------------------ init([]) -> - _ = ekka:monitor(membership), - _ = mnesia:subscribe({table, ?ROUTING_NODE, simple}), + ok = ekka:monitor(membership), + {ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}), Nodes = lists:foldl( fun(Node, Acc) -> case ekka:is_member(Node) of true -> Acc; - false -> _ = erlang:monitor_node(Node, true), + false -> true = erlang:monitor_node(Node, true), [Node | Acc] end end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0), - {ok, #state{nodes = Nodes}, hibernate}. + {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> emqx_logger:error("[RouterHelper] unexpected call: ~p", [Req]), @@ -105,24 +101,29 @@ handle_cast(Msg, State) -> emqx_logger:error("[RouterHelper] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({mnesia_table_event, {write, #routing_node{name = Node}, _}}, State = #state{nodes = Nodes}) -> - emqx_logger:info("[RouterHelper] write routing node: ~s", [Node]), +handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) -> case ekka:is_member(Node) orelse lists:member(Node, Nodes) of - true -> {noreply, State}; - false -> _ = erlang:monitor_node(Node, true), - {noreply, State#state{nodes = [Node | Nodes]}} + true -> {noreply, State}; + false -> + true = erlang:monitor_node(Node, true), + {noreply, State#{nodes := [Node | Nodes]}} end; -handle_info({mnesia_table_event, _Event}, State) -> +handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) -> + %% ignore {noreply, State}; -handle_info({nodedown, Node}, State = #state{nodes = Nodes}) -> +handle_info({mnesia_table_event, Event}, State) -> + emqx_logger:error("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]), + {noreply, State}; + +handle_info({nodedown, Node}, State = #{nodes := Nodes}) -> global:trans({?LOCK, self()}, fun() -> mnesia:transaction(fun cleanup_routes/1, [Node]) end), - mnesia:dirty_delete(?ROUTING_NODE, Node), - {noreply, State#state{nodes = lists:delete(Node, Nodes)}, hibernate}; + ok = mnesia:dirty_delete(?ROUTING_NODE, Node), + {noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate}; handle_info({membership, {mnesia, down, Node}}, State) -> handle_info({nodedown, Node}, State); @@ -134,8 +135,8 @@ handle_info(Info, State) -> emqx_logger:error("[RouteHelper] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{}) -> - ekka:unmonitor(membership), +terminate(_Reason, _State) -> + ok = ekka:unmonitor(membership), emqx_stats:cancel_update(route_stats), mnesia:unsubscribe({table, ?ROUTING_NODE, simple}). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index d1d0d921d..b7e41213b 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -252,7 +252,6 @@ subscribers(Group, Topic) -> %%------------------------------------------------------------------------------ init([]) -> - _ = emqx_router:set_mode(protected), mnesia:subscribe({table, ?TAB, simple}), {atomic, PMon} = mnesia:transaction(fun init_monitors/0), ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), @@ -269,7 +268,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; - false -> ok = emqx_router:add_route(Topic, {Group, node()}) + false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) end, ok = maybe_insert_alive_tab(SubPid), true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), @@ -280,7 +279,7 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; - false -> ok = emqx_router:delete_route(Topic, {Group, node()}) + false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) end, {reply, ok, State}; @@ -334,7 +333,7 @@ cleanup_down(SubPid) -> true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), case ets:member(?SHARED_SUBS, {Group, Topic}) of true -> ok; - false -> ok = emqx_router:delete_route(Topic, {Group, node()}) + false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) end end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index 79f6042b7..27ff52827 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -36,7 +36,7 @@ %% @doc Create or replicate trie tables. -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> - %% Optimize + %% Optimize storage StoreProps = [{ets, [{read_concurrency, true}, {write_concurrency, true}]}], %% Trie table @@ -72,7 +72,7 @@ insert(Topic) when is_binary(Topic) -> write_trie_node(TrieNode#trie_node{topic = Topic}); [] -> %% Add trie path - lists:foreach(fun add_path/1, emqx_topic:triples(Topic)), + ok = lists:foreach(fun add_path/1, emqx_topic:triples(Topic)), %% Add last node write_trie_node(#trie_node{node_id = Topic, topic = Topic}) end. @@ -93,7 +93,7 @@ lookup(NodeId) -> delete(Topic) when is_binary(Topic) -> case mnesia:wread({?TRIE_NODE, Topic}) of [#trie_node{edge_count = 0}] -> - mnesia:delete({?TRIE_NODE, Topic}), + ok = mnesia:delete({?TRIE_NODE, Topic}), delete_path(lists:reverse(emqx_topic:triples(Topic))); [TrieNode] -> write_trie_node(TrieNode#trie_node{topic = undefined}); @@ -112,12 +112,12 @@ add_path({Node, Word, Child}) -> [TrieNode = #trie_node{edge_count = Count}] -> case mnesia:wread({?TRIE, Edge}) of [] -> - write_trie_node(TrieNode#trie_node{edge_count = Count + 1}), + ok = write_trie_node(TrieNode#trie_node{edge_count = Count + 1}), write_trie(#trie{edge = Edge, node_id = Child}); [_] -> ok end; [] -> - write_trie_node(#trie_node{node_id = Node, edge_count = 1}), + ok = write_trie_node(#trie_node{node_id = Node, edge_count = 1}), write_trie(#trie{edge = Edge, node_id = Child}) end. @@ -154,10 +154,10 @@ match_node(NodeId, [W|Words], ResAcc) -> delete_path([]) -> ok; delete_path([{NodeId, Word, _} | RestPath]) -> - mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}), - case mnesia:read(?TRIE_NODE, NodeId) of + ok = mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}), + case mnesia:wread({?TRIE_NODE, NodeId}) of [#trie_node{edge_count = 1, topic = undefined}] -> - mnesia:delete({?TRIE_NODE, NodeId}), + ok = mnesia:delete({?TRIE_NODE, NodeId}), delete_path(RestPath); [TrieNode = #trie_node{edge_count = 1, topic = _}] -> write_trie_node(TrieNode#trie_node{edge_count = 0}); @@ -167,9 +167,11 @@ delete_path([{NodeId, Word, _} | RestPath]) -> mnesia:abort({node_not_found, NodeId}) end. +%% @private write_trie(Trie) -> mnesia:write(?TRIE, Trie, write). +%% @private write_trie_node(TrieNode) -> mnesia:write(?TRIE_NODE, TrieNode, write). diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index e317ec7b3..c115fd0cd 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -21,17 +21,16 @@ -compile(nowarn_export_all). -define(R, emqx_router). --define(TABS, [emqx_route, emqx_trie, emqx_trie_node]). all() -> [{group, route}]. groups() -> [{route, [sequence], - [add_del_route, - match_routes, - has_routes, - router_add_del]}]. + [t_add_delete, + t_do_add_delete, + t_match_routes, + t_has_routes]}]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), @@ -47,77 +46,47 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> clear_tables(). -add_del_route(_) -> - From = {self(), make_ref()}, - ?R:add_route(From, <<"a/b/c">>, node()), - timer:sleep(1), - - ?R:add_route(From, <<"a/b/c">>, node()), - timer:sleep(1), - - ?R:add_route(From, <<"a/+/b">>, node()), - ct:log("Topics: ~p ~n", [emqx_topic:wildcard(<<"a/+/b">>)]), - timer:sleep(1), - +t_add_delete(_) -> + ?R:add_route(<<"a/b/c">>, node()), + ?R:add_route(<<"a/b/c">>, node()), + ?R:add_route(<<"a/+/b">>, node()), ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), - ?R:del_route(From, <<"a/b/c">>, node()), + ?R:delete_route(<<"a/b/c">>), + ?R:delete_route(<<"a/+/b">>, node()), + ?assertEqual([], ?R:topics()). - ?R:del_route(From, <<"a/+/b">>, node()), - timer:sleep(120), - ?assertEqual([], lists:sort(?R:topics())). +t_do_add_delete(_) -> + ?R:do_add_route(<<"a/b/c">>, node()), + ?R:do_add_route(<<"a/b/c">>, node()), + ?R:do_add_route(<<"a/+/b">>, node()), + ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), -match_routes(_) -> - From = {self(), make_ref()}, - ?R:add_route(From, <<"a/b/c">>, node()), - ?R:add_route(From, <<"a/+/c">>, node()), - ?R:add_route(From, <<"a/b/#">>, node()), - ?R:add_route(From, <<"#">>, node()), - timer:sleep(1000), + ?R:do_delete_route(<<"a/b/c">>, node()), + ?R:do_delete_route(<<"a/+/b">>), + ?assertEqual([], ?R:topics()). + +t_match_routes(_) -> + ?R:add_route(<<"a/b/c">>, node()), + ?R:add_route(<<"a/+/c">>, node()), + ?R:add_route(<<"a/b/#">>, node()), + ?R:add_route(<<"#">>, node()), ?assertEqual([#route{topic = <<"#">>, dest = node()}, #route{topic = <<"a/+/c">>, dest = node()}, #route{topic = <<"a/b/#">>, dest = node()}, #route{topic = <<"a/b/c">>, dest = node()}], - lists:sort(?R:match_routes(<<"a/b/c">>))). + lists:sort(?R:match_routes(<<"a/b/c">>))), + ?R:delete_route(<<"a/b/c">>, node()), + ?R:delete_route(<<"a/+/c">>, node()), + ?R:delete_route(<<"a/b/#">>, node()), + ?R:delete_route(<<"#">>, node()), + ?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))). -has_routes(_) -> - From = {self(), make_ref()}, - ?R:add_route(From, <<"devices/+/messages">>, node()), - timer:sleep(200), - ?assert(?R:has_routes(<<"devices/+/messages">>)). +t_has_routes(_) -> + ?R:add_route(<<"devices/+/messages">>, node()), + ?assert(?R:has_routes(<<"devices/+/messages">>)), + ?R:delete_route(<<"devices/+/messages">>). clear_tables() -> - lists:foreach(fun mnesia:clear_table/1, ?TABS). - -router_add_del(_) -> - ?R:add_route(<<"#">>), - ?R:add_route(<<"a/b/c">>, node()), - ?R:add_route(<<"+/#">>), - Routes = [R1, R2 | _] = [ - #route{topic = <<"#">>, dest = node()}, - #route{topic = <<"+/#">>, dest = node()}, - #route{topic = <<"a/b/c">>, dest = node()}], - timer:sleep(500), - ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), - - ?R:print_routes(<<"a/b/c">>), - - %% Batch Add - lists:foreach(fun(R) -> ?R:add_route(R) end, Routes), - ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))), - - %% Del - ?R:del_route(<<"a/b/c">>, node()), - timer:sleep(500), - [R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)), - {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]), - - %% Batch Del - R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'}, - ?R:add_route(R3), - ?R:del_route(<<"#">>), - ?R:del_route(R2), - ?R:del_route(R3), - timer:sleep(500), - [] = lists:sort(?R:match_routes(<<"a/b/c">>)). + lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]). diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 85637a447..09226979f 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -47,36 +47,36 @@ t_insert(_) -> edge_count = 3, topic = <<"sensor">>, flags = undefined}, - {atomic, [TN]} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/+/#">>), - ?TRIE:insert(<<"sensor/#">>), - ?TRIE:insert(<<"sensor">>), - ?TRIE:insert(<<"sensor">>), - ?TRIE:lookup(<<"sensor">>) - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/+/#">>), + ?TRIE:insert(<<"sensor/#">>), + ?TRIE:insert(<<"sensor">>), + ?TRIE:insert(<<"sensor">>), + ?TRIE:lookup(<<"sensor">>) + end, + ?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)). t_match(_) -> Machted = [<<"sensor/+/#">>, <<"sensor/#">>], - {atomic, Machted} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/+/#">>), - ?TRIE:insert(<<"sensor/#">>), - ?TRIE:match(<<"sensor/1">>) - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/+/#">>), + ?TRIE:insert(<<"sensor/#">>), + ?TRIE:match(<<"sensor/1">>) + end, + ?assertEqual({atomic, Machted}, mnesia:transaction(Fun)). t_match2(_) -> Matched = {[<<"+/+/#">>, <<"+/#">>, <<"#">>], []}, - {atomic, Matched} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"#">>), - ?TRIE:insert(<<"+/#">>), - ?TRIE:insert(<<"+/+/#">>), - {?TRIE:match(<<"a/b/c">>), - ?TRIE:match(<<"$SYS/broker/zenmq">>)} - end). + Fun = fun() -> + ?TRIE:insert(<<"#">>), + ?TRIE:insert(<<"+/#">>), + ?TRIE:insert(<<"+/+/#">>), + {?TRIE:match(<<"a/b/c">>), + ?TRIE:match(<<"$SYS/broker/zenmq">>)} + end, + ?assertEqual({atomic, Matched}, mnesia:transaction(Fun)). t_match3(_) -> Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>], @@ -91,43 +91,42 @@ t_delete(_) -> edge_count = 2, topic = undefined, flags = undefined}, - {atomic, [TN]} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/1/#">>), - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/1/metric/3">>), - ?TRIE:delete(<<"sensor/1/metric/2">>), - ?TRIE:delete(<<"sensor/1/metric">>), - ?TRIE:delete(<<"sensor/1/metric">>), - ?TRIE:lookup(<<"sensor/1">>) - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/1/#">>), + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/1/metric/3">>), + ?TRIE:delete(<<"sensor/1/metric/2">>), + ?TRIE:delete(<<"sensor/1/metric">>), + ?TRIE:delete(<<"sensor/1/metric">>), + ?TRIE:lookup(<<"sensor/1">>) + end, + ?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)). t_delete2(_) -> - {atomic, {[], []}} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor">>), - ?TRIE:insert(<<"sensor/1/metric/2">>), - ?TRIE:insert(<<"sensor/1/metric/3">>), - ?TRIE:delete(<<"sensor">>), - ?TRIE:delete(<<"sensor/1/metric/2">>), - ?TRIE:delete(<<"sensor/1/metric/3">>), - {?TRIE:lookup(<<"sensor">>), - ?TRIE:lookup(<<"sensor/1">>)} - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor">>), + ?TRIE:insert(<<"sensor/1/metric/2">>), + ?TRIE:insert(<<"sensor/1/metric/3">>), + ?TRIE:delete(<<"sensor">>), + ?TRIE:delete(<<"sensor/1/metric/2">>), + ?TRIE:delete(<<"sensor/1/metric/3">>), + {?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/1">>)} + end, + ?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)). t_delete3(_) -> - {atomic, {[], []}} = mnesia:transaction( - fun() -> - ?TRIE:insert(<<"sensor/+">>), - ?TRIE:insert(<<"sensor/+/metric/2">>), - ?TRIE:insert(<<"sensor/+/metric/3">>), - ?TRIE:delete(<<"sensor/+/metric/2">>), - ?TRIE:delete(<<"sensor/+/metric/3">>), - ?TRIE:delete(<<"sensor">>), - ?TRIE:delete(<<"sensor/+">>), - ?TRIE:delete(<<"sensor/+/unknown">>), - {?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)} - end). + Fun = fun() -> + ?TRIE:insert(<<"sensor/+">>), + ?TRIE:insert(<<"sensor/+/metric/2">>), + ?TRIE:insert(<<"sensor/+/metric/3">>), + ?TRIE:delete(<<"sensor/+/metric/2">>), + ?TRIE:delete(<<"sensor/+/metric/3">>), + ?TRIE:delete(<<"sensor">>), + ?TRIE:delete(<<"sensor/+">>), + ?TRIE:delete(<<"sensor/+/unknown">>), + {?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)} + end, + ?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)). clear_tables() -> lists:foreach(fun mnesia:clear_table/1, ?TRIE_TABS).