system topics

This commit is contained in:
Feng Lee 2015-03-08 12:23:44 +08:00
parent 0f92b73b37
commit cff100f706
4 changed files with 143 additions and 58 deletions

View File

@ -0,0 +1,96 @@
%%%-----------------------------------------------------------------------------
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt system topics.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-define(SYSTOP, <<"$SYS">>).
%%------------------------------------------------------------------------------
%% $SYS Topics of Broker
%%------------------------------------------------------------------------------
-define(SYSTOP_BROKERS, [
version, % Broker version
uptime, % Broker uptime
timestamp, % Broker timestamp
description % Broker description
]).
%%------------------------------------------------------------------------------
%% $SYS Topics of Clients
%%------------------------------------------------------------------------------
-define(SYSTOP_CLIENTS, [
'clients/connected', % ???
'clients/disconnected', % ???
'clients/total', % total clients connected current
'clients/max' % max clients connected
]).
%%------------------------------------------------------------------------------
%% $SYS Topics of Subscribers
%%------------------------------------------------------------------------------
-define(SYSTOP_SUBSCRIBERS, [
'subscribers/total', % ...
'subscribers/max' % ...
]).
%%------------------------------------------------------------------------------
%% Bytes sent and received of Broker
%%------------------------------------------------------------------------------
-define(SYSTOP_BYTES, [
'bytes/received', % Total bytes received
'bytes/sent' % Total bytes sent
]).
%%------------------------------------------------------------------------------
%% Packets sent and received of Broker
%%------------------------------------------------------------------------------
-define(SYSTOP_PACKETS, [
'packets/received', % All Packets received
'packets/sent', % All Packets sent
'packets/connect', % CONNECT Packets received
'packets/connack', % CONNACK Packets sent
'packets/publish/received', % PUBLISH packets received
'packets/publish/sent', % PUBLISH packets sent
'packets/subscribe', % SUBSCRIBE Packets received
'packets/suback', % SUBACK packets sent
'packets/unsubscribe', % UNSUBSCRIBE Packets received
'packets/unsuback', % UNSUBACK Packets sent
'packets/pingreq', % PINGREQ packets received
'packets/pingresp', % PINGRESP Packets sent
'packets/disconnect' % DISCONNECT Packets received
]).
%%------------------------------------------------------------------------------
%% Messages sent and received of broker
%%------------------------------------------------------------------------------
-define(SYSTOP_MESSAGES, [
'messages/received', % Messages received
'messages/sent', % Messages sent
'messages/retained', % Messagea retained
'messages/stored', % Messages stored
'messages/dropped' % Messages dropped
]).

View File

@ -53,37 +53,3 @@
node_id :: binary() | atom()
}).
%%------------------------------------------------------------------------------
%% System Topic
%%------------------------------------------------------------------------------
-define(SYSTOP, <<"$SYS">>).
-define(SYSTOP_BROKER, [
% $SYS Broker Topics
<<"$SYS/broker/version">>,
<<"$SYS/broker/uptime">>,
<<"$SYS/broker/description">>,
<<"$SYS/broker/timestamp">>,
% $SYS Client Topics
<<"$SYS/broker/clients/connected">>,
<<"$SYS/broker/clients/disconnected">>,
<<"$SYS/broker/clients/total">>,
<<"$SYS/broker/clients/max">>,
% $SYS Subscriber Topics
<<"$SYS/broker/subscribers/total">>,
<<"$SYS/broker/subscribers/max">>]).
-define(SYSTOP_METRICS, [
% Bytes sent and received
<<"$SYS/broker/bytes/received">>,
<<"$SYS/broker/bytes/sent">>,
% Packets sent and received
<<"$SYS/broker/packets/received">>,
<<"$SYS/broker/packets/sent">>,
% Messges sent and received
<<"$SYS/broker/messages/received">>,
<<"$SYS/broker/messages/sent">>,
<<"$SYS/broker/messages/retained">>,
<<"$SYS/broker/messages/stored">>,
<<"$SYS/broker/messages/dropped">>]).

View File

