diff --git a/apps/emqtt/include/emqtt_topic.hrl b/apps/emqtt/include/emqtt_topic.hrl index cb1b019b7..fc7dc1a2d 100644 --- a/apps/emqtt/include/emqtt_topic.hrl +++ b/apps/emqtt/include/emqtt_topic.hrl @@ -25,7 +25,8 @@ %%------------------------------------------------------------------------------ -record(topic, { name :: binary(), - node :: node() + type :: static | dynamic | bridge + node :: node(), }). -type topic() :: #topic{}. @@ -58,3 +59,32 @@ %%------------------------------------------------------------------------------ -define(SYSTOP, <<"$SYS">>). +-define(SYSTOP_BROKER, [ + % $SYS Broker Topics + <<"$SYS/broker/version">>, + <<"$SYS/broker/uptime">>, + <<"$SYS/broker/description">>, + <<"$SYS/broker/timestamp">>, + % $SYS Client Topics + <<"$SYS/broker/clients/connected">>, + <<"$SYS/broker/clients/disconnected">>, + <<"$SYS/broker/clients/total">>, + <<"$SYS/broker/clients/max">>, + % $SYS Subscriber Topics + <<"$SYS/broker/subscribers/total">>, + <<"$SYS/broker/subscribers/max">>]). + +-define(SYSTOP_METRICS, [ + % Bytes sent and received + <<"$SYS/broker/bytes/received">>, + <<"$SYS/broker/bytes/sent">>, + % Packets sent and received + <<"$SYS/broker/packets/received">>, + <<"$SYS/broker/packets/sent">>, + % Messges sent and received + <<"$SYS/broker/messages/received">>, + <<"$SYS/broker/messages/sent">>, + <<"$SYS/broker/messages/retained">>, + <<"$SYS/broker/messages/stored">>, + <<"$SYS/broker/messages/dropped">>]). + diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl index e4298a82e..e6d7fa48e 100644 --- a/apps/emqtt/src/emqtt_broker.erl +++ b/apps/emqtt/src/emqtt_broker.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqtt_broker). +-include("emqtt_topic.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -45,24 +47,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {started_at}). +-record(state, {started_at, sys_interval}). --define(SYS_TOPICS, [ - % $SYS Broker Topics - <<"$SYS/broker/version">>, - <<"$SYS/broker/uptime">>, - <<"$SYS/broker/description">>, - <<"$SYS/broker/timestamp">>, - - % $SYS Client Topics - <<"$SYS/broker/clients/connected">>, - <<"$SYS/broker/clients/disconnected">>, - <<"$SYS/broker/clients/total">>, - <<"$SYS/broker/clients/max">>, - - % $SYS Subscriber Topics - <<"$SYS/broker/subscribers/total">>, - <<"$SYS/broker/subscribers/max">>]). %% ------------------------------------------------------------------ %% API Function Definitions @@ -86,14 +72,15 @@ uptime() -> %% gen_server Function Definitions %% ------------------------------------------------------------------ init([Options]) -> + SysInterval = proplists:get_value(sys_interval, Options, 60), % Create $SYS Topics - [emqtt_pubsub:create(Topic) || Topic <- ?SYS_TOPICS], + [emqtt_pubsub:create(systop(Topic)) || Topic <- ?SYSTOP_BROKER], ets:new(?MODULE, [set, public, name_table, {write_concurrency, true}]), - {ok, #state{started_at = os:timestamp()}}. + {ok, #state{started_at = os:timestamp(), sys_interval = SysInterval}}. handle_call(uptime, _From, State = #state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, - {reply, format(seconds, Secs), State}; + {reply, uptime(seconds, Secs), State}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -113,9 +100,22 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ -format(seconds, Secs) when Secs < 60 -> - integer_to_list - <<(integer_to_list(Secs), +systop(Topic) -> + <<"$SYS/broker/", Topic/binary>>. + +uptime(seconds, Secs) when Secs < 60 -> + [integer_to_list(Secs), " seconds"]; +uptime(seconds, Secs) -> + [uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"]; +uptime(minutes, M) when M < 60 -> + [integer_to_list(M), " minutes, "]; +uptime(minutes, M) -> + [uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "]; +uptime(hours, H) when H < 24 -> + [integer_to_list(H), " hours, "]; +uptime(hours, H) -> + [uptime(days, H div 24), integer_to_list(H rem 24), " hours, "]; +uptime(days, D) -> + [integer_to_list(D), " days,"]. - diff --git a/apps/emqtt/src/emqtt_metrics.erl b/apps/emqtt/src/emqtt_metrics.erl index dde8087cb..8b9ea662a 100644 --- a/apps/emqtt/src/emqtt_metrics.erl +++ b/apps/emqtt/src/emqtt_metrics.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqtt_metrics). +-include("emqtt_topic.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -38,8 +40,7 @@ -export([start_link/0]). --export([get_metrics/0, inc/1, inc/2]). - +-export([get_all/0, get_value/1, inc/1, dec/2]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -55,8 +56,23 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -get_metrics() -> - gen_server:call(?SERVER, get_metrics). +get_all() -> + gen_server:call(?SERVER, get_all). + +get_value(Metric) -> + gen_server:call(?SERVER, {get_value, Metric}). + +inc(Metric) -> + ok. + +inc(Metric, Val) -> + ok. + +dec(Metric) -> + ok. + +dec(Metric, Val) -> + ok. %% ------------------------------------------------------------------ %% gen_server Function Definitions @@ -64,7 +80,7 @@ get_metrics() -> init(_Args) -> % Bytes sent and received - emqtt_pubsub:create(<<"$SYS/broker/version">>), + [ok = emqtt_pubsub:create(<<"$SYS/broker/", Topic/binary>>) || Topic <- ?SYSTOP_METRICS], % $SYS/broker/version %## Uptime % $SYS/broker/uptime diff --git a/apps/emqtt/src/emqtt_vm.erl b/apps/emqtt/src/emqtt_vm.erl index e69de29bb..d2270a9b0 100644 --- a/apps/emqtt/src/emqtt_vm.erl +++ b/apps/emqtt/src/emqtt_vm.erl @@ -0,0 +1,23 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2012-2015, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_vm).