This commit is contained in:
Ery Lee 2015-04-20 04:31:52 +08:00
parent eff6bed994
commit fb8833bb86
4 changed files with 34 additions and 32 deletions

View File

@ -34,7 +34,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(BROKER_TABLE, mqtt_broker). -define(BROKER_TAB, mqtt_broker).
%% API Function Exports %% API Function Exports
-export([start_link/1]). -export([start_link/1]).
@ -115,7 +115,7 @@ datetime() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec getstats() -> [{atom(), non_neg_integer()}]. -spec getstats() -> [{atom(), non_neg_integer()}].
getstats() -> getstats() ->
ets:tab2list(?BROKER_TABLE). ets:tab2list(?BROKER_TAB).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -125,7 +125,7 @@ getstats() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec getstat(atom()) -> non_neg_integer() | undefined. -spec getstat(atom()) -> non_neg_integer() | undefined.
getstat(Name) -> getstat(Name) ->
case ets:lookup(?BROKER_TABLE, Name) of case ets:lookup(?BROKER_TAB, Name) of
[{Name, Val}] -> Val; [{Name, Val}] -> Val;
[] -> undefined [] -> undefined
end. end.
@ -138,7 +138,7 @@ getstat(Name) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean(). -spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean().
setstat(Stat, Val) -> setstat(Stat, Val) ->
ets:update_element(?BROKER_TABLE, Stat, {2, Val}). ets:update_element(?BROKER_TAB, Stat, {2, Val}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -148,13 +148,13 @@ setstat(Stat, Val) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean(). -spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
setstats(Stat, MaxStat, Val) -> setstats(Stat, MaxStat, Val) ->
MaxVal = ets:lookup_element(?BROKER_TABLE, MaxStat, 2), MaxVal = ets:lookup_element(?BROKER_TAB, MaxStat, 2),
if if
Val > MaxVal -> Val > MaxVal ->
ets:update_element(?BROKER_TABLE, MaxStat, {2, Val}); ets:update_element(?BROKER_TAB, MaxStat, {2, Val});
true -> ok true -> ok
end, end,
ets:update_element(?BROKER_TABLE, Stat, {2, Val}). ets:update_element(?BROKER_TAB, Stat, {2, Val}).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -162,9 +162,9 @@ setstats(Stat, MaxStat, Val) ->
init([Options]) -> init([Options]) ->
random:seed(now()), random:seed(now()),
ets:new(?BROKER_TABLE, [set, public, named_table, {write_concurrency, true}]), ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]),
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
[ets:insert(?BROKER_TABLE, {Topic, 0}) || Topic <- Topics], [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics],
% Create $SYS Topics % Create $SYS Topics
[ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
[ok = create(systop(Topic)) || Topic <- Topics], [ok = create(systop(Topic)) || Topic <- Topics],
@ -191,7 +191,7 @@ handle_info(tick, State) ->
publish(systop(uptime), list_to_binary(uptime(State))), publish(systop(uptime), list_to_binary(uptime(State))),
publish(systop(datetime), list_to_binary(datetime())), publish(systop(datetime), list_to_binary(datetime())),
[publish(systop(Stat), i2b(Val)) [publish(systop(Stat), i2b(Val))
|| {Stat, Val} <- ets:tab2list(?BROKER_TABLE)], || {Stat, Val} <- ets:tab2list(?BROKER_TAB)],
{noreply, tick(State), hibernate}; {noreply, tick(State), hibernate};
handle_info(_Info, State) -> handle_info(_Info, State) ->

View File

@ -50,7 +50,7 @@
-record(state, {tab}). -record(state, {tab}).
-define(CLIENT_TABLE, mqtt_client). -define(CLIENT_TAB, mqtt_client).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
@ -74,7 +74,7 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> pid() | undefined. -spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) when is_binary(ClientId) -> lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(?CLIENT_TABLE, ClientId) of case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] -> Pid; [{_, Pid, _}] -> Pid;
[] -> undefined [] -> undefined
end. end.
@ -87,7 +87,7 @@ lookup(ClientId) when is_binary(ClientId) ->
register(ClientId) when is_binary(ClientId) -> register(ClientId) when is_binary(ClientId) ->
Pid = self(), Pid = self(),
%% this is atomic %% this is atomic
case ets:insert_new(?CLIENT_TABLE, {ClientId, Pid, undefined}) of case ets:insert_new(?CLIENT_TAB, {ClientId, Pid, undefined}) of
true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid}); true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid});
false -> gen_server:cast(?SERVER, {register, ClientId, Pid}) false -> gen_server:cast(?SERVER, {register, ClientId, Pid})
end. end.
@ -117,7 +117,7 @@ getstats() ->
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
TabId = ets:new(?CLIENT_TABLE, [set, TabId = ets:new(?CLIENT_TAB, [set,
named_table, named_table,
public, public,
{write_concurrency, true}]), {write_concurrency, true}]),
@ -144,10 +144,10 @@ handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) ->
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_cast({unregister, ClientId, Pid}, State) -> handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(?CLIENT_TABLE, ClientId) of case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, MRef}] -> [{_, Pid, MRef}] ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
ets:delete(?CLIENT_TABLE, ClientId); ets:delete(?CLIENT_TAB, ClientId);
[_] -> [_] ->
ignore; ignore;
[] -> [] ->
@ -159,7 +159,7 @@ handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(?CLIENT_TABLE, {'_', DownPid, MRef}), ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}),
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_info(_Info, State) -> handle_info(_Info, State) ->
@ -191,6 +191,6 @@ registerd(Tab, {ClientId, Pid}) ->
setstats(State) -> setstats(State) ->
emqttd_broker:setstats('clients/count', emqttd_broker:setstats('clients/count',
'clients/max', 'clients/max',
ets:info(?CLIENT_TABLE, size)), State. ets:info(?CLIENT_TAB, size)), State.

