From cff100f706b1ca6ff9e14e95b94a16b75ed2fa89 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 8 Mar 2015 12:23:44 +0800 Subject: [PATCH] system topics --- apps/emqtt/include/emqtt_systop.hrl | 96 +++++++++++++++++++++++++++++ apps/emqtt/include/emqtt_topic.hrl | 34 ---------- apps/emqtt/src/emqtt_broker.erl | 39 +++++++++--- apps/emqtt/src/emqtt_metrics.erl | 32 +++++----- 4 files changed, 143 insertions(+), 58 deletions(-) create mode 100644 apps/emqtt/include/emqtt_systop.hrl diff --git a/apps/emqtt/include/emqtt_systop.hrl b/apps/emqtt/include/emqtt_systop.hrl new file mode 100644 index 000000000..e6615e5a5 --- /dev/null +++ b/apps/emqtt/include/emqtt_systop.hrl @@ -0,0 +1,96 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqtt system topics. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-define(SYSTOP, <<"$SYS">>). + +%%------------------------------------------------------------------------------ +%% $SYS Topics of Broker +%%------------------------------------------------------------------------------ +-define(SYSTOP_BROKERS, [ + version, % Broker version + uptime, % Broker uptime + timestamp, % Broker timestamp + description % Broker description +]). + +%%------------------------------------------------------------------------------ +%% $SYS Topics of Clients +%%------------------------------------------------------------------------------ +-define(SYSTOP_CLIENTS, [ + 'clients/connected', % ??? + 'clients/disconnected', % ??? + 'clients/total', % total clients connected current + 'clients/max' % max clients connected +]). + +%%------------------------------------------------------------------------------ +%% $SYS Topics of Subscribers +%%------------------------------------------------------------------------------ +-define(SYSTOP_SUBSCRIBERS, [ + 'subscribers/total', % ... + 'subscribers/max' % ... +]). + +%%------------------------------------------------------------------------------ +%% Bytes sent and received of Broker +%%------------------------------------------------------------------------------ +-define(SYSTOP_BYTES, [ + 'bytes/received', % Total bytes received + 'bytes/sent' % Total bytes sent +]). + +%%------------------------------------------------------------------------------ +%% Packets sent and received of Broker +%%------------------------------------------------------------------------------ +-define(SYSTOP_PACKETS, [ + 'packets/received', % All Packets received + 'packets/sent', % All Packets sent + 'packets/connect', % CONNECT Packets received + 'packets/connack', % CONNACK Packets sent + 'packets/publish/received', % PUBLISH packets received + 'packets/publish/sent', % PUBLISH packets sent + 'packets/subscribe', % SUBSCRIBE Packets received + 'packets/suback', % SUBACK packets sent + 'packets/unsubscribe', % UNSUBSCRIBE Packets received + 'packets/unsuback', % UNSUBACK Packets sent + 'packets/pingreq', % PINGREQ packets received + 'packets/pingresp', % PINGRESP Packets sent + 'packets/disconnect' % DISCONNECT Packets received +]). + +%%------------------------------------------------------------------------------ +%% Messages sent and received of broker +%%------------------------------------------------------------------------------ +-define(SYSTOP_MESSAGES, [ + 'messages/received', % Messages received + 'messages/sent', % Messages sent + 'messages/retained', % Messagea retained + 'messages/stored', % Messages stored + 'messages/dropped' % Messages dropped +]). + + diff --git a/apps/emqtt/include/emqtt_topic.hrl b/apps/emqtt/include/emqtt_topic.hrl index 7c3e9eddf..4313d4885 100644 --- a/apps/emqtt/include/emqtt_topic.hrl +++ b/apps/emqtt/include/emqtt_topic.hrl @@ -53,37 +53,3 @@ node_id :: binary() | atom() }). -%%------------------------------------------------------------------------------ -%% System Topic -%%------------------------------------------------------------------------------ --define(SYSTOP, <<"$SYS">>). - --define(SYSTOP_BROKER, [ - % $SYS Broker Topics - <<"$SYS/broker/version">>, - <<"$SYS/broker/uptime">>, - <<"$SYS/broker/description">>, - <<"$SYS/broker/timestamp">>, - % $SYS Client Topics - <<"$SYS/broker/clients/connected">>, - <<"$SYS/broker/clients/disconnected">>, - <<"$SYS/broker/clients/total">>, - <<"$SYS/broker/clients/max">>, - % $SYS Subscriber Topics - <<"$SYS/broker/subscribers/total">>, - <<"$SYS/broker/subscribers/max">>]). - --define(SYSTOP_METRICS, [ - % Bytes sent and received - <<"$SYS/broker/bytes/received">>, - <<"$SYS/broker/bytes/sent">>, - % Packets sent and received - <<"$SYS/broker/packets/received">>, - <<"$SYS/broker/packets/sent">>, - % Messges sent and received - <<"$SYS/broker/messages/received">>, - <<"$SYS/broker/messages/sent">>, - <<"$SYS/broker/messages/retained">>, - <<"$SYS/broker/messages/stored">>, - <<"$SYS/broker/messages/dropped">>]). - diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl index c195f1a4d..f55004ecb 100644 --- a/apps/emqtt/src/emqtt_broker.erl +++ b/apps/emqtt/src/emqtt_broker.erl @@ -28,12 +28,14 @@ -include("emqtt_packet.hrl"). --include("emqtt_topic.hrl"). +-include("emqtt_systop.hrl"). -behaviour(gen_server). -define(SERVER, ?MODULE). +-define(TABLE, ?MODULE). + %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ @@ -73,8 +75,15 @@ uptime() -> init([Options]) -> SysInterval = proplists:get_value(sys_interval, Options, 60), % Create $SYS Topics - [{atomic, _} = emqtt_pubsub:create(SysTopic) || SysTopic <- ?SYSTOP_BROKER], + [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_BROKERS], + [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_CLIENTS], + [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_SUBSCRIBERS], ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]), + [ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_CLIENTS], + [ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_SUBSCRIBERS], + % retain version, description + retain(systop(version), version()), + retain(systop(description), description()), State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, {ok, tick(State)}. @@ -88,9 +97,8 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info(tick, State) -> - publish(true, <<"$SYS/broker/version">>, version()), - publish(false, <<"$SYS/broker/uptime">>, uptime(State)), - publish(true, <<"$SYS/broker/description">>, description()), + publish(systop(uptime), uptime(State)), + [publish(systop(Name), i2b(Val)) || {Name, Val} <- ets:tab2list(?TABLE)], {noreply, tick(State)}; handle_info(_Info, State) -> @@ -105,11 +113,21 @@ code_change(_OldVsn, State, _Extra) -> %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------ -publish(Retain, Topic, Payload) when is_list(Payload) -> - publish(Retain, Topic, list_to_binary(Payload)); -publish(Retain, Topic, Payload) when is_binary(Payload) -> - emqtt_router:route(#mqtt_message{retain = Retain, topic = Topic, payload = Payload}). +systop(Name) when is_atom(Name) -> + list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). + +create(Topic) -> + emqtt_pubsub:create(Topic). + +retain(Topic, Payload) when is_list(Payload) -> + emqtt_router:route(#mqtt_message{retain = true, + topic = Topic, + payload = Payload}). + +publish(Topic, Payload) when is_binary(Payload) -> + emqtt_router:route(#mqtt_message{topic = Topic, + payload = Payload}). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, @@ -133,3 +151,6 @@ uptime(days, D) -> tick(State = #state{sys_interval = SysInterval}) -> State#state{tick_timer = erlang:send_after(SysInterval * 1000, self(), tick)}. +i2b(I) when is_integer(I) -> + list_to_binary(integer_to_list(I)). + diff --git a/apps/emqtt/src/emqtt_metrics.erl b/apps/emqtt/src/emqtt_metrics.erl index 53e08f7e1..e7997723e 100644 --- a/apps/emqtt/src/emqtt_metrics.erl +++ b/apps/emqtt/src/emqtt_metrics.erl @@ -28,7 +28,7 @@ -include("emqtt_packet.hrl"). --include("emqtt_topic.hrl"). +-include("emqtt_systop.hrl"). -behaviour(gen_server). @@ -142,12 +142,13 @@ key(Metric) -> %% gen_server Function Definitions %% ------------------------------------------------------------------ init(Options) -> + Topics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % $SYS Topics for metrics - [{atomic, _} = emqtt_pubsub:create(Topic) || Topic <- ?SYSTOP_METRICS], + [{atomic, _} = emqtt_pubsub:create(systop(Topic)) || Topic <- Topics], % Create metrics table ets:new(?TABLE, [set, public, named_table, {write_concurrency, true}]), % Init metrics - [new(Metric) || <<"$SYS/broker/", Metric/binary>> <- ?SYSTOP_METRICS], + [new_metric(Topic) || Topic <- Topics], PubInterval = proplists:get_value(pub_interval, Options, 60), {ok, tick(#state{pub_interval = PubInterval}), hibernate}. @@ -162,12 +163,7 @@ handle_cast(_Msg, State) -> handle_info(tick, State) -> % publish metric message - lists:foreach( - fun({Metric, Val}) -> - Topic = list_to_binary(atom_to_list(Metric)), - Payload = list_to_binary(integer_to_list(Val)), - publish(<<"$SYS/broker/", Topic/binary>>, Payload) - end, all()), + [publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()], {noreply, tick(State), hibernate}; handle_info(_Info, State) -> @@ -183,13 +179,19 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Function Definitions %% ------------------------------------------------------------------ -new(Metric) -> - Key = list_to_atom(binary_to_list(Metric)), - [ets:insert(?TABLE, {{Key, N}, 0}) || N <- lists:seq(1, erlang:system_info(schedulers))]. - -tick(State = #state{pub_interval = PubInterval}) -> - State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}. +systop(Name) when is_atom(Name) -> + list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). publish(Topic, Payload) -> emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}). +new_metric(Name) -> + Schedulers = lists:seq(1, erlang:system_info(schedulers)), + [ets:insert(?TABLE, {{Name, I}, 0}) || I <- Schedulers]. + +tick(State = #state{pub_interval = PubInterval}) -> + State#state{tick_timer = erlang:send_after(PubInterval * 1000, self(), tick)}. + +i2b(I) -> + list_to_binary(integer_to_list(I)). +