diff --git a/apps/emqtt/include/emqtt_systop.hrl b/apps/emqtt/include/emqtt_systop.hrl index e6615e5a5..f2f4c01be 100644 --- a/apps/emqtt/include/emqtt_systop.hrl +++ b/apps/emqtt/include/emqtt_systop.hrl @@ -41,16 +41,17 @@ %% $SYS Topics of Clients %%------------------------------------------------------------------------------ -define(SYSTOP_CLIENTS, [ - 'clients/connected', % ??? - 'clients/disconnected', % ??? - 'clients/total', % total clients connected current - 'clients/max' % max clients connected + %'clients/connected', + %'clients/disconnected', + 'clients/total', % total clients connected current + 'clients/max' % max clients connected ]). %%------------------------------------------------------------------------------ %% $SYS Topics of Subscribers %%------------------------------------------------------------------------------ --define(SYSTOP_SUBSCRIBERS, [ +-define(SYSTOP_PUBSUB, [ + 'topics/total', % ... 'subscribers/total', % ... 'subscribers/max' % ... ]). diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl index bc97fae81..2c809b0c6 100644 --- a/apps/emqtt/src/emqtt_broker.erl +++ b/apps/emqtt/src/emqtt_broker.erl @@ -77,10 +77,10 @@ init([Options]) -> % Create $SYS Topics [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_BROKERS], [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_CLIENTS], - [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_SUBSCRIBERS], + [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_PUBSUB], ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]), [ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_CLIENTS], - [ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_SUBSCRIBERS], + [ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_PUBSUB], % retain version, description retain(systop(version), list_to_binary(version())), retain(systop(description), list_to_binary(description())), @@ -99,6 +99,10 @@ handle_cast(_Msg, State) -> handle_info(tick, State) -> publish(systop(uptime), list_to_binary(uptime(State))), [publish(systop(Name), i2b(Val)) || {Name, Val} <- ets:tab2list(?TABLE)], + %%TODO... call emqtt_cm here? + [publish(systop(client, Stat), i2b(Val)) || {Stat, Val} <- emqtt_cm:stats()], + %%TODO... call emqtt_pubsub here? + [publish(systop(Stat), i2b(Val)) || {Stat, Val} <- emqtt_cm:stats()], {noreply, tick(State)}; handle_info(_Info, State) -> @@ -114,6 +118,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Function Definitions %% ------------------------------------------------------------------ +systop(Prefix, Name) -> + systop(list_to_atom(lists:concat([Prefix, '/', Name]))). + systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). @@ -154,3 +161,4 @@ tick(State = #state{sys_interval = SysInterval}) -> i2b(I) when is_integer(I) -> list_to_binary(integer_to_list(I)). + diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index d4089eebd..dbfee80fa 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -39,7 +39,7 @@ -export([lookup/1, register/2, unregister/2]). --export([getstats/0]). +-export([stats/0]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -84,17 +84,16 @@ register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> gen_server:cast(?SERVER, {unregister, ClientId, Pid}). -getstats(?SERVER) -> - gen_server:call(?SERVER, getstats). +stats() -> + gen_server:call(?SERVER, stats). %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ init([]) -> - %on one node ets:new(?TABLE, [set, named_table, protected]), - {ok, none}. + {ok, #state{}}. handle_call({register, ClientId, Pid}, _From, State) -> case ets:lookup(?TABLE, ClientId) of @@ -108,9 +107,9 @@ handle_call({register, ClientId, Pid}, _From, State) -> [] -> insert(ClientId, Pid) end, - {reply, ok, State}; + {reply, ok, set_max(State)}; -handle_call(getstats, _From, State = #state{max = Max}) -> +handle_call(stats, _From, State = #state{max = Max}) -> Stats = [{total, ets:info(?TABLE, size)}, {max, Max}], {reply, Stats, State}; @@ -145,6 +144,16 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ + insert(ClientId, Pid) -> ets:insert(?TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}). +set_max(State = #state{max = Max}) -> + Total = ets:info(?TABLE, size), + if + Total > Max -> State#state{max = Total}; + true -> State + end. diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index fd5afe7d8..18848cb4f 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -48,6 +48,8 @@ dispatch/2, match/1]). +-export([stats/0]). + %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ @@ -77,7 +79,7 @@ %%---------------------------------------------------------------------------- --record(state, {}). +-record(state, {max_subs = 0}). %% ------------------------------------------------------------------ %% API Function Definitions @@ -89,6 +91,9 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +stats() -> + gen_server:call(?SERVER, stats). + %% %% @doc All topics %% @@ -173,6 +178,12 @@ init([]) -> ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), {ok, #state{}}. +handle_call(stats, _From, State = #state{max_subs = Max}) -> + Stats = [{'topics/total', mnesia:table_info(topic, size)}, + {'subscribers/total', ets:info(topic_subscriber, size)}, + {'subscribers/max', Max}], + {reply, Stats, State}; + handle_call({create, Topic}, _From, State) -> Result = mnesia:transaction(fun trie_add/1, [Topic]), {reply, Result , State}; @@ -184,7 +195,7 @@ handle_call({subscribe, Topics, SubPid}, _From, State) -> [] -> {ok, [Qos || {ok, Qos} <- Result]}; Errors -> hd(Errors) end, - {reply, Reply, State}; + {reply, Reply, set_maxsubs(State)}; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. @@ -356,3 +367,9 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) -> throw({notfound, NodeId}) end. +set_maxsubs(State = #state{max_subs = Max}) -> + Total = ets:info(topic_subscriber, size), + if + Total > Max -> State#state{max_subs = Total}; + true -> State + end.