This commit is contained in:
Feng 2015-10-12 21:06:47 +08:00
parent 6209d47aae
commit d4a434176a
1 changed files with 94 additions and 26 deletions

View File

@ -25,69 +25,137 @@
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%TODO: this is a demo module....
-module(emqttd_sysmon). -module(emqttd_sysmon).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-behavior(gen_server). -behavior(gen_server).
-export([start_link/0]). -export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {}). -record(state, {tref, events = []}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start system monitor %% @doc Start system monitor
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}. -spec start_link(Opts :: list(tuple())) ->
start_link() -> {ok, pid()} | ignore | {error, term()}.
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
init([]) -> init([Opts]) ->
erlang:system_monitor(self(), [{long_gc, 5000}, erlang:system_monitor(self(), parse_opt(Opts)),
{large_heap, 8 * 1024 * 1024}, {ok, TRef} = timer:send_interval(1000, reset),
busy_port]), {ok, #state{tref = TRef}}.
{ok, #state{}}.
parse_opt(Opts) ->
parse_opt(Opts, []).
parse_opt([], Acc) ->
Acc;
parse_opt([{long_gc, false}|Opts], Acc) ->
parse_opt(Opts, Acc);
parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) ->
parse_opt(Opts, [{long_gc, Ms}|Acc]);
parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) ->
parse_opt(Opts, [{long_schedule, Ms}|Acc]);
parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) ->
parse_opt(Opts, [{large_heap, Size}|Acc]);
parse_opt([{busy_port, true}|Opts], Acc) ->
parse_opt(Opts, [busy_port|Acc]);
parse_opt([{busy_port, false}|Opts], Acc) ->
parse_opt(Opts, Acc);
parse_opt([{busy_dist_port, true}|Opts], Acc) ->
parse_opt(Opts, [busy_dist_port|Acc]);
parse_opt([{busy_dist_port, false}|Opts], Acc) ->
parse_opt(Opts, Acc).
handle_call(Request, _From, State) -> handle_call(Request, _From, State) ->
lager:error("Unexpected request: ~p", [Request]), lager:error("Unexpected request: ~p", [Request]),
{reply, {error, unexpected_request}, State}. {reply, {error, unexpected_request}, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("unexpected msg: ~p", [Msg]), lager:error("Unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({monitor, GcPid, long_gc, Info}, State) -> handle_info({monitor, Pid, long_gc, Info}, State) ->
lager:error("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid, suppress({long_gc, Pid}, fun() ->
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]), WarnMsg = io_lib:format("long_gc: pid = ~p, info: ~p", [Pid, Info]),
{noreply, State}; lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]),
publish(long_gc, WarnMsg)
end, State);
handle_info({monitor, GcPid, large_heap, Info}, State) -> handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
lager:error("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid, suppress({long_schedule, Pid}, fun() ->
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]), WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
{noreply, State}; lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]),
publish(long_schedule, WarnMsg)
end, State);
handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port}, fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
lager:error("~s~n~p", [WarnMsg, erlang:port_info(Port)]),
publish(long_schedule, WarnMsg)
end, State);
handle_info({monitor, Pid, large_heap, Info}, State) ->
suppress({large_heap, Pid}, fun() ->
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]),
publish(large_heap, WarnMsg)
end, State);
handle_info({monitor, SusPid, busy_port, Port}, State) -> handle_info({monitor, SusPid, busy_port, Port}, State) ->
lager:error("busy_port: suspid = ~p, port = ~p", [process_info(SusPid, suppress({busy_port, Port}, fun() ->
[registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]), WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
{noreply, State}; lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
publish(busy_port, WarnMsg)
end, State);
handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port}, fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
publish(busy_dist_port, WarnMsg)
end, State);
handle_info(reset, State) ->
{noreply, State#state{events = []}};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected info: ~p", [Info]), lager:error("Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, #state{tref = TRef}) ->
ok. timer:cancel(TRef), ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
suppress(Key, SuccFun, State = #state{events = Events}) ->
case lists:member(Key, Events) of
true ->
{noreply, State};
false ->
SuccFun(),
{noreply, State#state{events = [Key|Events]}}
end.
procinfo(Pid) ->
emqttd_vm:get_process_info(Pid) ++ emqttd_vm:get_process_gc(Pid).
publish(Sysmon, WarnMsg) ->
Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)),
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
topic(Sysmon) ->
emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))).