@ -28,12 +28,14 @@
-include("emqtt_packet.hrl").
-include("emqtt_topic.hrl").
-include("emqtt_systop.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(TABLE, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
@ -73,8 +75,15 @@ uptime() ->
init([Options]) ->
SysInterval = proplists:get_value(sys_interval, Options, 60),
% Create $SYS Topics
[{atomic, _} = emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER],
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_BROKERS],
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_CLIENTS],
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_SUBSCRIBERS],
ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]),
[ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_CLIENTS],
[ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_SUBSCRIBERS],
% retain version, description
retain(systop(version), version()),
retain(systop(description), description()),
State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
{ok, tick(State)}.
@ -88,9 +97,8 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(tick, State) ->
publish(true, <<"$SYS/broker/version">>, version()),
publish(false, <<"$SYS/broker/uptime">>, uptime(State)),
publish(true, <<"$SYS/broker/description">>, description()),
publish(systop(uptime), uptime(State)),
[publish(systop(Name), i2b(Val)) || {Name, Val} <- ets:tab2list(?TABLE)],
{noreply, tick(State)};
handle_info(_Info, State) ->
@ -105,11 +113,21 @@ code_change(_OldVsn, State, _Extra) ->
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
publish(Retain, Topic, Payload) when is_list(Payload) ->
publish(Retain, Topic, list_to_binary(Payload));
publish(Retain, Topic, Payload) when is_binary(Payload) ->
emqtt_router:route(#mqtt_message{retain = Retain, topic = Topic, payload = Payload}).
systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
create(Topic) ->
emqtt_pubsub:create(Topic).
retain(Topic, Payload) when is_list(Payload) ->
emqtt_router:route(#mqtt_message{retain = true,
topic = Topic,
payload = Payload}).
publish(Topic, Payload) when is_binary(Payload) ->
emqtt_router:route(#mqtt_message{topic = Topic,
payload = Payload}).
uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
@ -133,3 +151,6 @@ uptime(days, D) ->
tick(State = #state{sys_interval = SysInterval}) ->
State#state{tick_timer = erlang:send_after(SysInterval * 1000, self(), tick)}.
i2b(I) when is_integer(I) ->
list_to_binary(integer_to_list(I)).

View File

@ -28,7 +28,7 @@
-include("emqtt_packet.hrl").
-include("emqtt_topic.hrl").
-include("emqtt_systop.hrl").
-behaviour(gen_server).
@ -142,12 +142,13 @@ key(Metric) ->
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init(Options) ->
Topics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% $SYS Topics for metrics
[{atomic, _} = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS],
[{atomic, _} = emqtt_pubsub:create(systop(Topic)) || Topic <- Topics],
% Create metrics table
ets:new(?TABLE, [set, public, named_table, {write_concurrency, true}]),
% Init metrics
[new(Metric) || <<"$SYS/broker/", Metric/binary>> <- ?SYSTOP_METRICS],
[new_metric(Topic) || Topic <- Topics],
PubInterval = proplists:get_value(pub_interval, Options, 60),
{ok, tick(#state{pub_interval = PubInterval}), hibernate}.
@ -162,12 +163,7 @@ handle_cast(_Msg, State) ->
handle_info(tick, State) ->
% publish metric message
lists:foreach(
fun({Metric, Val}) ->
Topic = list_to_binary(atom_to_list(Metric)),
Payload = list_to_binary(integer_to_list(Val)),
publish(<<"$SYS/broker/", Topic/binary>>, Payload)
end, all()),
[publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()],
{noreply, tick(State), hibernate};
handle_info(_Info, State) ->
@ -183,13 +179,19 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal Function Definitions
%% ------------------------------------------------------------------
new(Metric) ->
Key = list_to_atom(binary_to_list(Metric)),
[ets:insert(?TABLE, {{Key, N}, 0}) || N <- lists:seq(1, erlang:system_info(schedulers))].
tick(State = #state{pub_interval = PubInterval}) ->
State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}.
systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
publish(Topic, Payload) ->
emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}).
new_metric(Name) ->
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
[ets:insert(?TABLE, {{Name, I}, 0}) || I <- Schedulers].
tick(State = #state{pub_interval = PubInterval}) ->
State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}.
i2b(I) ->
list_to_binary(integer_to_list(I)).