TABLE -> METRIC_TAB

This commit is contained in:
Ery Lee 2015-03-09 14:05:40 +08:00
parent e2c1eda808
commit a8a7fd0299
1 changed files with 24 additions and 20 deletions

View File

@ -34,29 +34,32 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(TABLE, ?MODULE). -define(METRIC_TAB, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports %% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/1]). -export([start_link/1]).
-export([all/0, value/1, -export([all/0, value/1,
inc/1, inc/2, inc/1, inc/2,
dec/1, dec/2]). dec/1, dec/2]).
%% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {pub_interval, tick_timer}). -record(state, {pub_interval, tick_timer}).
%% ------------------------------------------------------------------ %%%=============================================================================
%% API Function Definitions %%% API
%% ------------------------------------------------------------------ %%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start emqtt metrics.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
start_link(Options) -> start_link(Options) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
@ -75,7 +78,7 @@ all() ->
{ok, Count} -> maps:put(Metric, Count+Val, Map); {ok, Count} -> maps:put(Metric, Count+Val, Map);
error -> maps:put(Metric, Val, Map) error -> maps:put(Metric, Val, Map)
end end
end, #{}, ?TABLE)). end, #{}, ?METRIC_TAB)).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -85,7 +88,7 @@ all() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec value(atom()) -> non_neg_integer(). -spec value(atom()) -> non_neg_integer().
value(Metric) -> value(Metric) ->
lists:sum(ets:select(?TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -105,7 +108,7 @@ inc(Metric) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec inc(atom(), pos_integer()) -> pos_integer(). -spec inc(atom(), pos_integer()) -> pos_integer().
inc(Metric, Val) -> inc(Metric, Val) ->
ets:update_counter(?TABLE, key(Metric), {2, Val}). ets:update_counter(?METRIC_TAB, key(Metric), {2, Val}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -126,7 +129,7 @@ dec(Metric) ->
-spec dec(atom(), pos_integer()) -> integer(). -spec dec(atom(), pos_integer()) -> integer().
dec(Metric, Val) -> dec(Metric, Val) ->
%TODO: ok? %TODO: ok?
ets:update_counter(?TABLE, key(Metric), {2, -Val}). ets:update_counter(?METRIC_TAB, key(Metric), {2, -Val}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -141,15 +144,16 @@ key(Metric) ->
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Definitions %% gen_server Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init(Options) -> init(Options) ->
random:seed(now()), random:seed(now()),
Topics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, Topics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% $SYS Topics for metrics
[{atomic, _} = emqtt_pubsub:create(systop(Topic)) || Topic <- Topics],
% Create metrics table % Create metrics table
ets:new(?TABLE, [set, public, named_table, {write_concurrency, true}]), ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
% Init metrics % Init metrics
[new_metric(Topic) || Topic <- Topics], [new_metric(Topic) || Topic <- Topics],
% $SYS Topics for metrics
[{atomic, _} = emqtt_pubsub:create(systop(Topic)) || Topic <- Topics],
PubInterval = proplists:get_value(pub_interval, Options, 60), PubInterval = proplists:get_value(pub_interval, Options, 60),
{ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}. {ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}.
@ -173,9 +177,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%% ------------------------------------------------------------------ %%%=============================================================================
%% Internal Function Definitions %%% Internal functions
%% ------------------------------------------------------------------ %%%=============================================================================
systop(Name) when is_atom(Name) -> systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
@ -185,7 +189,7 @@ publish(Topic, Payload) ->
new_metric(Name) -> new_metric(Name) ->
Schedulers = lists:seq(1, erlang:system_info(schedulers)), Schedulers = lists:seq(1, erlang:system_info(schedulers)),
[ets:insert(?TABLE, {{Name, I}, 0}) || I <- Schedulers]. [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
tick(State = #state{pub_interval = PubInterval}) -> tick(State = #state{pub_interval = PubInterval}) ->
tick(PubInterval, State). tick(PubInterval, State).