systopics

This commit is contained in:
Ery Lee 2015-03-08 15:04:38 +08:00
parent c7c7b597c5
commit 6f67127d77
4 changed files with 51 additions and 16 deletions

View File

@ -41,16 +41,17 @@
%% $SYS Topics of Clients
%%------------------------------------------------------------------------------
-define(SYSTOP_CLIENTS, [
'clients/connected', % ???
'clients/disconnected', % ???
'clients/total', % total clients connected current
'clients/max' % max clients connected
%'clients/connected',
%'clients/disconnected',
'clients/total', % total clients connected current
'clients/max' % max clients connected
]).
%%------------------------------------------------------------------------------
%% $SYS Topics of Subscribers
%%------------------------------------------------------------------------------
-define(SYSTOP_SUBSCRIBERS, [
-define(SYSTOP_PUBSUB, [
'topics/total', % ...
'subscribers/total', % ...
'subscribers/max' % ...
]).

View File

@ -77,10 +77,10 @@ init([Options]) ->
% Create $SYS Topics
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_BROKERS],
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_CLIENTS],
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_SUBSCRIBERS],
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_PUBSUB],
ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]),
[ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_CLIENTS],
[ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_SUBSCRIBERS],
[ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_PUBSUB],
% retain version, description
retain(systop(version), list_to_binary(version())),
retain(systop(description), list_to_binary(description())),
@ -99,6 +99,10 @@ handle_cast(_Msg, State) ->
handle_info(tick, State) ->
publish(systop(uptime), list_to_binary(uptime(State))),
[publish(systop(Name), i2b(Val)) || {Name, Val} <- ets:tab2list(?TABLE)],
%%TODO... call emqtt_cm here?
[publish(systop(client, Stat), i2b(Val)) || {Stat, Val} <- emqtt_cm:stats()],
%%TODO... call emqtt_pubsub here?
[publish(systop(Stat), i2b(Val)) || {Stat, Val} <- emqtt_cm:stats()],
{noreply, tick(State)};
handle_info(_Info, State) ->
@ -114,6 +118,9 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Function Definitions
%% ------------------------------------------------------------------
systop(Prefix, Name) ->
systop(list_to_atom(lists:concat([Prefix, '/', Name]))).
systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
@ -154,3 +161,4 @@ tick(State = #state{sys_interval = SysInterval}) ->
i2b(I) when is_integer(I) ->
list_to_binary(integer_to_list(I)).

View File

@ -39,7 +39,7 @@
-export([lookup/1, register/2, unregister/2]).
-export([getstats/0]).
-export([stats/0]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
@ -84,17 +84,16 @@ register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
gen_server:cast(?SERVER, {unregister, ClientId, Pid}).
getstats(?SERVER) ->
gen_server:call(?SERVER, getstats).
stats() ->
gen_server:call(?SERVER, stats).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init([]) ->
%on one node
ets:new(?TABLE, [set, named_table, protected]),
{ok, none}.
{ok, #state{}}.
handle_call({register, ClientId, Pid}, _From, State) ->
case ets:lookup(?TABLE, ClientId) of
@ -108,9 +107,9 @@ handle_call({register, ClientId, Pid}, _From, State) ->
[] ->
insert(ClientId, Pid)
end,
{reply, ok, State};
{reply, ok, set_max(State)};
handle_call(getstats, _From, State = #state{max = Max}) ->
handle_call(stats, _From, State = #state{max = Max}) ->
Stats = [{total, ets:info(?TABLE, size)}, {max, Max}],
{reply, Stats, State};
@ -145,6 +144,16 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
insert(ClientId, Pid) ->
ets:insert(?TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}).
set_max(State = #state{max = Max}) ->
Total = ets:info(?TABLE, size),
if
Total > Max -> State#state{max = Total};
true -> State
end.

View File

@ -48,6 +48,8 @@
dispatch/2,
match/1]).
-export([stats/0]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
@ -77,7 +79,7 @@
%%----------------------------------------------------------------------------
-record(state, {}).
-record(state, {max_subs = 0}).
%% ------------------------------------------------------------------
%% API Function Definitions
@ -89,6 +91,9 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
stats() ->
gen_server:call(?SERVER, stats).
%%
%% @doc All topics
%%
@ -173,6 +178,12 @@ init([]) ->
ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
{ok, #state{}}.
handle_call(stats, _From, State = #state{max_subs = Max}) ->
Stats = [{'topics/total', mnesia:table_info(topic, size)},
{'subscribers/total', ets:info(topic_subscriber, size)},
{'subscribers/max', Max}],
{reply, Stats, State};
handle_call({create, Topic}, _From, State) ->
Result = mnesia:transaction(fun trie_add/1, [Topic]),
{reply, Result , State};
@ -184,7 +195,7 @@ handle_call({subscribe, Topics, SubPid}, _From, State) ->
[] -> {ok, [Qos || {ok, Qos} <- Result]};
Errors -> hd(Errors)
end,
{reply, Reply, State};
{reply, Reply, set_maxsubs(State)};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
@ -356,3 +367,9 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) ->
throw({notfound, NodeId})
end.
set_maxsubs(State = #state{max_subs = Max}) ->
Total = ets:info(topic_subscriber, size),
if
Total > Max -> State#state{max_subs = Total};
true -> State
end.