From 20ede24d832905f6362c985ed5a1b97014985fc3 Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 12 Jan 2016 14:01:28 +0800 Subject: [PATCH] fix issue #427 - Optimization for Route ETS insertion --- src/emqttd_router.erl | 341 +++++++++++++++++++++++++++--------------- 1 file changed, 218 insertions(+), 123 deletions(-) diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index 9f10f3b58..b80e8bdd3 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,167 +19,262 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc MQTT Message Router on Local Node -%%% -%%% Route Table: -%%% -%%% Topic -> Pid1, Pid2, ... -%%% -%%% Reverse Route Table: -%%% -%%% Pid -> Topic1, Topic2, ... -%%% -%%% @end +%%% @doc Message Router on Local Node. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- -module(emqttd_router). +-behaviour(gen_server2). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). --export([init/1, route/2, lookup_routes/1, has_route/1, - add_routes/2, delete_routes/1, delete_routes/2]). +-include("emqttd_internal.hrl"). --ifdef(TEST). --compile(export_all). --endif. +-export([start_link/4]). -%%------------------------------------------------------------------------------ -%% @doc Create route tables. -%% @end -%%------------------------------------------------------------------------------ -init(_Opts) -> - TabOpts = [bag, public, named_table, - {write_concurrency, true}], - %% Route Table: Topic -> {Pid, QoS} - %% Route Shard: {Topic, Shard} -> {Pid, QoS} - ensure_tab(route, TabOpts), +%% Route API +-export([route/2]). - %% Reverse Route Table: Pid -> {Topic, QoS} - ensure_tab(reverse_route, TabOpts). +%% Route Admin API +-export([add_route/2, lookup_routes/1, has_route/1, delete_route/2]). -ensure_tab(Tab, Opts) -> - case ets:info(Tab, name) of - undefined -> - ets:new(Tab, Opts); - _ -> - ok - end. +%% Batch API +-export([add_routes/2, delete_routes/2]). --ifdef(TEST). -destory() -> - ets:delete(route), - ets:delete(reverse_route). --endif. +%% gen_server Callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -%%------------------------------------------------------------------------------ -%% @doc Add Routes. -%% @end -%%------------------------------------------------------------------------------ --spec add_routes(list(binary()), pid()) -> ok. -add_routes(Topics, Pid) when is_pid(Pid) -> - with_stats(fun() -> - case lookup_routes(Pid) of - [] -> - erlang:monitor(process, Pid), - insert_routes(Topics, Pid); - InEts -> - insert_routes(Topics -- InEts, Pid) - end - end). +-record(aging, {topics, time, tref}). -%%------------------------------------------------------------------------------ -%% @doc Lookup Routes -%% @end -%%------------------------------------------------------------------------------ --spec lookup_routes(pid()) -> list(binary()). -lookup_routes(Pid) when is_pid(Pid) -> - [Topic || {_, Topic} <- ets:lookup(reverse_route, Pid)]. +-record(state, {pool, id, statsfun, aging :: #aging{}}). -%%------------------------------------------------------------------------------ -%% @doc Has Route? -%% @end -%%------------------------------------------------------------------------------ --spec has_route(binary()) -> boolean(). -has_route(Topic) -> - ets:member(route, Topic). +-type topic() :: binary(). -%%------------------------------------------------------------------------------ -%% @doc Delete Routes -%% @end -%%------------------------------------------------------------------------------ --spec delete_routes(list(binary()), pid()) -> ok. -delete_routes(Topics, Pid) -> - with_stats(fun() -> - Routes = [{Topic, Pid} || Topic <- Topics], - lists:foreach(fun delete_route/1, Routes) - end). +%% @doc Start a local router. +-spec start_link(atom(), pos_integer(), fun(), list()) -> {ok, pid()} | {error, any}. +start_link(Pool, Id, StatsFun, Env) -> + gen_server2:start_link({local, name(Id)}, ?MODULE, [Pool, Id, StatsFun, Env], []). --spec delete_routes(pid()) -> ok. -delete_routes(Pid) when is_pid(Pid) -> - with_stats(fun() -> - Routes = [{Topic, Pid} || Topic <- lookup_routes(Pid)], - ets:delete(reverse_route, Pid), - lists:foreach(fun delete_route_only/1, Routes) - end). +name(Id) -> + list_to_atom("emqttd_router_" ++ integer_to_list(Id)). -%%------------------------------------------------------------------------------ -%% @doc Route Message on Local Node. -%% @end -%%------------------------------------------------------------------------------ --spec route(binary(), mqtt_message()) -> non_neg_integer(). +%% @doc Route Message on the local node. +-spec route(topic(), mqtt_message()) -> any(). route(Queue = <<"$Q/", _Q>>, Msg) -> - case ets:lookup(route, Queue) of + case lookup_routes(Queue) of [] -> emqttd_metrics:inc('messages/dropped'); + [SubPid] -> + SubPid ! {dispatch, Queue, Msg}; Routes -> Idx = crypto:rand_uniform(1, length(Routes) + 1), - {_, SubPid} = lists:nth(Idx, Routes), - dispatch(SubPid, Queue, Msg) + SubPid = lists:nth(Idx, Routes), + SubPid ! {dispatch, Queue, Msg} end; route(Topic, Msg) -> - case ets:lookup(route, Topic) of + case lookup_routes(Topic) of [] -> emqttd_metrics:inc('messages/dropped'); + [SubPid] -> %% optimize + SubPid ! {dispatch, Topic, Msg}; Routes -> - lists:foreach(fun({_Topic, SubPid}) -> - dispatch(SubPid, Topic, Msg) - end, Routes) + lists:foreach(fun(SubPid) -> + SubPid ! {dispatch, Topic, Msg} + end, Routes) end. -dispatch(SubPid, Topic, Msg) -> SubPid ! {dispatch, Topic, Msg}. +%% @doc Has Route? +-spec has_route(topic()) -> boolean(). +has_route(Topic) -> + ets:member(route, Topic). -%%%============================================================================= -%%% Internal Functions -%%%============================================================================= +%% @doc Lookup Routes +-spec lookup_routes(topic()) -> list(pid()). +lookup_routes(Topic) when is_binary(Topic) -> + case ets:member(route, Topic) of + true -> + try ets:lookup_element(route, Topic, 2) catch error:badarg -> [] end; + false -> + [] + end. -insert_routes([], _Pid) -> - ok; -insert_routes(Topics, Pid) -> - {Routes, ReverseRoutes} = routes(Topics, Pid), - ets:insert(route, Routes), - ets:insert(reverse_route, ReverseRoutes). +%% @doc Add Route. +-spec add_route(topic(), pid()) -> ok. +add_route(Topic, Pid) when is_pid(Pid) -> + call(pick(Topic), {add_route, Topic, Pid}). -routes(Topics, Pid) -> - lists:unzip([{{Topic, Pid}, {Pid, Topic}} || Topic <- Topics]). +%% @doc Add Routes. +-spec add_routes(list(topic()), pid()) -> ok. +add_routes([Topic], Pid) -> + add_route(Topic, Pid); -delete_route({Topic, Pid}) -> - ets:delete_object(reverse_route, {Pid, Topic}), - ets:delete_object(route, {Topic, Pid}). +add_routes(Topics, Pid) -> + lists:foreach(fun({Router, Slice}) -> + call(Router, {add_routes, Slice, Pid}) + end, slice(Topics)). -delete_route_only({Topic, Pid}) -> - ets:delete_object(route, {Topic, Pid}). +%% @doc Delete Route. +-spec delete_route(topic(), pid()) -> ok. +delete_route(Topic, Pid) -> + cast(pick(Topic), {delete_route, Topic, Pid}). -with_stats(Fun) -> - Ok = Fun(), setstats(), Ok. +%% @doc Delete Routes. +-spec delete_routes(list(topic()), pid()) -> ok. +delete_routes([Topic], Pid) -> + delete_route(Topic, Pid); -setstats() -> - lists:foreach(fun setstat/1, [{route, 'routes/count'}, - {reverse_route, 'routes/reverse'}]). +delete_routes(Topics, Pid) -> + lists:foreach(fun({Router, Slice}) -> + cast(Router, {delete_routes, Slice, Pid}) + end, slice(Topics)). -setstat({Tab, Stat}) -> - emqttd_stats:setstat(Stat, ets:info(Tab, size)). +%% @private Slice topics. +slice(Topics) -> + dict:to_list(lists:foldl(fun(Topic, Dict) -> + Router = pick(Topic), + case dict:find(Router, Dict) of + {ok, L} -> + dict:store(Router, [Topic | L], Dict); + error -> + dict:store(Router, [Topic], Dict) + end + end, dict:new(), Topics)). + +%% @private Pick a router. +pick(Topic) -> + gproc_pool:pick_worker(router, Topic). + +call(Router, Request) -> + gen_server2:call(Router, Request, infinity). + +cast(Router, Msg) -> + gen_server2:cast(Router, Msg). + +init([Pool, Id, StatsFun, Opts]) -> + + %% Calls from pubsub should be scheduled first? + process_flag(priority, high), + + ?GPROC_POOL(join, Pool, Id), + + random:seed(os:timestamp()), + + AgingSecs = proplists:get_value(route_aging, Opts, 5), + + %% Aging Timer + {ok, AgingTref} = start_tick(AgingSecs + random:uniform(AgingSecs)), + + Aging = #aging{topics = dict:new(), time = AgingSecs, tref = AgingTref}, + + {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, aging = Aging}}. + +start_tick(Secs) -> + timer:send_interval(timer:seconds(Secs), {clean, aged}). + +handle_call({add_route, Topic, Pid}, _From, State) -> + ets:insert(route, {Topic, Pid}), + {reply, ok, State}; + +handle_call({add_routes, Topics, Pid}, _From, State) -> + ets:insert(route, [{Topic, Pid} || Topic <- Topics]), + {reply, ok, State}; + +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). + +handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) -> + ets:delete_object(route, {Topic, Pid}), + case has_route(Topic) of + false -> + {noreply, State#state{aging = store_aged(Topic, Aging)}}; + true -> + {noreply, State} + end; + +handle_cast({delete_routes, Topics, Pid}, State) -> + NewAging = + lists:foldl(fun(Topic, Aging) -> + ets:delete_object(route, {Topic, Pid}), + case has_route(Topic) of + false -> store_aged(Topic, Aging); + true -> Aging + end + end, State#state.aging, Topics), + {noreply, State#state{aging = NewAging}}; + +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). + +handle_info({clean, aged}, State = #state{aging = Aging}) -> + + #aging{topics = Dict, time = Time} = Aging, + + ByTime = emqttd_util:now_to_secs() - Time, + + Dict1 = try_clean(ByTime, dict:to_list(Dict)), + + NewAging = Aging#aging{topics = dict:from_list(Dict1)}, + + {noreply, State#state{aging = NewAging}, hibernate}; + +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). + +terminate(_Reason, #state{pool = Pool, id = Id, aging = #aging{tref = TRef}}) -> + timer:cancel(TRef), + ?GPROC_POOL(leave, Pool, Id). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +try_clean(ByTime, List) -> + try_clean(ByTime, List, []). + +try_clean(_ByTime, [], Acc) -> + Acc; + +try_clean(ByTime, [{Topic, TS} | Left], Acc) -> + case has_route(Topic) of + false -> + try_clean2(ByTime, {Topic, TS}, Left, Acc); + true -> + try_clean(ByTime, Left, Acc) + end. + +try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime -> + try_clean(ByTime, Left, [{Topic, TS} | Acc]); + +try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> + TopicR = #mqtt_topic{topic = Topic, node = node()}, + case mnesia:transaction(fun try_remove_topic/1, [TopicR]) of + {atomic, _} -> ok; + {aborted, Error} -> lager:error("Clean Topic '~s' Error: ~p", [Topic, Error]) + end, + try_clean(ByTime, Left, Acc). + +try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) -> + %% Lock topic first + case mnesia:wread({topic, Topic}) of + [] -> + mnesia:abort(not_found); + [TopicR] -> + %% Remove topic and trie + delete_topic(TopicR), + emqttd_trie:delete(Topic); + _More -> + %% Remove topic only + delete_topic(TopicR) + end. + +delete_topic(TopicR) -> + mnesia:delete_object(topic, TopicR, write). + +store_aged(Topic, Aging = #aging{topics = Dict}) -> + Now = emqttd_util:now_to_secs(), + Aging#aging{topics = dict:store(Topic, Now, Dict)}.