support local route

This commit is contained in:
Feng 2016-08-07 18:09:07 +08:00
parent 285421f073
commit 37bd5465bd
2 changed files with 55 additions and 9 deletions

View File

@ -102,7 +102,7 @@ publish(Topic, Msg) ->
?MODULE:dispatch(To, Msg); ?MODULE:dispatch(To, Msg);
(#mqtt_route{topic = To, node = Node}) -> (#mqtt_route{topic = To, node = Node}) ->
rpc:cast(Node, ?MODULE, dispatch, [To, Msg]) rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
end, emqttd_router:lookup(Topic)). end, emqttd_router:match(Topic)).
%% @doc Dispatch Message to Subscribers %% @doc Dispatch Message to Subscribers
-spec(dispatch(binary(), mqtt_message()) -> ok). -spec(dispatch(binary(), mqtt_message()) -> ok).

View File

@ -16,6 +16,8 @@
-module(emqttd_router). -module(emqttd_router).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server). -behaviour(gen_server).
-include("emqttd.hrl"). -include("emqttd.hrl").
@ -27,18 +29,25 @@
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% Start/Stop %% Start/Stop
-export([start_link/0, stop/0]). -export([start_link/0, topics/0, local_topics/0, stop/0]).
%% Route APIs %% Route APIs
-export([add_route/1, add_route/2, add_routes/1, lookup/1, print/1, -export([add_route/1, add_route/2, add_routes/1, match/1, print/1,
del_route/1, del_route/2, del_routes/1, has_route/1]). del_route/1, del_route/2, del_routes/1, has_route/1]).
%% Local Route API
-export([add_local_route/1, del_local_route/1, match_local/1]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-export([dump/0]).
-record(state, {stats_timer}). -record(state, {stats_timer}).
-define(ROUTER, ?MODULE).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia Bootstrap %% Mnesia Bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -58,15 +67,21 @@ mnesia(copy) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?ROUTER}, ?MODULE, [], []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Lookup Routes. topics() ->
-spec(lookup(Topic:: binary()) -> [mqtt_route()]). mnesia:dirty_all_keys(route).
lookup(Topic) when is_binary(Topic) ->
local_topics() ->
ets:select(local_route, [{{'$1', '_'}, [], ['$1']}]).
%% @doc Match Routes.
-spec(match(Topic:: binary()) -> [mqtt_route()]).
match(Topic) when is_binary(Topic) ->
Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]), Matched = mnesia:async_dirty(fun emqttd_trie:match/1, [Topic]),
%% Optimize: route table will be replicated to all nodes. %% Optimize: route table will be replicated to all nodes.
lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]). lists:append([ets:lookup(route, To) || To <- [Topic | Matched]]).
@ -75,7 +90,7 @@ lookup(Topic) when is_binary(Topic) ->
-spec(print(Topic :: binary()) -> [ok]). -spec(print(Topic :: binary()) -> [ok]).
print(Topic) -> print(Topic) ->
[io:format("~s -> ~s~n", [To, Node]) || [io:format("~s -> ~s~n", [To, Node]) ||
#mqtt_route{topic = To, node = Node} <- lookup(Topic)]. #mqtt_route{topic = To, node = Node} <- match(Topic)].
%% @doc Add Route %% @doc Add Route
-spec(add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}). -spec(add_route(binary() | mqtt_route()) -> ok | {error, Reason :: any()}).
@ -166,13 +181,35 @@ trans(Fun) ->
{aborted, Error} -> {error, Error} {aborted, Error} -> {error, Error}
end. end.
stop() -> gen_server:call(?MODULE, stop). %%--------------------------------------------------------------------
%% Local Route API
%%--------------------------------------------------------------------
-spec(add_local_route(binary()) -> ok).
add_local_route(Topic) ->
gen_server:cast(?ROUTER, {add_local_route, Topic}).
-spec(del_local_route(binary()) -> ok).
del_local_route(Topic) ->
gen_server:cast(?ROUTER, {del_local_route, Topic}).
-spec(match_local(binary()) -> [mqtt_route()]).
match_local(Name) ->
[#mqtt_route{topic = {local, Filter}, node = Node}
|| {Filter, Node} <- ets:tab2list(local_route),
emqttd_topic:match(Name, Filter)].
dump() ->
[{route, ets:tab2list(route)}, {local_route, ets:tab2list(local_route)}].
stop() -> gen_server:call(?ROUTER, stop).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(local_route, [set, named_table, protected]),
mnesia:subscribe(system), mnesia:subscribe(system),
{ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{stats_timer = TRef}}. {ok, #state{stats_timer = TRef}}.
@ -183,6 +220,15 @@ handle_call(stop, _From, State) ->
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
{reply, ignore, State}. {reply, ignore, State}.
handle_cast({add_local_route, Topic}, State) ->
%% why node()...?
ets:insert(local_route, {Topic, node()}),
{noreply, State};
handle_cast({del_local_route, Topic}, State) ->
ets:delete(local_route, Topic),
{noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.