diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index ad32e3463..8bca804ba 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -150,7 +150,8 @@ setstat(Stat, Val) -> setstats(Stat, MaxStat, Val) -> MaxVal = ets:lookup_element(?BROKER_TABLE, MaxStat, 2), if - Val > MaxVal -> ets:update_element(?BROKER_TABLE, MaxStat, {2, Val}); + Val > MaxVal -> + ets:update_element(?BROKER_TABLE, MaxStat, {2, Val}); true -> ok end, ets:update_element(?BROKER_TABLE, Stat, {2, Val}). diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 951425e6c..1692401a9 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -141,7 +141,7 @@ handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) -> true -> ok; false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid]) end, - {noreply, State}; + {noreply, setstats(State)}; handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(?CLIENT_TABLE, ClientId) of diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index a8381f4cb..d16709884 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -240,14 +240,22 @@ handle_cast(Msg, State) -> %% a new record has been written. handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) -> + %%TODO: rewrite... erlang:monitor(process, Pid), - {noreply, setstats(State)}; + upstats(subscriber), + {noreply, State}; +%% TODO:... + +handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) -> + upstats(topic), + {noreply, State}; %% {write, #topic{}, _ActivityId} %% {delete_object, _OldRecord, _ActivityId} %% {delete, {Tab, Key}, ActivityId} handle_info({mnesia_table_event, _Event}, State) -> - {noreply, setstats(State)}; + upstats(), + {noreply, State}; handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> F = fun() -> @@ -260,7 +268,8 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> {atomic, _} -> ok; {aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason]) end, - {noreply, setstats(State)}; + upstats(), + {noreply, State}; handle_info(Info, State) -> lager:error("Unexpected Info: ~p", [Info]), @@ -375,11 +384,16 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) -> throw({notfound, NodeId}) end. -setstats(State) -> - emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)), +upstats() -> + upstats(topic), upstats(subscribe). + +upstats(topic) -> + emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)); + +upstats(subscribe) -> emqttd_broker:setstats('subscribers/count', 'subscribers/max', - mnesia:table_info(topic_subscriber, size)), State. + mnesia:table_info(topic_subscriber, size)). dropped(true) -> emqttd_metrics:inc('messages/dropped');