upstats
This commit is contained in:
parent
5d4704acc2
commit
f2b0449117
|
@ -150,7 +150,8 @@ setstat(Stat, Val) ->
|
|||
setstats(Stat, MaxStat, Val) ->
|
||||
MaxVal = ets:lookup_element(?BROKER_TABLE, MaxStat, 2),
|
||||
if
|
||||
Val > MaxVal -> ets:update_element(?BROKER_TABLE, MaxStat, {2, Val});
|
||||
Val > MaxVal ->
|
||||
ets:update_element(?BROKER_TABLE, MaxStat, {2, Val});
|
||||
true -> ok
|
||||
end,
|
||||
ets:update_element(?BROKER_TABLE, Stat, {2, Val}).
|
||||
|
|
|
@ -141,7 +141,7 @@ handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) ->
|
|||
true -> ok;
|
||||
false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid])
|
||||
end,
|
||||
{noreply, State};
|
||||
{noreply, setstats(State)};
|
||||
|
||||
handle_cast({unregister, ClientId, Pid}, State) ->
|
||||
case ets:lookup(?CLIENT_TABLE, ClientId) of
|
||||
|
|
|
@ -240,14 +240,22 @@ handle_cast(Msg, State) ->
|
|||
|
||||
%% a new record has been written.
|
||||
handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) ->
|
||||
%%TODO: rewrite...
|
||||
erlang:monitor(process, Pid),
|
||||
{noreply, setstats(State)};
|
||||
upstats(subscriber),
|
||||
{noreply, State};
|
||||
%% TODO:...
|
||||
|
||||
handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) ->
|
||||
upstats(topic),
|
||||
{noreply, State};
|
||||
|
||||
%% {write, #topic{}, _ActivityId}
|
||||
%% {delete_object, _OldRecord, _ActivityId}
|
||||
%% {delete, {Tab, Key}, ActivityId}
|
||||
handle_info({mnesia_table_event, _Event}, State) ->
|
||||
{noreply, setstats(State)};
|
||||
upstats(),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
||||
F = fun() ->
|
||||
|
@ -260,7 +268,8 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
|||
{atomic, _} -> ok;
|
||||
{aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason])
|
||||
end,
|
||||
{noreply, setstats(State)};
|
||||
upstats(),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
lager:error("Unexpected Info: ~p", [Info]),
|
||||
|
@ -375,11 +384,16 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) ->
|
|||
throw({notfound, NodeId})
|
||||
end.
|
||||
|
||||
setstats(State) ->
|
||||
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)),
|
||||
upstats() ->
|
||||
upstats(topic), upstats(subscribe).
|
||||
|
||||
upstats(topic) ->
|
||||
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size));
|
||||
|
||||
upstats(subscribe) ->
|
||||
emqttd_broker:setstats('subscribers/count',
|
||||
'subscribers/max',
|
||||
mnesia:table_info(topic_subscriber, size)), State.
|
||||
mnesia:table_info(topic_subscriber, size)).
|
||||
|
||||
dropped(true) ->
|
||||
emqttd_metrics:inc('messages/dropped');
|
||||
|
|
Loading…
Reference in New Issue