From d4a434176a7f06e07e62dda2fa020b3b4487194d Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:06:47 +0800 Subject: [PATCH] sysmon --- src/emqttd_sysmon.erl | 120 +++++++++++++++++++++++++++++++++--------- 1 file changed, 94 insertions(+), 26 deletions(-) diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 38b419bad..a86cad01e 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -25,69 +25,137 @@ %%% @end %%%----------------------------------------------------------------------------- -%%TODO: this is a demo module.... - -module(emqttd_sysmon). -author("Feng Lee "). -behavior(gen_server). --export([start_link/0]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). +-record(state, {tref, events = []}). %%------------------------------------------------------------------------------ %% @doc Start system monitor %% @end %%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec start_link(Opts :: list(tuple())) -> + {ok, pid()} | ignore | {error, term()}. +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - erlang:system_monitor(self(), [{long_gc, 5000}, - {large_heap, 8 * 1024 * 1024}, - busy_port]), - {ok, #state{}}. +init([Opts]) -> + erlang:system_monitor(self(), parse_opt(Opts)), + {ok, TRef} = timer:send_interval(1000, reset), + {ok, #state{tref = TRef}}. + +parse_opt(Opts) -> + parse_opt(Opts, []). +parse_opt([], Acc) -> + Acc; +parse_opt([{long_gc, false}|Opts], Acc) -> + parse_opt(Opts, Acc); +parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) -> + parse_opt(Opts, [{long_gc, Ms}|Acc]); +parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) -> + parse_opt(Opts, [{long_schedule, Ms}|Acc]); +parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) -> + parse_opt(Opts, [{large_heap, Size}|Acc]); +parse_opt([{busy_port, true}|Opts], Acc) -> + parse_opt(Opts, [busy_port|Acc]); +parse_opt([{busy_port, false}|Opts], Acc) -> + parse_opt(Opts, Acc); +parse_opt([{busy_dist_port, true}|Opts], Acc) -> + parse_opt(Opts, [busy_dist_port|Acc]); +parse_opt([{busy_dist_port, false}|Opts], Acc) -> + parse_opt(Opts, Acc). handle_call(Request, _From, State) -> lager:error("Unexpected request: ~p", [Request]), {reply, {error, unexpected_request}, State}. handle_cast(Msg, State) -> - lager:error("unexpected msg: ~p", [Msg]), + lager:error("Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({monitor, GcPid, long_gc, Info}, State) -> - lager:error("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid, - [registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]), - {noreply, State}; +handle_info({monitor, Pid, long_gc, Info}, State) -> + suppress({long_gc, Pid}, fun() -> + WarnMsg = io_lib:format("long_gc: pid = ~p, info: ~p", [Pid, Info]), + lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + publish(long_gc, WarnMsg) + end, State); -handle_info({monitor, GcPid, large_heap, Info}, State) -> - lager:error("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid, - [registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]), - {noreply, State}; +handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> + suppress({long_schedule, Pid}, fun() -> + WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), + lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + publish(long_schedule, WarnMsg) + end, State); + +handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> + suppress({long_schedule, Port}, fun() -> + WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), + lager:error("~s~n~p", [WarnMsg, erlang:port_info(Port)]), + publish(long_schedule, WarnMsg) + end, State); + +handle_info({monitor, Pid, large_heap, Info}, State) -> + suppress({large_heap, Pid}, fun() -> + WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), + lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + publish(large_heap, WarnMsg) + end, State); handle_info({monitor, SusPid, busy_port, Port}, State) -> - lager:error("busy_port: suspid = ~p, port = ~p", [process_info(SusPid, - [registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]), - {noreply, State}; + suppress({busy_port, Port}, fun() -> + WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), + lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + publish(busy_port, WarnMsg) + end, State); + +handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> + suppress({busy_dist_port, Port}, fun() -> + WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), + lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + publish(busy_dist_port, WarnMsg) + end, State); + +handle_info(reset, State) -> + {noreply, State#state{events = []}}; handle_info(Info, State) -> lager:error("Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{tref = TRef}) -> + timer:cancel(TRef), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. +suppress(Key, SuccFun, State = #state{events = Events}) -> + case lists:member(Key, Events) of + true -> + {noreply, State}; + false -> + SuccFun(), + {noreply, State#state{events = [Key|Events]}} + end. + +procinfo(Pid) -> + emqttd_vm:get_process_info(Pid) ++ emqttd_vm:get_process_gc(Pid). + +publish(Sysmon, WarnMsg) -> + Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)), + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). + +topic(Sysmon) -> + emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))). +