This commit is contained in:
Feng Lee 2015-04-25 23:57:38 +08:00
parent d5e8a28db3
commit 7e63e179da
3 changed files with 13 additions and 55 deletions

View File

@ -83,6 +83,7 @@ start_servers(Sup) ->
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
%{"emqttd router", emqttd_router}, %{"emqttd router", emqttd_router},
{"emqttd broker", emqttd_broker, BrokerOpts}, {"emqttd broker", emqttd_broker, BrokerOpts},
{"emqttd stats", emqttd_stats},
{"emqttd metrics", emqttd_metrics, MetricOpts}, {"emqttd metrics", emqttd_metrics, MetricOpts},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},
{"emqttd access control", emqttd_access_control, AccessOpts}, {"emqttd access control", emqttd_access_control, AccessOpts},

View File

@ -96,59 +96,6 @@ datetime() ->
io_lib:format( io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
%%------------------------------------------------------------------------------
%% @doc Generate stats fun
%% @end
%%------------------------------------------------------------------------------
-spec statsfun(Stat :: atom()) -> fun().
statsfun(Stat) ->
fun(Val) -> setstat(Stat, Val) end.
-spec statsfun(Stat :: atom(), MaxStat :: atom()) -> fun().
statsfun(Stat, MaxStat) ->
fun(Val) -> setstats(Stat, MaxStat, Val) end.
%%------------------------------------------------------------------------------
%% @doc Get broker statistics
%% @end
%%------------------------------------------------------------------------------
-spec getstats() -> [{atom(), non_neg_integer()}].
getstats() ->
ets:tab2list(?BROKER_TAB).
%%------------------------------------------------------------------------------
%% @doc Get stats by name
%% @end
%%------------------------------------------------------------------------------
-spec getstat(atom()) -> non_neg_integer() | undefined.
getstat(Name) ->
case ets:lookup(?BROKER_TAB, Name) of
[{Name, Val}] -> Val;
[] -> undefined
end.
%%------------------------------------------------------------------------------
%% @doc Set broker stats
%% @end
%%------------------------------------------------------------------------------
-spec setstat(Stat :: atom(), Val :: pos_integer()) -> boolean().
setstat(Stat, Val) ->
ets:update_element(?BROKER_TAB, Stat, {2, Val}).
%%------------------------------------------------------------------------------
%% @doc Set stats with max
%% @end
%%------------------------------------------------------------------------------
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
setstats(Stat, MaxStat, Val) ->
MaxVal = ets:lookup_element(?BROKER_TAB, MaxStat, 2),
if
Val > MaxVal ->
ets:update_element(?BROKER_TAB, MaxStat, {2, Val});
true -> ok
end,
ets:update_element(?BROKER_TAB, Stat, {2, Val}).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
@ -156,11 +103,9 @@ setstats(Stat, MaxStat, Val) ->
init([Options]) -> init([Options]) ->
random:seed(now()), random:seed(now()),
ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]), ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]),
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
[ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics], [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics],
% Create $SYS Topics % Create $SYS Topics
[ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
[ok = create(systop(Topic)) || Topic <- Topics],
SysInterval = proplists:get_value(sys_interval, Options, 60), SysInterval = proplists:get_value(sys_interval, Options, 60),
State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
Delay = if Delay = if

View File

@ -119,6 +119,11 @@ setstats(Stat, MaxStat, Val) ->
init([]) -> init([]) ->
random:seed(now()), random:seed(now()),
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
% Create $SYS Topics
[ok = emqttd_pubsub:create(systop(Topic)) || Topic <- Topics],
SysInterval = proplists:get_value(sys_interval, Options, 60),
{ok, #state{}}. {ok, #state{}}.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
@ -127,6 +132,11 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info(tick, State) ->
[publish(systop(Stat), i2b(Val))
|| {Stat, Val} <- ets:tab2list(?STATS_TAB)],
{noreply, State};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
@ -140,4 +150,6 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).