From 778c34f11dcbbaff945a68a329108862789f6830 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Mon, 9 Mar 2015 13:36:00 +0800 Subject: [PATCH] client, session, topics, subscribers statatistics --- apps/emqtt/src/emqtt_broker.erl | 113 +++++++++++++---- apps/emqtt/src/emqtt_client.erl | 1 + apps/emqtt/src/emqtt_cm.erl | 153 +++++++++++++---------- apps/emqtt/src/emqtt_protocol.erl | 1 + apps/emqtt/src/emqtt_pubsub.erl | 194 ++++++++++++++++++------------ apps/emqtt/src/emqtt_server.erl | 1 + apps/emqtt/src/emqtt_sm.erl | 171 +++++++++++++------------- 7 files changed, 389 insertions(+), 245 deletions(-) diff --git a/apps/emqtt/src/emqtt_broker.erl b/apps/emqtt/src/emqtt_broker.erl index 5905a6337..2c5d99ad8 100644 --- a/apps/emqtt/src/emqtt_broker.erl +++ b/apps/emqtt/src/emqtt_broker.erl @@ -34,7 +34,7 @@ -define(SERVER, ?MODULE). --define(TABLE, ?MODULE). +-define(BROKER_TAB, ?MODULE). %% ------------------------------------------------------------------ %% API Function Exports @@ -42,7 +42,10 @@ -export([start_link/1]). --export([version/0, uptime/0, datetime/0, description/0]). +-export([version/0, uptime/0, datetime/0, sysdescr/0]). + +%% Statistics. +-export([getstats/0, getstat/1, setstat/2]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -53,42 +56,109 @@ -record(state, {started_at, sys_interval, tick_timer}). -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% API +%%%============================================================================= +%%------------------------------------------------------------------------------ +%% @doc +%% Start emqtt broker. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}. start_link(Options) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). +%%------------------------------------------------------------------------------ +%% @doc +%% Get broker version. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec version() -> string(). version() -> {ok, Version} = application:get_key(emqtt, vsn), Version. -description() -> +%%------------------------------------------------------------------------------ +%% @doc +%% Get broker description. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec sysdescr() -> string(). +sysdescr() -> {ok, Descr} = application:get_key(emqtt, description), Descr. +%%------------------------------------------------------------------------------ +%% @doc +%% Get broker uptime. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec uptime() -> string(). uptime() -> gen_server:call(?SERVER, uptime). +%%------------------------------------------------------------------------------ +%% @doc +%% Get broker datetime. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec datetime() -> string(). datetime() -> {{Y, M, D}, {H, MM, S}} = calendar:local_time(), lists:flatten( io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ +%%------------------------------------------------------------------------------ +%% @doc +%% Get broker statistics. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec getstats() -> [{atom(), non_neg_integer()}]. +getstats() -> + ets:tab2list(?BROKER_TAB). + +%%------------------------------------------------------------------------------ +%% @doc +%% Get stats by name. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec getstat(atom()) -> non_neg_integer() | undefined. +getstat(Name) -> + case ets:lookup(?BROKER_TAB, Name) of + [{Name, Val}] -> Val; + [] -> undefined + end. + +%%------------------------------------------------------------------------------ +%% @doc +%% Set broker stats. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec setstat(atom(), pos_integer()) -> boolean(). +setstat(Name, Val) -> + ets:insert(?BROKER_TAB, {Name, Val}). + +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + init([Options]) -> random:seed(now()), - SysInterval = proplists:get_value(sys_interval, Options, 60), % Create $SYS Topics [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_BROKERS], [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_CLIENTS], + [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_SESSIONS], [{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_PUBSUB], - ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]), - [ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_CLIENTS], - [ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_PUBSUB], - % retain version, description + ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]), + SysInterval = proplists:get_value(sys_interval, Options, 60), State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, {ok, tick(random:uniform(SysInterval), State)}. @@ -103,13 +173,11 @@ handle_cast(_Msg, State) -> handle_info(tick, State) -> retain(systop(version), list_to_binary(version())), - retain(systop(description), list_to_binary(description())), + retain(systop(sysdescr), list_to_binary(sysdescr())), publish(systop(uptime), list_to_binary(uptime(State))), publish(systop(datetime), list_to_binary(datetime())), - %%TODO... call emqtt_cm here? - [publish(systop(Stat), i2b(Val)) || {Stat, Val} <- emqtt_cm:stats()], - %%TODO... call emqtt_pubsub here? - [publish(systop(Stat), i2b(Val)) || {Stat, Val} <- emqtt_pubsub:stats()], + [publish(systop(Stat), i2b(Val)) + || {Stat, Val} <- ets:tab2list(?BROKER_TAB)], {noreply, tick(State)}; handle_info(_Info, State) -> @@ -121,9 +189,10 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index 55b21dc6e..baffc9a95 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -219,6 +219,7 @@ received_stats(?PACKET(Type)) -> inc(?CONNECT) -> emqtt_metrics:inc('packets/connect'); inc(?PUBLISH) -> + emqtt_metrics:inc('messages/received'), emqtt_metrics:inc('packets/publish/received'); inc(?SUBSCRIBE) -> emqtt_metrics:inc('packets/subscribe'); diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index a43c609a6..84c70dbaa 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -1,26 +1,29 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - -%client manager +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqtt client manager. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqtt_cm). -author('feng@emqtt.io'). @@ -29,21 +32,16 @@ -define(SERVER, ?MODULE). --define(TABLE, emqtt_client). - -%% ------------------------------------------------------------------ -%% API Function Exports -%% ------------------------------------------------------------------ +-define(CLIENT_TAB, emqtt_client). +%% API Exports -export([start_link/0]). -export([lookup/1, register/2, unregister/2]). --export([stats/0]). +-export([getstats/0]). -%% ------------------------------------------------------------------ %% gen_server Function Exports -%% ------------------------------------------------------------------ -export([init/1, handle_call/3, @@ -54,15 +52,26 @@ -record(state, {max = 0}). -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc +%% Start client manager. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%%------------------------------------------------------------------------------ +%% @doc +%% Lookup client pid with clientId. %% -%% @doc lookup client pid with clientId. -%% +%% @end +%%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> case ets:lookup(emqtt_client, ClientId) of @@ -70,33 +79,45 @@ lookup(ClientId) when is_binary(ClientId) -> [] -> undefined end. +%%------------------------------------------------------------------------------ +%% @doc +%% Register clientId with pid. %% -%% @doc register clientId with pid. -%% +%% @end +%%------------------------------------------------------------------------------ -spec register(ClientId :: binary(), Pid :: pid()) -> ok. register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> gen_server:call(?SERVER, {register, ClientId, Pid}). +%%------------------------------------------------------------------------------ +%% @doc +%% Unregister clientId with pid. %% -%% @doc unregister clientId with pid. -%% +%% @end +%%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary(), Pid :: pid()) -> ok. unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> gen_server:cast(?SERVER, {unregister, ClientId, Pid}). -stats() -> - gen_server:call(?SERVER, stats). +%%------------------------------------------------------------------------------ +%% @doc +%% Get statistics of client manager. +%% +%% @end +%%------------------------------------------------------------------------------ +getstats() -> + gen_server:call(?SERVER, getstats). -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= init([]) -> - ets:new(?TABLE, [set, named_table, protected]), + ets:new(?CLIENT_TAB, [set, named_table, protected]), {ok, #state{}}. handle_call({register, ClientId, Pid}, _From, State) -> - case ets:lookup(?TABLE, ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, _}] -> lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; @@ -107,10 +128,10 @@ handle_call({register, ClientId, Pid}, _From, State) -> [] -> insert(ClientId, Pid) end, - {reply, ok, set_max(State)}; + {reply, ok, setstats(State)}; -handle_call(stats, _From, State = #state{max = Max}) -> - Stats = [{'clients/total', ets:info(?TABLE, size)}, +handle_call(getstats, _From, State = #state{max = Max}) -> + Stats = [{'clients/count', ets:info(?CLIENT_TAB, size)}, {'clients/max', Max}], {reply, Stats, State}; @@ -118,23 +139,23 @@ handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast({unregister, ClientId, Pid}, State) -> - case ets:lookup(?TABLE, ClientId) of + case ets:lookup(?CLIENT_TAB, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef), - ets:delete(?TABLE, ClientId); + ets:delete(?CLIENT_TAB, ClientId); [_] -> ignore; [] -> lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) end, - {noreply, State}; + {noreply, setstats(State)}; handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?TABLE, {{'_', DownPid, MRef}}), - {noreply, State}; + ets:match_delete(?CLIENT_TAB, {{'_', DownPid, MRef}}), + {noreply, setstats(State)}; handle_info(_Info, State) -> {noreply, State}. @@ -145,16 +166,22 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% Internal functions +%%%============================================================================= insert(ClientId, Pid) -> - ets:insert(?TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}). + ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}). -set_max(State = #state{max = Max}) -> - Total = ets:info(?TABLE, size), +setstats(State = #state{max = Max}) -> + Count = ets:info(?CLIENT_TAB, size), + emqtt_broker:setstat('client/count', Count), if - Total > Max -> State#state{max = Total}; - true -> State + Count > Max -> + emqtt_broker:setstat('client/max', Count), + State#state{max = Count}; + true -> + State end. + + diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index aa59a16fa..984ace5c4 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -307,6 +307,7 @@ sent_stats(?PACKET(Type)) -> inc(?CONNACK) -> emqtt_metrics:inc('packets/connack'); inc(?PUBLISH) -> + emqtt_metrics:inc('messages/sent'), emqtt_metrics:inc('packets/publish/sent'); inc(?SUBACK) -> emqtt_metrics:inc('packets/suback'); diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index 18848cb4f..2d28e9cc4 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -1,29 +1,37 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqtt core pubsub. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqtt_pubsub). -author('feng@emqtt.io'). +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + -include("emqtt.hrl"). -include("emqtt_topic.hrl"). @@ -32,11 +40,9 @@ -include_lib("stdlib/include/qlc.hrl"). -%% ------------------------------------------------------------------ -%% API Function Exports -%% ------------------------------------------------------------------ +%% API Exports --export([start_link/0]). +-export([start_link/0, getstats/0]). -export([topics/0, create/1, @@ -48,15 +54,7 @@ dispatch/2, match/1]). --export([stats/0]). - -%% ------------------------------------------------------------------ %% gen_server Function Exports -%% ------------------------------------------------------------------ - --behaviour(gen_server). - --define(SERVER, ?MODULE). -export([init/1, handle_call/3, @@ -65,68 +63,85 @@ terminate/2, code_change/3]). -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec topics() -> list(topic()). - --spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}. - --spec unsubscribe(binary() | list(binary()), pid()) -> ok. - --endif. - -%%---------------------------------------------------------------------------- -record(state, {max_subs = 0}). -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% API +%%%============================================================================= +%%------------------------------------------------------------------------------ +%% @doc +%% Start Pubsub. %% -%% @doc Start Pubsub. -%% +%% @end +%%------------------------------------------------------------------------------ +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -stats() -> - gen_server:call(?SERVER, stats). +%%------------------------------------------------------------------------------ +%% @doc +%% Get stats of PubSub. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec getstats() -> [{atom(), non_neg_integer()}]. +getstats() -> + gen_server:call(?SERVER, getstats). +%%------------------------------------------------------------------------------ +%% @doc +%% All Topics. %% -%% @doc All topics -%% +%% @end +%%------------------------------------------------------------------------------ +-spec topics() -> list(binary()). topics() -> mnesia:dirty_all_keys(topic). +%%------------------------------------------------------------------------------ +%% @doc +%% Create static topic. %% -%% @doc Create static topic. -%% +%% @end +%%------------------------------------------------------------------------------ +-spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}. create(Topic) -> gen_server:call(?SERVER, {create, Topic}). +%%------------------------------------------------------------------------------ +%% @doc +%% Subscribe Topic or Topics %% -%% @doc Subscribe Topic or Topics -%% +%% @end +%%------------------------------------------------------------------------------ +-spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}. subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> subscribe([{Topic, Qos}], SubPid); subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> gen_server:call(?SERVER, {subscribe, Topics, SubPid}). +%%------------------------------------------------------------------------------ +%% @doc +%% Unsubscribe Topic or Topics %% -%% @doc Unsubscribe Topic or Topics -%% +%% @end +%%------------------------------------------------------------------------------ +-spec unsubscribe(binary() | list(binary()), pid()) -> ok. unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) -> unsubscribe([Topic], SubPid); unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) -> gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}). +%%------------------------------------------------------------------------------ +%% @doc +%% Publish to cluster node. %% -%% @doc Publish to cluster node. -%% +%% @end +%%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. publish(Msg=#mqtt_message{topic=Topic}) -> publish(Topic, Msg). @@ -140,7 +155,14 @@ publish(Topic, Msg) when is_binary(Topic) -> end end, match(Topic)). -%dispatch locally, should only be called by publish +%%TODO: dispatch counts.... + +%%------------------------------------------------------------------------------ +%% @doc +%% Dispatch Locally. Should only be called by publish. +%% +%% @end +%%------------------------------------------------------------------------------ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> lists:foreach(fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> Msg1 = if @@ -150,6 +172,13 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> SubPid ! {dispatch, {self(), Msg1}} end, ets:lookup(topic_subscriber, Topic)). +%%------------------------------------------------------------------------------ +%% @doc +%% @private +%% Match topic. +%% +%% @end +%%------------------------------------------------------------------------------ -spec match(Topic :: binary()) -> [topic()]. match(Topic) when is_binary(Topic) -> TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]), @@ -178,15 +207,15 @@ init([]) -> ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]), {ok, #state{}}. -handle_call(stats, _From, State = #state{max_subs = Max}) -> - Stats = [{'topics/total', mnesia:table_info(topic, size)}, - {'subscribers/total', ets:info(topic_subscriber, size)}, +handle_call(getstats, _From, State = #state{max_subs = Max}) -> + Stats = [{'topics/count', mnesia:table_info(topic, size)}, + {'subscribers/count', ets:info(topic_subscriber, size)}, {'subscribers/max', Max}], {reply, Stats, State}; handle_call({create, Topic}, _From, State) -> Result = mnesia:transaction(fun trie_add/1, [Topic]), - {reply, Result , State}; + {reply, Result, setstats(State)}; handle_call({subscribe, Topics, SubPid}, _From, State) -> Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics], @@ -195,7 +224,7 @@ handle_call({subscribe, Topics, SubPid}, _From, State) -> [] -> {ok, [Qos || {ok, Qos} <- Result]}; Errors -> hd(Errors) end, - {reply, Reply, set_maxsubs(State)}; + {reply, Reply, setstats(State)}; handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. @@ -205,7 +234,7 @@ handle_cast({unsubscribe, Topics, SubPid}, State) -> ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}), try_remove_topic(Topic) end, Topics), - {noreply, State}; + {noreply, setstats(State)}; handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. @@ -221,7 +250,7 @@ handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) -> [ets:delete_object(topic_subscriber, Sub) || Sub <- Subs], [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs] end, - {noreply, State}; + {noreply, setstats(State)}; handle_info(Info, State) -> {stop, {badinfo, Info}, State}. @@ -232,9 +261,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% Internal functions +%%%============================================================================= subscribe_topic({Topic, Qos}, SubPid) -> case mnesia:transaction(fun trie_add/1, [Topic]) of {atomic, _} -> @@ -367,9 +396,16 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) -> throw({notfound, NodeId}) end. -set_maxsubs(State = #state{max_subs = Max}) -> - Total = ets:info(topic_subscriber, size), +setstats(State = #state{max_subs = Max}) -> + emqtt_broker:setstat('topics/count', mnesia:table_info(topic, size)), + SubCount = ets:info(topic_subscriber, size), + emqtt_broker:setstat('subscribers/count', SubCount), if - Total > Max -> State#state{max_subs = Total}; - true -> State + SubCount > Max -> + emqtt_broker:setstat('subscribers/max', SubCount), + State#state{max_subs = SubCount}; + true -> + State end. + + diff --git a/apps/emqtt/src/emqtt_server.erl b/apps/emqtt/src/emqtt_server.erl index 69f718569..8f41253b7 100644 --- a/apps/emqtt/src/emqtt_server.erl +++ b/apps/emqtt/src/emqtt_server.erl @@ -103,6 +103,7 @@ handle_cast({retain, Msg = #mqtt_message{ qos = Qos, Size when Size >= Limit -> lager:error("Server dropped message(retain) for table is full: ~p", [Msg]); _ -> + %emqtt_metrics:update('messages/retained', Size), lager:info("Server retained message: ~p", [Msg]), mnesia:dirty_write(#mqtt_retained{ topic = Topic, qos = Qos, diff --git a/apps/emqtt/src/emqtt_sm.erl b/apps/emqtt/src/emqtt_sm.erl index 6eb9521c4..e19076792 100644 --- a/apps/emqtt/src/emqtt_sm.erl +++ b/apps/emqtt/src/emqtt_sm.erl @@ -1,44 +1,41 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - - -%%------------------------------------------------------------------------------ -%% -%% The Session state in the Server consists of: -%% The existence of a Session, even if the rest of the Session state is empty. -%% The Client’s subscriptions. -%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely -%% acknowledged. -%% QoS 1 and QoS 2 messages pending transmission to the Client. -%% QoS 2 messages which have been received from the Client, but have not been completely -%% acknowledged. -%% Optionally, QoS 0 messages pending transmission to the Client. -%% -%%------------------------------------------------------------------------------ - +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqtt session manager. +%%% +%%% The Session state in the Server consists of: +%%% The existence of a Session, even if the rest of the Session state is empty. +%%% The Client’s subscriptions. +%%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not +%%% been completely acknowledged. +%%% QoS 1 and QoS 2 messages pending transmission to the Client. +%%% QoS 2 messages which have been received from the Client, but have not been +%%% completely acknowledged. +%%% Optionally, QoS 0 messages pending transmission to the Client. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqtt_sm). -%%emqtt session manager... - %%cleanSess: true | false -include("emqtt.hrl"). @@ -47,73 +44,71 @@ -define(SERVER, ?MODULE). --define(TABLE, emqtt_session). +-define(SESSION_TAB, emqtt_session). -%% ------------------------------------------------------------------ %% API Function Exports -%% ------------------------------------------------------------------ - -export([start_link/0]). -export([lookup_session/1, start_session/2, destroy_session/1]). -%% ------------------------------------------------------------------ %% gen_server Function Exports -%% ------------------------------------------------------------------ - -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(state, {max = 0}). -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start_link/0 :: () -> {ok, pid()}). - --spec(lookup_session/1 :: (binary()) -> pid() | undefined). - --spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}). - --spec(destroy_session/1 :: (binary()) -> ok). - --endif. - -%%---------------------------------------------------------------------------- - --record(state, {}). - -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ - +%%%============================================================================= +%%% API +%%%============================================================================= +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%%------------------------------------------------------------------------------ +%% @doc +%% Lookup Session Pid. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> - case ets:lookup(?TABLE, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _}] -> SessPid; [] -> undefined end. +%%------------------------------------------------------------------------------ +%% @doc +%% Start Session. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}. start_session(ClientId, ClientPid) -> gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). +%%------------------------------------------------------------------------------ +%% @doc +%% Destroy Session. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec destroy_session(binary()) -> ok. destroy_session(ClientId) -> gen_server:call(?SERVER, {destroy_session, ClientId}). -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= init([]) -> process_flag(trap_exit, true), - ets:new(?TABLE, [set, protected, named_table]), + ets:new(?SESSION_TAB, [set, protected, named_table]), {ok, #state{}}. handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = - case ets:lookup(?TABLE, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, _MRef}] -> emqtt_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; @@ -121,24 +116,24 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) -> case emqtt_session_sup:start_session(ClientId, ClientPid) of {ok, SessPid} -> MRef = erlang:monitor(process, SessPid), - ets:insert(?TABLE, {ClientId, SessPid, MRef}), + ets:insert(?SESSION_TAB, {ClientId, SessPid, MRef}), {ok, SessPid}; {error, Error} -> {error, Error} end end, - {reply, Reply, State}; + {reply, Reply, setstats(State)}; handle_call({destroy_session, ClientId}, _From, State) -> - case ets:lookup(?TABLE, ClientId) of + case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, MRef}] -> erlang:demonitor(MRef), emqtt_session:destroy(SessPid, ClientId), - ets:delete(?TABLE, ClientId); + ets:delete(?SESSION_TAB, ClientId); [] -> ignore end, - {reply, ok, State}; + {reply, ok, setstats(State)}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -147,8 +142,8 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?TABLE, {{'_', DownPid, MRef}}), - {noreply, State}; + ets:match_delete(?SESSION_TAB, {{'_', DownPid, MRef}}), + {noreply, setstats(State)}; handle_info(_Info, State) -> {noreply, State}. @@ -159,4 +154,18 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +setstats(State = #state{max = Max}) -> + Count = ets:info(?SESSION_TAB, size), + emqtt_broker:setstat('sessions/count', Count), + if + Count > Max -> + emqtt_broker:setstat('sessions/max', Count), + State#state{max = Count}; + true -> + State + end.