diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl index 681f7bc7f..04b44d2ef 100644 --- a/apps/emqtt/src/emqtt_broker.erl +++ b/apps/emqtt/src/emqtt_broker.erl @@ -26,6 +26,8 @@ %%%----------------------------------------------------------------------------- -module(emqtt_broker). +-include("emqtt_packet.hrl"). + -include("emqtt_topic.hrl"). -behaviour(gen_server). @@ -47,8 +49,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {started_at, sys_interval}). - +-record(state, {started_at, sys_interval, tick_timer}). %% ------------------------------------------------------------------ %% API Function Definitions @@ -72,9 +73,10 @@ uptime() -> init([Options]) -> SysInterval = proplists:get_value(sys_interval, Options, 60), % Create $SYS Topics - [emqtt_pubsub:create(systop(Topic)) || Topic <- ?SYSTOP_BROKER], - ets:new(?MODULE, [set, public, name_table, {write_concurrency, true}]), - {ok, #state{started_at = os:timestamp(), sys_interval = SysInterval}}. + [emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER], + ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]), + State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, + {ok, tick(State)}. handle_call(uptime, _From, State) -> {reply, uptime(State), State}; @@ -85,6 +87,12 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(tick, State) -> + publish(true, <<"$SYS/broker/version">>, version()), + publish(false, <<"$SYS/broker/uptime">>, uptime(State)), + publish(true, <<"$SYS/broker/description">>, description()), + {noreply, tick(State)}; + handle_info(_Info, State) -> {noreply, State}. @@ -97,12 +105,15 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ -systop(Topic) -> - <<"$SYS/broker/", Topic/binary>>. +publish(Retain, Topic, Payload) when is_list(Payload) -> + publish(Retain, Topic, list_to_binary(Payload)); + +publish(Retain, Topic, Payload) when is_binary(Payload) -> + emqtt_router:route(#mqtt_message{retain = Retain, topic = Topic, payload = Payload}). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, - uptime(seconds, Secs). + lists:flatten(uptime(seconds, Secs)). uptime(seconds, Secs) when Secs < 60 -> [integer_to_list(Secs), " seconds"]; @@ -119,4 +130,6 @@ uptime(hours, H) -> uptime(days, D) -> [integer_to_list(D), " days,"]. +tick(State = #state{sys_interval = SysInterval}) -> + State#state{tick_timer = erlang:send_after(SysInterval * 1000, self(), tick)}. diff --git a/apps/emqtt/src/emqtt_metrics.erl b/apps/emqtt/src/emqtt_metrics.erl index ca69551a3..2a24faa95 100644 --- a/apps/emqtt/src/emqtt_metrics.erl +++ b/apps/emqtt/src/emqtt_metrics.erl @@ -38,9 +38,11 @@ %% API Function Exports %% ------------------------------------------------------------------ --export([start_link/0]). +-export([start_link/1]). --export([get_all/0, get_value/1, inc/1, dec/2]). +-export([all/0, value/1, + inc/1, inc/2, + dec/1, dec/2]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -48,46 +50,55 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). +-record(state, {pub_interval, tick_timer}). %% ------------------------------------------------------------------ %% API Function Definitions %% ------------------------------------------------------------------ -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(Options) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). -get_all() -> - gen_server:call(?SERVER, get_all). +all() -> + maps:to_list( + lists:foldl( + fun({{Metric, _N}, Val}, Map) -> + case maps:find(Metric, Map) of + {ok, Count} -> maps:put(Metric, Count+Val); + error -> maps:put(Metric, 0) + end + end, #{}, ets:tab2list(?TABLE))). -get_value(Metric) -> - gen_server:call(?SERVER, {get_value, Metric}). +value(Metric) -> + lists:sum(ets:select(?TABLE, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). inc(Metric) -> - ok. + inc(Metric, 1). inc(Metric, Val) -> - ok. + ets:update_counter(?TABLE, key(Metric), {2, Val}). dec(Metric) -> - ok. + dec(Metric, 1). dec(Metric, Val) -> - ok. + %TODO: ok? + ets:update_counter(?TABLE, key(Metric), {2, -Val}). + +key(Metric) -> + {Metric, erlang:system_info(scheduler_id)}. %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ - -init(_Args) -> - % Bytes sent and received +init(Options) -> + % $SYS Topics for metrics [ok = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS], - % $SYS/broker/version - %## Uptime - % $SYS/broker/uptime - % $SYS/broker/clients/connected - % $SYS/broker/clients/disconnected + % Create metrics table ets:new(?TABLE, [set, public, named_table, {write_concurrency, true}]), - {ok, #state{}}. + % Init metrics + [new(Metric) || <<"$SYS/broker/", Metric/binary>> <- ?SYSTOP_METRICS], + PubInterval = proplists:get_value(pub_interval, Options, 60), + {ok, tick(#state{pub_interval = PubInterval})}. handle_call(get_metrics, _From, State) -> {reply, [], State}; @@ -98,6 +109,11 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info(tick, State) -> + %TODO:... + % publish metric message + {noreply, tick(State)}; + handle_info(_Info, State) -> {noreply, State}. @@ -111,3 +127,11 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Function Definitions %% ------------------------------------------------------------------ +new(Metric) -> + Key = list_to_tuple([list_to_atom(binary_to_list(Token)) + || Token <- binary:split(Metric, <<"/">>, [global])]), + [ets:insert(?TABLE, {{Key, N}, 0}) || N <- lists:seq(1, erlang:system_info(schedulers))]. + +tick(State = #state{pub_interval = PubInterval}) -> + State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}. + diff --git a/rel/files/app.config b/rel/files/app.config index 399e43c25..323a91701 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -52,6 +52,9 @@ {broker, [ {sys_interval, 60} ]}, + {metrics, [ + {pub_interval, 60} + ]}, {listen, [ {mqtt, 1883, [ {backlog, 512},