fix stats bug
This commit is contained in:
parent
d99dac81a4
commit
547f192cae
|
@ -33,7 +33,7 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% API Exports
|
%% API Exports
|
||||||
-export([start_link/2]).
|
-export([start_link/3]).
|
||||||
|
|
||||||
-export([lookup/1, register/1, unregister/1]).
|
-export([lookup/1, register/1, unregister/1]).
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@
|
||||||
|
|
||||||
-record(state, {tab, statsfun}).
|
-record(state, {tab, statsfun}).
|
||||||
|
|
||||||
-define(POOL, cm).
|
-define(POOL, cm_pool).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -53,11 +53,12 @@
|
||||||
%% @doc Start client manager
|
%% @doc Start client manager
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link(Id, TabId) -> {ok, pid()} | ignore | {error, any()} when
|
-spec start_link(Id, TabId, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
|
||||||
Id :: pos_integer(),
|
Id :: pos_integer(),
|
||||||
TabId :: ets:tid().
|
TabId :: ets:tid(),
|
||||||
start_link(Id, TabId) ->
|
StatsFun :: fun().
|
||||||
gen_server:start_link(?MODULE, [Id, TabId], []).
|
start_link(Id, TabId, StatsFun) ->
|
||||||
|
gen_server:start_link(?MODULE, [Id, TabId, StatsFun], []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Lookup client pid with clientId
|
%% @doc Lookup client pid with clientId
|
||||||
|
@ -92,9 +93,8 @@ unregister(ClientId) when is_binary(ClientId) ->
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([Id, TabId]) ->
|
init([Id, TabId, StatsFun]) ->
|
||||||
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
|
gproc_pool:connect_worker(?POOL, {?MODULE, Id}),
|
||||||
StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
|
|
||||||
{ok, #state{tab = TabId, statsfun = StatsFun}}.
|
{ok, #state{tab = TabId, statsfun = StatsFun}}.
|
||||||
|
|
||||||
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
|
handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
|
||||||
|
@ -110,7 +110,7 @@ handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) ->
|
||||||
[] ->
|
[] ->
|
||||||
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
|
ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)})
|
||||||
end,
|
end,
|
||||||
{reply, ok, State};
|
{reply, ok, setstats(State)};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
lager:error("unexpected request: ~p", [Req]),
|
lager:error("unexpected request: ~p", [Req]),
|
||||||
|
@ -151,4 +151,3 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
setstats(State = #state{tab = TabId, statsfun = StatsFun}) ->
|
setstats(State = #state{tab = TabId, statsfun = StatsFun}) ->
|
||||||
StatsFun(ets:info(TabId, size)), State.
|
StatsFun(ets:info(TabId, size)), State.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_cm_sup).
|
-module(emqttd_cm_sup).
|
||||||
|
|
||||||
-author('feng@emqtt.io').
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
|
@ -49,12 +49,13 @@ init([]) ->
|
||||||
TabId = ets:new(?CLIENT_TAB, [set, named_table, public,
|
TabId = ets:new(?CLIENT_TAB, [set, named_table, public,
|
||||||
{write_concurrency, true}]),
|
{write_concurrency, true}]),
|
||||||
Schedulers = erlang:system_info(schedulers),
|
Schedulers = erlang:system_info(schedulers),
|
||||||
gproc_pool:new(cm, hash, [{size, Schedulers}]),
|
gproc_pool:new(cm_pool, hash, [{size, Schedulers}]),
|
||||||
|
StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
|
||||||
Children = lists:map(
|
Children = lists:map(
|
||||||
fun(I) ->
|
fun(I) ->
|
||||||
Name = {emqttd_cm, I},
|
Name = {emqttd_cm, I},
|
||||||
gproc_pool:add_worker(cm, Name, I),
|
gproc_pool:add_worker(cm_pool, Name, I),
|
||||||
{Name, {emqttd_cm, start_link, [I, TabId]},
|
{Name, {emqttd_cm, start_link, [I, TabId, StatsFun]},
|
||||||
permanent, 10000, worker, [emqttd_cm]}
|
permanent, 10000, worker, [emqttd_cm]}
|
||||||
end, lists:seq(1, Schedulers)),
|
end, lists:seq(1, Schedulers)),
|
||||||
{ok, {{one_for_all, 10, 100}, Children}}.
|
{ok, {{one_for_all, 10, 100}, Children}}.
|
||||||
|
|
Loading…
Reference in New Issue