diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index f7916e562..359499cf4 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Exports --export([start_link/2]). +-export([start_link/3]). -export([lookup/1, register/1, unregister/1]). @@ -43,7 +43,7 @@ -record(state, {tab, statsfun}). --define(POOL, cm). +-define(POOL, cm_pool). %%%============================================================================= %%% API @@ -53,11 +53,12 @@ %% @doc Start client manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, TabId) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - TabId :: ets:tid(). -start_link(Id, TabId) -> - gen_server:start_link(?MODULE, [Id, TabId], []). + TabId :: ets:tid(), + StatsFun :: fun(). +start_link(Id, TabId, StatsFun) -> + gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Lookup client pid with clientId @@ -92,9 +93,8 @@ unregister(ClientId) when is_binary(ClientId) -> %%% gen_server callbacks %%%============================================================================= -init([Id, TabId]) -> +init([Id, TabId, StatsFun]) -> gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), {ok, #state{tab = TabId, statsfun = StatsFun}}. handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> @@ -110,7 +110,7 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> [] -> ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) end, - {reply, ok, State}; + {reply, ok, setstats(State)}; handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), @@ -151,4 +151,3 @@ code_change(_OldVsn, State, _Extra) -> setstats(State = #state{tab = TabId, statsfun = StatsFun}) -> StatsFun(ets:info(TabId, size)), State. - diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl index 853349b55..a067492eb 100644 --- a/apps/emqttd/src/emqttd_cm_sup.erl +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -26,7 +26,7 @@ %%%----------------------------------------------------------------------------- -module(emqttd_cm_sup). --author('feng@emqtt.io'). +-author("Feng Lee "). -include("emqttd.hrl"). @@ -49,12 +49,13 @@ init([]) -> TabId = ets:new(?CLIENT_TAB, [set, named_table, public, {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), - gproc_pool:new(cm, hash, [{size, Schedulers}]), + gproc_pool:new(cm_pool, hash, [{size, Schedulers}]), + StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), Children = lists:map( fun(I) -> Name = {emqttd_cm, I}, - gproc_pool:add_worker(cm, Name, I), - {Name, {emqttd_cm, start_link, [I, TabId]}, + gproc_pool:add_worker(cm_pool, Name, I), + {Name, {emqttd_cm, start_link, [I, TabId, StatsFun]}, permanent, 10000, worker, [emqttd_cm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}.