View File

@ -36,7 +36,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(METRIC_TABLE, mqtt_broker_metric). -define(METRIC_TAB, mqtt_broker_metric).
%% API Function Exports %% API Function Exports
-export([start_link/1]). -export([start_link/1]).
@ -81,7 +81,7 @@ all() ->
{ok, Count} -> maps:put(Metric, Count+Val, Map); {ok, Count} -> maps:put(Metric, Count+Val, Map);
error -> maps:put(Metric, Val, Map) error -> maps:put(Metric, Val, Map)
end end
end, #{}, ?METRIC_TABLE)). end, #{}, ?METRIC_TAB)).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -91,7 +91,7 @@ all() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec value(atom()) -> non_neg_integer(). -spec value(atom()) -> non_neg_integer().
value(Metric) -> value(Metric) ->
lists:sum(ets:select(?METRIC_TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -125,9 +125,9 @@ inc(Metric, Val) when is_atom(Metric) and is_integer(Val) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer(). -spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer().
inc(gauge, Metric, Val) -> inc(gauge, Metric, Val) ->
ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, Val}); ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val});
inc(counter, Metric, Val) -> inc(counter, Metric, Val) ->
ets:update_counter(?METRIC_TABLE, key(counter, Metric), {2, Val}). ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -147,7 +147,7 @@ dec(gauge, Metric) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec dec(gauge, atom(), pos_integer()) -> integer(). -spec dec(gauge, atom(), pos_integer()) -> integer().
dec(gauge, Metric, Val) -> dec(gauge, Metric, Val) ->
ets:update_counter(?METRIC_TABLE, key(gauge, Metric), {2, -Val}). ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -158,7 +158,7 @@ dec(gauge, Metric, Val) ->
set(Metric, Val) when is_atom(Metric) -> set(Metric, Val) when is_atom(Metric) ->
set(gauge, Metric, Val). set(gauge, Metric, Val).
set(gauge, Metric, Val) -> set(gauge, Metric, Val) ->
ets:insert(?METRIC_TABLE, {key(gauge, Metric), Val}). ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -180,7 +180,7 @@ init([Options]) ->
random:seed(now()), random:seed(now()),
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% Create metrics table % Create metrics table
ets:new(?METRIC_TABLE, [set, public, named_table, {write_concurrency, true}]), ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
% Init metrics % Init metrics
[new_metric(Metric) || Metric <- Metrics], [new_metric(Metric) || Metric <- Metrics],
% $SYS Topics for metrics % $SYS Topics for metrics
@ -224,11 +224,11 @@ publish(Topic, Payload) ->
payload = Payload}). payload = Payload}).
new_metric({gauge, Name}) -> new_metric({gauge, Name}) ->
ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); ets:insert(?METRIC_TAB, {{Name, 0}, 0});
new_metric({counter, Name}) -> new_metric({counter, Name}) ->
Schedulers = lists:seq(1, erlang:system_info(schedulers)), Schedulers = lists:seq(1, erlang:system_info(schedulers)),
[ets:insert(?METRIC_TABLE, {{Name, I}, 0}) || I <- Schedulers]. [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
tick(State = #state{pub_interval = PubInterval}) -> tick(State = #state{pub_interval = PubInterval}) ->
tick(PubInterval, State). tick(PubInterval, State).

View File

@ -44,7 +44,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(SESSION_TABLE, mqtt_session). -define(SESSION_TAB, mqtt_session).
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
@ -72,7 +72,7 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup_session(binary()) -> pid() | undefined. -spec lookup_session(binary()) -> pid() | undefined.
lookup_session(ClientId) -> lookup_session(ClientId) ->
case ets:lookup(?SESSION_TABLE, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _}] -> SessPid; [{_, SessPid, _}] -> SessPid;
[] -> undefined [] -> undefined
end. end.
@ -103,7 +103,7 @@ destroy_session(ClientId) ->
init([]) -> init([]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
TabId = ets:new(?SESSION_TABLE, [set, protected, named_table]), TabId = ets:new(?SESSION_TAB, [set, protected, named_table]),
{ok, #state{tab = TabId}}. {ok, #state{tab = TabId}}.
handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) -> handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tab = Tab}) ->
@ -157,8 +157,10 @@ code_change(_OldVsn, State, _Extra) ->
%%%============================================================================= %%%=============================================================================
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
setstats(State) -> setstats(State) ->
emqttd_broker:setstats('sessions/count', emqttd_broker:setstats('sessions/count',
'sessions/max', 'sessions/max',
ets:info(?SESSION_TABLE, size)), State. ets:info(?SESSION_TAB, size)), State.