diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index cae57207f..73a1471ac 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,34 +14,39 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc emqttd statistics -module(emqttd_stats). --include("emqttd.hrl"). - -behaviour(gen_server). --define(SERVER, ?MODULE). +-author("Feng Lee "). + +-include("emqttd.hrl"). -export([start_link/0, stop/0]). -%% statistics API. --export([statsfun/1, statsfun/2, - getstats/0, getstat/1, - setstat/2, setstats/3]). +%% Client and Session Stats +-export([set_client_stats/2, get_client_stats/1, del_client_stats/1, + set_session_stats/2, get_session_stats/1, del_session_stats/1]). + +%% Statistics API. +-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstats/3]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tick_tref}). +-record(state, {tick}). + +-type(stats() :: list({atom(), non_neg_integer()})). -define(STATS_TAB, mqtt_stats). +-define(CLIENT_STATS_TAB, mqtt_client_stats). +-define(SESSION_STATS_TAB, mqtt_session_stats). %% $SYS Topics for Clients -define(SYSTOP_CLIENTS, [ - 'clients/count', % clients connected current - 'clients/max' % max clients connected + 'clients/count', % clients connected current + 'clients/max' % max clients connected ]). %% $SYS Topics for Sessions @@ -75,10 +80,40 @@ %% @doc Start stats server -spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). stop() -> - gen_server:call(?SERVER, stop). + gen_server:call(?MODULE, stop). + +-spec(set_client_stats(binary(), stats()) -> true). +set_client_stats(ClientId, Stats) -> + ets:insert(?CLIENT_STATS_TAB, {ClientId, [{'$ts', emqttd_time:now_secs()}|Stats]}). + +-spec(get_client_stats(binary()) -> stats()). +get_client_stats(ClientId) -> + case ets:lookup(?CLIENT_STATS_TAB, ClientId) of + [{_, Stats}] -> Stats; + [] -> [] + end. + +-spec(del_client_stats(binary()) -> true). +del_client_stats(ClientId) -> + ets:delete(?CLIENT_STATS_TAB, ClientId). + +-spec(set_session_stats(binary(), stats()) -> true). +set_session_stats(ClientId, Stats) -> + ets:insert(?SESSION_STATS_TAB, {ClientId, [{'$ts', emqttd_time:now_secs()}|Stats]}). + +-spec(get_session_stats(binary()) -> stats()). +get_session_stats(ClientId) -> + case ets:lookup(?SESSION_STATS_TAB, ClientId) of + [{_, Stats}] -> Stats; + [] -> [] + end. + +-spec(del_session_stats(binary()) -> true). +del_session_stats(ClientId) -> + ets:delete(?SESSION_STATS_TAB, ClientId). %% @doc Generate stats fun -spec(statsfun(Stat :: atom()) -> fun()). @@ -118,13 +153,14 @@ setstats(Stat, MaxStat, Val) -> init([]) -> emqttd_time:seed(), - ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), + lists:foreach( + fun(Tab) -> + Tab = ets:new(Tab, [set, public, named_table, {write_concurrency, true}]) + end, [?STATS_TAB, ?CLIENT_STATS_TAB, ?SESSION_STATS_TAB]), Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), - % Create $SYS Topics - % [ok = emqttd:create(topic, stats_topic(Topic)) || Topic <- Topics], % Tick to publish stats - {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. + {ok, #state{tick = emqttd_broker:start_tick(tick)}, hibernate}. handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -154,7 +190,7 @@ handle_info(tick, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{tick_tref = TRef}) -> +terminate(_Reason, #state{tick = TRef}) -> emqttd_broker:stop_tick(TRef). code_change(_OldVsn, State, _Extra) ->