Improve the design of trie, router and broker modules
1. Add do_add_route/1 do_add_route/2, do_delete_route/1, do_delete_route/2 APIs in emqx_router module 2. Improve the code of emqx_trie module 3. Update the emqx_broker module to call the new APIs of emqx_router
This commit is contained in:
parent
faac09eac9
commit
33830d8120
|
@ -27,6 +27,7 @@
|
|||
-export([subscriptions/1, subscribers/1, subscribed/2]).
|
||||
-export([get_subopts/2, set_subopts/2]).
|
||||
-export([topics/0]).
|
||||
|
||||
%% Stats fun
|
||||
-export([stats_fun/0]).
|
||||
|
||||
|
@ -52,9 +53,9 @@
|
|||
|
||||
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
|
||||
start_link(Pool, Id) ->
|
||||
_ = create_tabs(),
|
||||
Name = emqx_misc:proc_name(?BROKER, Id),
|
||||
gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], []).
|
||||
ok = create_tabs(),
|
||||
gen_server:start_link({local, emqx_misc:proc_name(?BROKER, Id)},
|
||||
?MODULE, [Pool, Id], []).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Create tabs
|
||||
|
@ -75,7 +76,7 @@ create_tabs() ->
|
|||
ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
|
||||
|
||||
%% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
|
||||
%% duplicate_bag: o(1) insert
|
||||
%% bag: o(n) insert
|
||||
ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -114,7 +115,7 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map
|
|||
call(pick({Topic, Shard}), {subscribe, Topic});
|
||||
Group -> %% Shard subscription
|
||||
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
||||
emqx_shard_sub:subscribe(Group, Topic, SubPid)
|
||||
emqx_shared_sub:subscribe(Group, Topic, SubPid)
|
||||
end;
|
||||
true -> ok
|
||||
end.
|
||||
|
@ -367,18 +368,17 @@ pick(Topic) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([Pool, Id]) ->
|
||||
_ = emqx_router:set_mode(protected),
|
||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||
{ok, #{pool => Pool, id => Id}}.
|
||||
|
||||
handle_call({subscribe, Topic}, _From, State) ->
|
||||
case get(Topic) of
|
||||
undefined ->
|
||||
_ = put(Topic, true),
|
||||
emqx_router:add_route(Topic);
|
||||
true -> ok
|
||||
end,
|
||||
{reply, ok, State};
|
||||
Ok = case get(Topic) of
|
||||
undefined ->
|
||||
_ = put(Topic, true),
|
||||
emqx_router:do_add_route(Topic);
|
||||
true -> ok
|
||||
end,
|
||||
{reply, Ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
|
||||
|
@ -387,8 +387,8 @@ handle_call(Req, _From, State) ->
|
|||
handle_cast({unsubscribed, Topic}, State) ->
|
||||
case ets:member(?SUBSCRIBER, Topic) of
|
||||
false ->
|
||||
_ = erase(Topic),
|
||||
emqx_router:delete_route(Topic);
|
||||
_ = erase(Topic),
|
||||
emqx_router:do_delete_route(Topic);
|
||||
true -> ok
|
||||
end,
|
||||
{noreply, State};
|
||||
|
|
|
@ -29,19 +29,19 @@
|
|||
|
||||
%% Route APIs
|
||||
-export([add_route/1, add_route/2]).
|
||||
-export([get_routes/1]).
|
||||
-export([do_add_route/1, do_add_route/2]).
|
||||
-export([match_routes/1, lookup_routes/1, has_routes/1]).
|
||||
-export([delete_route/1, delete_route/2]).
|
||||
-export([has_routes/1, match_routes/1, print_routes/1]).
|
||||
-export([do_delete_route/1, do_delete_route/2]).
|
||||
-export([print_routes/1]).
|
||||
-export([topics/0]).
|
||||
|
||||
%% Mode
|
||||
-export([set_mode/1, get_mode/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
-type(destination() :: node() | {binary(), node()}).
|
||||
-type(group() :: binary()).
|
||||
-type(destination() :: node() | {group(), node()}).
|
||||
|
||||
-define(ROUTE, emqx_route).
|
||||
|
||||
|
@ -66,75 +66,76 @@ mnesia(copy) ->
|
|||
|
||||
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
|
||||
start_link(Pool, Id) ->
|
||||
Name = emqx_misc:proc_name(?MODULE, Id),
|
||||
gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
|
||||
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
|
||||
?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Route APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}).
|
||||
-spec(add_route(emqx_topic:topic()) -> ok | {error, term()}).
|
||||
add_route(Topic) when is_binary(Topic) ->
|
||||
add_route(#route{topic = Topic, dest = node()});
|
||||
add_route(Route = #route{topic = Topic}) ->
|
||||
case get_mode() of
|
||||
protected -> do_add_route(Route);
|
||||
undefined -> call(pick(Topic), {add_route, Route})
|
||||
end.
|
||||
add_route(Topic, node()).
|
||||
|
||||
-spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
add_route(Topic, Dest) when is_binary(Topic) ->
|
||||
add_route(#route{topic = Topic, dest = Dest}).
|
||||
call(pick(Topic), {add_route, Topic, Dest}).
|
||||
|
||||
%% @private
|
||||
do_add_route(Route = #route{topic = Topic, dest = Dest}) ->
|
||||
case lists:member(Route, get_routes(Topic)) of
|
||||
-spec(do_add_route(emqx_topic:topic()) -> ok | {error, term()}).
|
||||
do_add_route(Topic) when is_binary(Topic) ->
|
||||
do_add_route(Topic, node()).
|
||||
|
||||
-spec(do_add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
do_add_route(Topic, Dest) when is_binary(Topic) ->
|
||||
Route = #route{topic = Topic, dest = Dest},
|
||||
case lists:member(Route, lookup_routes(Topic)) of
|
||||
true -> ok;
|
||||
false ->
|
||||
ok = emqx_router_helper:monitor(Dest),
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true -> trans(fun add_trie_route/1, [Route]);
|
||||
false -> add_direct_route(Route)
|
||||
true -> trans(fun insert_trie_route/1, [Route]);
|
||||
false -> insert_direct_route(Route)
|
||||
end
|
||||
end.
|
||||
|
||||
-spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
||||
get_routes(Topic) ->
|
||||
%% @doc Match routes
|
||||
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
||||
match_routes(Topic) when is_binary(Topic) ->
|
||||
%% Optimize: routing table will be replicated to all router nodes.
|
||||
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
|
||||
lists:append([lookup_routes(To) || To <- [Topic | Matched]]).
|
||||
|
||||
-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
||||
lookup_routes(Topic) ->
|
||||
ets:lookup(?ROUTE, Topic).
|
||||
|
||||
-spec(delete_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}).
|
||||
delete_route(Topic) when is_binary(Topic) ->
|
||||
delete_route(#route{topic = Topic, dest = node()});
|
||||
delete_route(Route = #route{topic = Topic}) ->
|
||||
case get_mode() of
|
||||
protected -> do_delete_route(Route);
|
||||
undefined -> call(pick(Topic), {delete_route, Route})
|
||||
end.
|
||||
|
||||
-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
delete_route(Topic, Dest) when is_binary(Topic) ->
|
||||
delete_route(#route{topic = Topic, dest = Dest}).
|
||||
|
||||
%% @private
|
||||
do_delete_route(Route = #route{topic = Topic}) ->
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true -> trans(fun del_trie_route/1, [Route]);
|
||||
false -> del_direct_route(Route)
|
||||
end.
|
||||
|
||||
-spec(has_routes(emqx_topic:topic()) -> boolean()).
|
||||
has_routes(Topic) when is_binary(Topic) ->
|
||||
ets:member(?ROUTE, Topic).
|
||||
|
||||
-spec(topics() -> list(emqx_topic:topic())).
|
||||
topics() -> mnesia:dirty_all_keys(?ROUTE).
|
||||
-spec(delete_route(emqx_topic:topic()) -> ok | {error, term()}).
|
||||
delete_route(Topic) when is_binary(Topic) ->
|
||||
delete_route(Topic, node()).
|
||||
|
||||
%% @doc Match routes
|
||||
%% Optimize: routing table will be replicated to all router nodes.
|
||||
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
||||
match_routes(Topic) when is_binary(Topic) ->
|
||||
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
|
||||
lists:append([get_routes(To) || To <- [Topic | Matched]]).
|
||||
-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
delete_route(Topic, Dest) when is_binary(Topic) ->
|
||||
call(pick(Topic), {delete_route, Topic, Dest}).
|
||||
|
||||
-spec(do_delete_route(emqx_topic:topic()) -> ok | {error, term()}).
|
||||
do_delete_route(Topic) when is_binary(Topic) ->
|
||||
do_delete_route(Topic, node()).
|
||||
|
||||
-spec(do_delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||
do_delete_route(Topic, Dest) ->
|
||||
Route = #route{topic = Topic, dest = Dest},
|
||||
case emqx_topic:wildcard(Topic) of
|
||||
true -> trans(fun delete_trie_route/1, [Route]);
|
||||
false -> delete_direct_route(Route)
|
||||
end.
|
||||
|
||||
-spec(topics() -> list(emqx_topic:topic())).
|
||||
topics() ->
|
||||
mnesia:dirty_all_keys(?ROUTE).
|
||||
|
||||
%% @doc Print routes to a topic
|
||||
-spec(print_routes(emqx_topic:topic()) -> ok).
|
||||
|
@ -143,13 +144,6 @@ print_routes(Topic) ->
|
|||
io:format("~s -> ~s~n", [To, Dest])
|
||||
end, match_routes(Topic)).
|
||||
|
||||
-spec(set_mode(protected | atom()) -> any()).
|
||||
set_mode(Mode) when is_atom(Mode) ->
|
||||
put('$router_mode', Mode).
|
||||
|
||||
-spec(get_mode() -> protected | undefined | atom()).
|
||||
get_mode() -> get('$router_mode').
|
||||
|
||||
call(Router, Msg) ->
|
||||
gen_server:call(Router, Msg, infinity).
|
||||
|
||||
|
@ -164,11 +158,13 @@ init([Pool, Id]) ->
|
|||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||
{ok, #{pool => Pool, id => Id}}.
|
||||
|
||||
handle_call({add_route, Route}, _From, State) ->
|
||||
{reply, do_add_route(Route), State};
|
||||
handle_call({add_route, Topic, Dest}, _From, State) ->
|
||||
Ok = do_add_route(Topic, Dest),
|
||||
{reply, Ok, State};
|
||||
|
||||
handle_call({delete_route, Route}, _From, State) ->
|
||||
{reply, do_delete_route(Route), State};
|
||||
handle_call({delete_route, Topic, Dest}, _From, State) ->
|
||||
Ok = do_delete_route(Topic, Dest),
|
||||
{reply, Ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Router] unexpected call: ~p", [Req]),
|
||||
|
@ -192,23 +188,23 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
add_direct_route(Route) ->
|
||||
insert_direct_route(Route) ->
|
||||
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]).
|
||||
|
||||
add_trie_route(Route = #route{topic = Topic}) ->
|
||||
insert_trie_route(Route = #route{topic = Topic}) ->
|
||||
case mnesia:wread({?ROUTE, Topic}) of
|
||||
[] -> emqx_trie:insert(Topic);
|
||||
_ -> ok
|
||||
end,
|
||||
mnesia:write(?ROUTE, Route, sticky_write).
|
||||
|
||||
del_direct_route(Route) ->
|
||||
delete_direct_route(Route) ->
|
||||
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]).
|
||||
|
||||
del_trie_route(Route = #route{topic = Topic}) ->
|
||||
delete_trie_route(Route = #route{topic = Topic}) ->
|
||||
case mnesia:wread({?ROUTE, Topic}) of
|
||||
[Route] -> %% Remove route and trie
|
||||
mnesia:delete_object(?ROUTE, Route, sticky_write),
|
||||
ok = mnesia:delete_object(?ROUTE, Route, sticky_write),
|
||||
emqx_trie:delete(Topic);
|
||||
[_|_] -> %% Remove route only
|
||||
mnesia:delete_object(?ROUTE, Route, sticky_write);
|
||||
|
@ -219,7 +215,7 @@ del_trie_route(Route = #route{topic = Topic}) ->
|
|||
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
||||
trans(Fun, Args) ->
|
||||
case mnesia:transaction(Fun, Args) of
|
||||
{atomic, _} -> ok;
|
||||
{aborted, Error} -> {error, Error}
|
||||
{atomic, Ok} -> Ok;
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
|
|
|
@ -31,15 +31,11 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
%% internal export
|
||||
%% Internal export
|
||||
-export([stats_fun/0]).
|
||||
|
||||
-record(routing_node, {name, const = unused}).
|
||||
-record(state, {nodes = []}).
|
||||
|
||||
-compile({no_auto_import, [monitor/1]}).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(ROUTE, emqx_route).
|
||||
-define(ROUTING_NODE, emqx_routing_node).
|
||||
-define(LOCK, {?MODULE, cleanup_routes}).
|
||||
|
@ -64,9 +60,9 @@ mnesia(copy) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
%% @doc Starts the router helper
|
||||
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
||||
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
%% @doc Monitor routing node
|
||||
-spec(monitor(node() | {binary(), node()}) -> ok).
|
||||
|
@ -84,18 +80,18 @@ monitor(Node) when is_atom(Node) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
_ = ekka:monitor(membership),
|
||||
_ = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
|
||||
ok = ekka:monitor(membership),
|
||||
{ok, _} = mnesia:subscribe({table, ?ROUTING_NODE, simple}),
|
||||
Nodes = lists:foldl(
|
||||
fun(Node, Acc) ->
|
||||
case ekka:is_member(Node) of
|
||||
true -> Acc;
|
||||
false -> _ = erlang:monitor_node(Node, true),
|
||||
false -> true = erlang:monitor_node(Node, true),
|
||||
[Node | Acc]
|
||||
end
|
||||
end, [], mnesia:dirty_all_keys(?ROUTING_NODE)),
|
||||
emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0),
|
||||
{ok, #state{nodes = Nodes}, hibernate}.
|
||||
{ok, #{nodes => Nodes}, hibernate}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[RouterHelper] unexpected call: ~p", [Req]),
|
||||
|
@ -105,24 +101,29 @@ handle_cast(Msg, State) ->
|
|||
emqx_logger:error("[RouterHelper] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mnesia_table_event, {write, #routing_node{name = Node}, _}}, State = #state{nodes = Nodes}) ->
|
||||
emqx_logger:info("[RouterHelper] write routing node: ~s", [Node]),
|
||||
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) ->
|
||||
case ekka:is_member(Node) orelse lists:member(Node, Nodes) of
|
||||
true -> {noreply, State};
|
||||
false -> _ = erlang:monitor_node(Node, true),
|
||||
{noreply, State#state{nodes = [Node | Nodes]}}
|
||||
true -> {noreply, State};
|
||||
false ->
|
||||
true = erlang:monitor_node(Node, true),
|
||||
{noreply, State#{nodes := [Node | Nodes]}}
|
||||
end;
|
||||
|
||||
handle_info({mnesia_table_event, _Event}, State) ->
|
||||
handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
|
||||
%% ignore
|
||||
{noreply, State};
|
||||
|
||||
handle_info({nodedown, Node}, State = #state{nodes = Nodes}) ->
|
||||
handle_info({mnesia_table_event, Event}, State) ->
|
||||
emqx_logger:error("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
|
||||
global:trans({?LOCK, self()},
|
||||
fun() ->
|
||||
mnesia:transaction(fun cleanup_routes/1, [Node])
|
||||
end),
|
||||
mnesia:dirty_delete(?ROUTING_NODE, Node),
|
||||
{noreply, State#state{nodes = lists:delete(Node, Nodes)}, hibernate};
|
||||
ok = mnesia:dirty_delete(?ROUTING_NODE, Node),
|
||||
{noreply, State#{nodes := lists:delete(Node, Nodes)}, hibernate};
|
||||
|
||||
handle_info({membership, {mnesia, down, Node}}, State) ->
|
||||
handle_info({nodedown, Node}, State);
|
||||
|
@ -134,8 +135,8 @@ handle_info(Info, State) ->
|
|||
emqx_logger:error("[RouteHelper] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{}) ->
|
||||
ekka:unmonitor(membership),
|
||||
terminate(_Reason, _State) ->
|
||||
ok = ekka:unmonitor(membership),
|
||||
emqx_stats:cancel_update(route_stats),
|
||||
mnesia:unsubscribe({table, ?ROUTING_NODE, simple}).
|
||||
|
||||
|
|
|
@ -252,7 +252,6 @@ subscribers(Group, Topic) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
_ = emqx_router:set_mode(protected),
|
||||
mnesia:subscribe({table, ?TAB, simple}),
|
||||
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
|
||||
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
|
||||
|
@ -269,7 +268,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
|
|||
mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||
true -> ok;
|
||||
false -> ok = emqx_router:add_route(Topic, {Group, node()})
|
||||
false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
|
||||
end,
|
||||
ok = maybe_insert_alive_tab(SubPid),
|
||||
true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||
|
@ -280,7 +279,7 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
|||
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||
true -> ok;
|
||||
false -> ok = emqx_router:delete_route(Topic, {Group, node()})
|
||||
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
||||
end,
|
||||
{reply, ok, State};
|
||||
|
||||
|
@ -334,7 +333,7 @@ cleanup_down(SubPid) ->
|
|||
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||
true -> ok;
|
||||
false -> ok = emqx_router:delete_route(Topic, {Group, node()})
|
||||
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
||||
end
|
||||
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
%% @doc Create or replicate trie tables.
|
||||
-spec(mnesia(boot | copy) -> ok).
|
||||
mnesia(boot) ->
|
||||
%% Optimize
|
||||
%% Optimize storage
|
||||
StoreProps = [{ets, [{read_concurrency, true},
|
||||
{write_concurrency, true}]}],
|
||||
%% Trie table
|
||||
|
@ -72,7 +72,7 @@ insert(Topic) when is_binary(Topic) ->
|
|||
write_trie_node(TrieNode#trie_node{topic = Topic});
|
||||
[] ->
|
||||
%% Add trie path
|
||||
lists:foreach(fun add_path/1, emqx_topic:triples(Topic)),
|
||||
ok = lists:foreach(fun add_path/1, emqx_topic:triples(Topic)),
|
||||
%% Add last node
|
||||
write_trie_node(#trie_node{node_id = Topic, topic = Topic})
|
||||
end.
|
||||
|
@ -93,7 +93,7 @@ lookup(NodeId) ->
|
|||
delete(Topic) when is_binary(Topic) ->
|
||||
case mnesia:wread({?TRIE_NODE, Topic}) of
|
||||
[#trie_node{edge_count = 0}] ->
|
||||
mnesia:delete({?TRIE_NODE, Topic}),
|
||||
ok = mnesia:delete({?TRIE_NODE, Topic}),
|
||||
delete_path(lists:reverse(emqx_topic:triples(Topic)));
|
||||
[TrieNode] ->
|
||||
write_trie_node(TrieNode#trie_node{topic = undefined});
|
||||
|
@ -112,12 +112,12 @@ add_path({Node, Word, Child}) ->
|
|||
[TrieNode = #trie_node{edge_count = Count}] ->
|
||||
case mnesia:wread({?TRIE, Edge}) of
|
||||
[] ->
|
||||
write_trie_node(TrieNode#trie_node{edge_count = Count + 1}),
|
||||
ok = write_trie_node(TrieNode#trie_node{edge_count = Count + 1}),
|
||||
write_trie(#trie{edge = Edge, node_id = Child});
|
||||
[_] -> ok
|
||||
end;
|
||||
[] ->
|
||||
write_trie_node(#trie_node{node_id = Node, edge_count = 1}),
|
||||
ok = write_trie_node(#trie_node{node_id = Node, edge_count = 1}),
|
||||
write_trie(#trie{edge = Edge, node_id = Child})
|
||||
end.
|
||||
|
||||
|
@ -154,10 +154,10 @@ match_node(NodeId, [W|Words], ResAcc) ->
|
|||
delete_path([]) ->
|
||||
ok;
|
||||
delete_path([{NodeId, Word, _} | RestPath]) ->
|
||||
mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}),
|
||||
case mnesia:read(?TRIE_NODE, NodeId) of
|
||||
ok = mnesia:delete({?TRIE, #trie_edge{node_id = NodeId, word = Word}}),
|
||||
case mnesia:wread({?TRIE_NODE, NodeId}) of
|
||||
[#trie_node{edge_count = 1, topic = undefined}] ->
|
||||
mnesia:delete({?TRIE_NODE, NodeId}),
|
||||
ok = mnesia:delete({?TRIE_NODE, NodeId}),
|
||||
delete_path(RestPath);
|
||||
[TrieNode = #trie_node{edge_count = 1, topic = _}] ->
|
||||
write_trie_node(TrieNode#trie_node{edge_count = 0});
|
||||
|
@ -167,9 +167,11 @@ delete_path([{NodeId, Word, _} | RestPath]) ->
|
|||
mnesia:abort({node_not_found, NodeId})
|
||||
end.
|
||||
|
||||
%% @private
|
||||
write_trie(Trie) ->
|
||||
mnesia:write(?TRIE, Trie, write).
|
||||
|
||||
%% @private
|
||||
write_trie_node(TrieNode) ->
|
||||
mnesia:write(?TRIE_NODE, TrieNode, write).
|
||||
|
||||
|
|
|
@ -21,17 +21,16 @@
|
|||
-compile(nowarn_export_all).
|
||||
|
||||
-define(R, emqx_router).
|
||||
-define(TABS, [emqx_route, emqx_trie, emqx_trie_node]).
|
||||
|
||||
all() ->
|
||||
[{group, route}].
|
||||
|
||||
groups() ->
|
||||
[{route, [sequence],
|
||||
[add_del_route,
|
||||
match_routes,
|
||||
has_routes,
|
||||
router_add_del]}].
|
||||
[t_add_delete,
|
||||
t_do_add_delete,
|
||||
t_match_routes,
|
||||
t_has_routes]}].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_broker_helpers:run_setup_steps(),
|
||||
|
@ -47,77 +46,47 @@ init_per_testcase(_TestCase, Config) ->
|
|||
end_per_testcase(_TestCase, _Config) ->
|
||||
clear_tables().
|
||||
|
||||
add_del_route(_) ->
|
||||
From = {self(), make_ref()},
|
||||
?R:add_route(From, <<"a/b/c">>, node()),
|
||||
timer:sleep(1),
|
||||
|
||||
?R:add_route(From, <<"a/b/c">>, node()),
|
||||
timer:sleep(1),
|
||||
|
||||
?R:add_route(From, <<"a/+/b">>, node()),
|
||||
ct:log("Topics: ~p ~n", [emqx_topic:wildcard(<<"a/+/b">>)]),
|
||||
timer:sleep(1),
|
||||
|
||||
t_add_delete(_) ->
|
||||
?R:add_route(<<"a/b/c">>, node()),
|
||||
?R:add_route(<<"a/b/c">>, node()),
|
||||
?R:add_route(<<"a/+/b">>, node()),
|
||||
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
|
||||
|
||||
?R:del_route(From, <<"a/b/c">>, node()),
|
||||
?R:delete_route(<<"a/b/c">>),
|
||||
?R:delete_route(<<"a/+/b">>, node()),
|
||||
?assertEqual([], ?R:topics()).
|
||||
|
||||
?R:del_route(From, <<"a/+/b">>, node()),
|
||||
timer:sleep(120),
|
||||
?assertEqual([], lists:sort(?R:topics())).
|
||||
t_do_add_delete(_) ->
|
||||
?R:do_add_route(<<"a/b/c">>, node()),
|
||||
?R:do_add_route(<<"a/b/c">>, node()),
|
||||
?R:do_add_route(<<"a/+/b">>, node()),
|
||||
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
|
||||
|
||||
match_routes(_) ->
|
||||
From = {self(), make_ref()},
|
||||
?R:add_route(From, <<"a/b/c">>, node()),
|
||||
?R:add_route(From, <<"a/+/c">>, node()),
|
||||
?R:add_route(From, <<"a/b/#">>, node()),
|
||||
?R:add_route(From, <<"#">>, node()),
|
||||
timer:sleep(1000),
|
||||
?R:do_delete_route(<<"a/b/c">>, node()),
|
||||
?R:do_delete_route(<<"a/+/b">>),
|
||||
?assertEqual([], ?R:topics()).
|
||||
|
||||
t_match_routes(_) ->
|
||||
?R:add_route(<<"a/b/c">>, node()),
|
||||
?R:add_route(<<"a/+/c">>, node()),
|
||||
?R:add_route(<<"a/b/#">>, node()),
|
||||
?R:add_route(<<"#">>, node()),
|
||||
?assertEqual([#route{topic = <<"#">>, dest = node()},
|
||||
#route{topic = <<"a/+/c">>, dest = node()},
|
||||
#route{topic = <<"a/b/#">>, dest = node()},
|
||||
#route{topic = <<"a/b/c">>, dest = node()}],
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))).
|
||||
lists:sort(?R:match_routes(<<"a/b/c">>))),
|
||||
?R:delete_route(<<"a/b/c">>, node()),
|
||||
?R:delete_route(<<"a/+/c">>, node()),
|
||||
?R:delete_route(<<"a/b/#">>, node()),
|
||||
?R:delete_route(<<"#">>, node()),
|
||||
?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))).
|
||||
|
||||
has_routes(_) ->
|
||||
From = {self(), make_ref()},
|
||||
?R:add_route(From, <<"devices/+/messages">>, node()),
|
||||
timer:sleep(200),
|
||||
?assert(?R:has_routes(<<"devices/+/messages">>)).
|
||||
t_has_routes(_) ->
|
||||
?R:add_route(<<"devices/+/messages">>, node()),
|
||||
?assert(?R:has_routes(<<"devices/+/messages">>)),
|
||||
?R:delete_route(<<"devices/+/messages">>).
|
||||
|
||||
clear_tables() ->
|
||||
lists:foreach(fun mnesia:clear_table/1, ?TABS).
|
||||
|
||||
router_add_del(_) ->
|
||||
?R:add_route(<<"#">>),
|
||||
?R:add_route(<<"a/b/c">>, node()),
|
||||
?R:add_route(<<"+/#">>),
|
||||
Routes = [R1, R2 | _] = [
|
||||
#route{topic = <<"#">>, dest = node()},
|
||||
#route{topic = <<"+/#">>, dest = node()},
|
||||
#route{topic = <<"a/b/c">>, dest = node()}],
|
||||
timer:sleep(500),
|
||||
?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
|
||||
|
||||
?R:print_routes(<<"a/b/c">>),
|
||||
|
||||
%% Batch Add
|
||||
lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
|
||||
?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
|
||||
|
||||
%% Del
|
||||
?R:del_route(<<"a/b/c">>, node()),
|
||||
timer:sleep(500),
|
||||
[R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)),
|
||||
{atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
|
||||
|
||||
%% Batch Del
|
||||
R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'},
|
||||
?R:add_route(R3),
|
||||
?R:del_route(<<"#">>),
|
||||
?R:del_route(R2),
|
||||
?R:del_route(R3),
|
||||
timer:sleep(500),
|
||||
[] = lists:sort(?R:match_routes(<<"a/b/c">>)).
|
||||
lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]).
|
||||
|
||||
|
|
|
@ -47,36 +47,36 @@ t_insert(_) ->
|
|||
edge_count = 3,
|
||||
topic = <<"sensor">>,
|
||||
flags = undefined},
|
||||
{atomic, [TN]} = mnesia:transaction(
|
||||
fun() ->
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/+/#">>),
|
||||
?TRIE:insert(<<"sensor/#">>),
|
||||
?TRIE:insert(<<"sensor">>),
|
||||
?TRIE:insert(<<"sensor">>),
|
||||
?TRIE:lookup(<<"sensor">>)
|
||||
end).
|
||||
Fun = fun() ->
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/+/#">>),
|
||||
?TRIE:insert(<<"sensor/#">>),
|
||||
?TRIE:insert(<<"sensor">>),
|
||||
?TRIE:insert(<<"sensor">>),
|
||||
?TRIE:lookup(<<"sensor">>)
|
||||
end,
|
||||
?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)).
|
||||
|
||||
t_match(_) ->
|
||||
Machted = [<<"sensor/+/#">>, <<"sensor/#">>],
|
||||
{atomic, Machted} = mnesia:transaction(
|
||||
fun() ->
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/+/#">>),
|
||||
?TRIE:insert(<<"sensor/#">>),
|
||||
?TRIE:match(<<"sensor/1">>)
|
||||
end).
|
||||
Fun = fun() ->
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/+/#">>),
|
||||
?TRIE:insert(<<"sensor/#">>),
|
||||
?TRIE:match(<<"sensor/1">>)
|
||||
end,
|
||||
?assertEqual({atomic, Machted}, mnesia:transaction(Fun)).
|
||||
|
||||
t_match2(_) ->
|
||||
Matched = {[<<"+/+/#">>, <<"+/#">>, <<"#">>], []},
|
||||
{atomic, Matched} = mnesia:transaction(
|
||||
fun() ->
|
||||
?TRIE:insert(<<"#">>),
|
||||
?TRIE:insert(<<"+/#">>),
|
||||
?TRIE:insert(<<"+/+/#">>),
|
||||
{?TRIE:match(<<"a/b/c">>),
|
||||
?TRIE:match(<<"$SYS/broker/zenmq">>)}
|
||||
end).
|
||||
Fun = fun() ->
|
||||
?TRIE:insert(<<"#">>),
|
||||
?TRIE:insert(<<"+/#">>),
|
||||
?TRIE:insert(<<"+/+/#">>),
|
||||
{?TRIE:match(<<"a/b/c">>),
|
||||
?TRIE:match(<<"$SYS/broker/zenmq">>)}
|
||||
end,
|
||||
?assertEqual({atomic, Matched}, mnesia:transaction(Fun)).
|
||||
|
||||
t_match3(_) ->
|
||||
Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
|
||||
|
@ -91,43 +91,42 @@ t_delete(_) ->
|
|||
edge_count = 2,
|
||||
topic = undefined,
|
||||
flags = undefined},
|
||||
{atomic, [TN]} = mnesia:transaction(
|
||||
fun() ->
|
||||
?TRIE:insert(<<"sensor/1/#">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/3">>),
|
||||
?TRIE:delete(<<"sensor/1/metric/2">>),
|
||||
?TRIE:delete(<<"sensor/1/metric">>),
|
||||
?TRIE:delete(<<"sensor/1/metric">>),
|
||||
?TRIE:lookup(<<"sensor/1">>)
|
||||
end).
|
||||
Fun = fun() ->
|
||||
?TRIE:insert(<<"sensor/1/#">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/3">>),
|
||||
?TRIE:delete(<<"sensor/1/metric/2">>),
|
||||
?TRIE:delete(<<"sensor/1/metric">>),
|
||||
?TRIE:delete(<<"sensor/1/metric">>),
|
||||
?TRIE:lookup(<<"sensor/1">>)
|
||||
end,
|
||||
?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)).
|
||||
|
||||
t_delete2(_) ->
|
||||
{atomic, {[], []}} = mnesia:transaction(
|
||||
fun() ->
|
||||
?TRIE:insert(<<"sensor">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/3">>),
|
||||
?TRIE:delete(<<"sensor">>),
|
||||
?TRIE:delete(<<"sensor/1/metric/2">>),
|
||||
?TRIE:delete(<<"sensor/1/metric/3">>),
|
||||
{?TRIE:lookup(<<"sensor">>),
|
||||
?TRIE:lookup(<<"sensor/1">>)}
|
||||
end).
|
||||
Fun = fun() ->
|
||||
?TRIE:insert(<<"sensor">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/1/metric/3">>),
|
||||
?TRIE:delete(<<"sensor">>),
|
||||
?TRIE:delete(<<"sensor/1/metric/2">>),
|
||||
?TRIE:delete(<<"sensor/1/metric/3">>),
|
||||
{?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/1">>)}
|
||||
end,
|
||||
?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)).
|
||||
|
||||
t_delete3(_) ->
|
||||
{atomic, {[], []}} = mnesia:transaction(
|
||||
fun() ->
|
||||
?TRIE:insert(<<"sensor/+">>),
|
||||
?TRIE:insert(<<"sensor/+/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/+/metric/3">>),
|
||||
?TRIE:delete(<<"sensor/+/metric/2">>),
|
||||
?TRIE:delete(<<"sensor/+/metric/3">>),
|
||||
?TRIE:delete(<<"sensor">>),
|
||||
?TRIE:delete(<<"sensor/+">>),
|
||||
?TRIE:delete(<<"sensor/+/unknown">>),
|
||||
{?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)}
|
||||
end).
|
||||
Fun = fun() ->
|
||||
?TRIE:insert(<<"sensor/+">>),
|
||||
?TRIE:insert(<<"sensor/+/metric/2">>),
|
||||
?TRIE:insert(<<"sensor/+/metric/3">>),
|
||||
?TRIE:delete(<<"sensor/+/metric/2">>),
|
||||
?TRIE:delete(<<"sensor/+/metric/3">>),
|
||||
?TRIE:delete(<<"sensor">>),
|
||||
?TRIE:delete(<<"sensor/+">>),
|
||||
?TRIE:delete(<<"sensor/+/unknown">>),
|
||||
{?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)}
|
||||
end,
|
||||
?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)).
|
||||
|
||||
clear_tables() ->
|
||||
lists:foreach(fun mnesia:clear_table/1, ?TRIE_TABS).
|
||||
|
|
Loading…
Reference in New Issue