diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index 63d6efd6e..c7185414b 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -1,5 +1,5 @@ %%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% Copyright (c) 2012-2016 eMQTT.IO, All Rights Reserved. %%% %%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% of this software and associated documentation files (the "Software"), to deal @@ -19,7 +19,7 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc PubSub Supervisor +%%% @doc PubSub Supervisor. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- @@ -31,6 +31,8 @@ -define(HELPER, emqttd_pubsub_helper). +-define(CONCURRENCY_OPTS, [{read_concurrency, true}, {write_concurrency, true}]). + %% API -export([start_link/0]). @@ -40,24 +42,51 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_broker:env(pubsub)]). -init([Opts]) -> +init([Env]) -> + %% Create tabs + create_tab(route), create_tab(reverse_route), + %% PubSub Helper - Helper = {helper, {?HELPER, start_link, [fun stats/1, Opts]}, + Helper = {helper, {?HELPER, start_link, [fun setstats/1]}, permanent, infinity, worker, [?HELPER]}, + %% Router Pool Sup + RouterMFA = {emqttd_router, start_link, [fun setstats/1, Env]}, + %% Pool_size / 2 + RouterSup = emqttd_pool_sup:spec(router_pool, [router, hash, pool_size(Env) div 2, RouterMFA]), + %% PubSub Pool Sup - MFA = {emqttd_pubsub, start_link, [fun stats/1, Opts]}, - PoolSup = emqttd_pool_sup:spec([pubsub, hash, pool_size(Opts), MFA]), - {ok, {{one_for_all, 10, 60}, [Helper, PoolSup]}}. + PubSubMFA = {emqttd_pubsub, start_link, [fun setstats/1, Env]}, + PubSubSup = emqttd_pool_sup:spec(pubsub_pool, [pubsub, hash, pool_size(Env), PubSubMFA]), -pool_size(Opts) -> + {ok, {{one_for_all, 10, 60}, [Helper, RouterSup, PubSubSup]}}. + +create_tab(route) -> + %% Route Table: Topic -> Pid1, Pid2, ..., PidN + %% duplicate_bag: o(1) insert + ensure_tab(route, [public, named_table, duplicate_bag | ?CONCURRENCY_OPTS]); + +create_tab(reverse_route) -> + %% Reverse Route Table: Pid -> Topic1, Topic2, ..., TopicN + ensure_tab(reverse_route, [public, named_table, bag | ?CONCURRENCY_OPTS]). + +ensure_tab(Tab, Opts) -> + case ets:info(Tab, name) of + undefined -> ets:new(Tab, Opts); + _ -> ok + end. + +pool_size(Env) -> Schedulers = erlang:system_info(schedulers), - proplists:get_value(pool_size, Opts, Schedulers). + proplists:get_value(pool_size, Env, Schedulers). -stats(topic) -> - emqttd_stats:setstats('topics/count', 'topics/max', - mnesia:table_info(topic, size)); -stats(subscription) -> +setstats(route) -> + emqttd_stats:setstat('routes/count', ets:info(route, size)); + +setstats(topic) -> + emqttd_stats:setstats('topics/count', 'topics/max', mnesia:table_info(topic, size)); + +setstats(subscription) -> emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', mnesia:table_info(subscription, size)). diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index b80e8bdd3..ec7f79bd5 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -192,7 +192,7 @@ handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) -> case has_route(Topic) of false -> {noreply, State#state{aging = store_aged(Topic, Aging)}}; - true -> + true -> {noreply, State} end;