diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 5ccb6bc40..8bcaa7dc2 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -102,7 +102,7 @@ publish(Topic, Msg) -> ?MODULE:dispatch(To, Msg); (#mqtt_route{topic = To, node = Node}) -> rpc:cast(Node, ?MODULE, dispatch, [To, Msg]) - end, emqttd_router:lookup(Topic)). + end, emqttd_router:match(Topic)). %% @doc Dispatch Message to Subscribers -spec(dispatch(binary(), mqtt_message()) -> ok). diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 732cb448d..d946e945e 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -16,6 +16,8 @@ -module(emqttd_router). +-author("Feng Lee "). + -behaviour(gen_server). -include("emqttd.hrl"). @@ -27,18 +29,25 @@ -copy_mnesia({mnesia, [copy]}). %% Start/Stop --export([start_link/0, stop/0]). +-export([start_link/0, topics/0, local_topics/0, stop/0]). %% Route APIs --export([add_route/1, add_route/2, add_routes/1, lookup/1, print/1, +-export([add_route/1, add_route/2, add_routes/1, match/1, print/1, del_route/1, del_route/2, del_routes/1, has_route/1]). +%% Local Route API +-export([add_local_route/1, del_local_route/1, match_local/1]). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([dump/0]). + -record(state, {stats_timer}). +-define(ROUTER, ?MODULE). + %%-------------------------------------------------------------------- %% Mnesia Bootstrap %%-------------------------------------------------------------------- @@ -58,15 +67,21 @@ mnesia(copy) -> %%-------------------------------------------------------------------- start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?ROUTER}, ?MODULE, [], []). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -%% @doc Lookup Routes. --spec(lookup(Topic:: binary()) -> [mqtt_route()]). -lookup(Topic) when is_binary(Topic) -> +topics() -> + mnesia:dirty_all_keys(route). + +local_topics() -> + ets:select(local_route, [{{'$1', '_'}, [], ['$1']}]). + +%% @doc Match Routes. +-spec(match(Topic:: binary()) -> [mqtt_route()]). +match(Topic) when is_binary(Topic) -> Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), %% Optimize: route table will be replicated to all nodes. lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]). @@ -75,7 +90,7 @@ lookup(Topic) when is_binary(Topic) -> -spec(print(Topic :: binary()) -> [ok]). print(Topic) -> [io:format("~s -> ~s~n", [To, Node]) || - #mqtt_route{topic = To, node = Node} <- lookup(Topic)]. + #mqtt_route{topic = To, node = Node} <- match(Topic)]. %% @doc Add Route -spec(add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}). @@ -166,13 +181,35 @@ trans(Fun) -> {aborted, Error} -> {error, Error} end. -stop() -> gen_server:call(?MODULE, stop). +%%-------------------------------------------------------------------- +%% Local Route API +%%-------------------------------------------------------------------- + +-spec(add_local_route(binary()) -> ok). +add_local_route(Topic) -> + gen_server:cast(?ROUTER, {add_local_route, Topic}). + +-spec(del_local_route(binary()) -> ok). +del_local_route(Topic) -> + gen_server:cast(?ROUTER, {del_local_route, Topic}). + +-spec(match_local(binary()) -> [mqtt_route()]). +match_local(Name) -> + [#mqtt_route{topic = {local, Filter}, node = Node} + || {Filter, Node} <- ets:tab2list(local_route), + emqttd_topic:match(Name, Filter)]. + +dump() -> + [{route, ets:tab2list(route)}, {local_route, ets:tab2list(local_route)}]. + +stop() -> gen_server:call(?ROUTER, stop). %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- init([]) -> + ets:new(local_route, [set, named_table, protected]), mnesia:subscribe(system), {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{stats_timer = TRef}}. @@ -183,6 +220,15 @@ handle_call(stop, _From, State) -> handle_call(_Req, _From, State) -> {reply, ignore, State}. +handle_cast({add_local_route, Topic}, State) -> + %% why node()...? + ets:insert(local_route, {Topic, node()}), + {noreply, State}; + +handle_cast({del_local_route, Topic}, State) -> + ets:delete(local_route, Topic), + {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}.