From ba8c81e8058018e153a0986005c4c618e9417f05 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 11 Oct 2015 21:44:05 +0800 Subject: [PATCH 01/10] dialyzer plugins/*/ebin --- Makefile | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 49ff57b9f..5895a95e0 100644 --- a/Makefile +++ b/Makefile @@ -53,12 +53,13 @@ APPS = erts kernel stdlib sasl crypto ssl os_mon syntax_tools \ check_plt: compile dialyzer --check_plt --plt $(PLT) --apps $(APPS) \ - deps/*/ebin ./ebin + deps/*/ebin ./ebin plugins/*/ebin build_plt: compile dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) \ - deps/*/ebin ./ebin + deps/*/ebin ./ebin plugins/*/ebin dialyzer: compile - dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin + dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin plugins/*/ebin + From 47e3e826dc09bbe65eeb232adcd1bb584cf3d9f3 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:03:01 +0800 Subject: [PATCH 02/10] 0.12.1 --- src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index d970a9e01..6bda6f2de 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.12.0"}, + {vsn, "0.12.1"}, {modules, []}, {registered, []}, {applications, [kernel, From fc9f894aa101e28db7c7f78c86d06852dd9d6b41 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:05:23 +0800 Subject: [PATCH 03/10] sysmon --- rel/files/emqttd.config.development | 21 +++++++++++++++++++++ rel/files/emqttd.config.production | 23 +++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index f93d30959..fed80e9c4 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -241,6 +241,27 @@ %{buffer, 4096}, ]} ]} + ]}, + + %% Erlang System Monitor + {sysmon, [ + + %% Long GC + {long_gc, 100}, + + %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. + %% 8 * 1024 * 1024 + {large_heap, 8388608}, + + %% Long Schedule(ms) + {long_schedule, 50}, + + %% Busy Port + {busy_port, true}, + + %% Busy Dist Port + {busy_dist_port, true} + ]} ]} ]. diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 773f34b8a..4d92dfb0f 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -233,7 +233,30 @@ %{buffer, 4096}, ]} ]} + ]}, + + %% Erlang System Monitor + {sysmon, [ + + %% Long GC, don't monitor in production mode for: + %% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 + {long_gc, false}, + + %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. + %% 8 * 1024 * 1024 + {large_heap, 8388608}, + + %% Long Schedule(ms) + {long_schedule, 50}, + + %% Busy Port + {busy_port, true}, + + %% Busy Dist Port + {busy_dist_port, true} + ]} + ]} ]. From deaf2203434f2ab1502a11768cd66c7d2d139acc Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:05:37 +0800 Subject: [PATCH 04/10] sysmon --- src/emqttd_app.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 5f4e51da6..366df60cb 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -88,7 +88,7 @@ start_servers(Sup) -> {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd access control", emqttd_access_control}, - {"emqttd system monitor", emqttd_sysmon}], + {"emqttd system monitor", emqttd_sysmon, emqttd:env(sysmon)}], [start_server(Sup, Server) || Server <- Servers]. start_server(_Sup, {Name, F}) when is_function(F) -> From 48255915655ebcefb06d2d6436a591624d295028 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:06:01 +0800 Subject: [PATCH 05/10] name --- src/emqttd_pubsub.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 6bb32452c..478db721d 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -107,7 +107,10 @@ mnesia(copy) -> Id :: pos_integer(), Opts :: list(). start_link(Id, Opts) -> - gen_server2:start_link(?MODULE, [Id, Opts], []). + gen_server2:start_link({local, name(Id)}, ?MODULE, [Id, Opts], []). + +name(Id) -> + list_to_atom("emqttd_pubsub_" ++ integer_to_list(Id)). %%------------------------------------------------------------------------------ %% @doc Create topic. Notice That this transaction is not protected by pubsub pool From 6209d47aaedcc90a674c4f334e88ba13b675604d Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:06:18 +0800 Subject: [PATCH 06/10] name --- src/emqttd_sm.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 49db5be9c..719ac0ca1 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -85,7 +85,10 @@ mnesia(copy) -> %%------------------------------------------------------------------------------ -spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. start_link(Id) -> - gen_server2:start_link(?MODULE, [Id], []). + gen_server2:start_link({local, name(Id)}, ?MODULE, [Id], []). + +name(Id) -> + list_to_atom("emqttd_sm_" ++ integer_to_list(Id)). %%------------------------------------------------------------------------------ %% @doc Pool name. From d4a434176a7f06e07e62dda2fa020b3b4487194d Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:06:47 +0800 Subject: [PATCH 07/10] sysmon --- src/emqttd_sysmon.erl | 120 +++++++++++++++++++++++++++++++++--------- 1 file changed, 94 insertions(+), 26 deletions(-) diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 38b419bad..a86cad01e 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -25,69 +25,137 @@ %%% @end %%%----------------------------------------------------------------------------- -%%TODO: this is a demo module.... - -module(emqttd_sysmon). -author("Feng Lee "). -behavior(gen_server). --export([start_link/0]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). +-record(state, {tref, events = []}). %%------------------------------------------------------------------------------ %% @doc Start system monitor %% @end %%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec start_link(Opts :: list(tuple())) -> + {ok, pid()} | ignore | {error, term()}. +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - erlang:system_monitor(self(), [{long_gc, 5000}, - {large_heap, 8 * 1024 * 1024}, - busy_port]), - {ok, #state{}}. +init([Opts]) -> + erlang:system_monitor(self(), parse_opt(Opts)), + {ok, TRef} = timer:send_interval(1000, reset), + {ok, #state{tref = TRef}}. + +parse_opt(Opts) -> + parse_opt(Opts, []). +parse_opt([], Acc) -> + Acc; +parse_opt([{long_gc, false}|Opts], Acc) -> + parse_opt(Opts, Acc); +parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) -> + parse_opt(Opts, [{long_gc, Ms}|Acc]); +parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) -> + parse_opt(Opts, [{long_schedule, Ms}|Acc]); +parse_opt([{large_heap, Size}|Opts], Acc) when is_integer(Size) -> + parse_opt(Opts, [{large_heap, Size}|Acc]); +parse_opt([{busy_port, true}|Opts], Acc) -> + parse_opt(Opts, [busy_port|Acc]); +parse_opt([{busy_port, false}|Opts], Acc) -> + parse_opt(Opts, Acc); +parse_opt([{busy_dist_port, true}|Opts], Acc) -> + parse_opt(Opts, [busy_dist_port|Acc]); +parse_opt([{busy_dist_port, false}|Opts], Acc) -> + parse_opt(Opts, Acc). handle_call(Request, _From, State) -> lager:error("Unexpected request: ~p", [Request]), {reply, {error, unexpected_request}, State}. handle_cast(Msg, State) -> - lager:error("unexpected msg: ~p", [Msg]), + lager:error("Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({monitor, GcPid, long_gc, Info}, State) -> - lager:error("long_gc: gcpid = ~p, ~p ~n ~p", [GcPid, process_info(GcPid, - [registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]), - {noreply, State}; +handle_info({monitor, Pid, long_gc, Info}, State) -> + suppress({long_gc, Pid}, fun() -> + WarnMsg = io_lib:format("long_gc: pid = ~p, info: ~p", [Pid, Info]), + lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + publish(long_gc, WarnMsg) + end, State); -handle_info({monitor, GcPid, large_heap, Info}, State) -> - lager:error("large_heap: gcpid = ~p,~p ~n ~p", [GcPid, process_info(GcPid, - [registered_name, memory, message_queue_len,heap_size,total_heap_size]), Info]), - {noreply, State}; +handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> + suppress({long_schedule, Pid}, fun() -> + WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), + lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + publish(long_schedule, WarnMsg) + end, State); + +handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> + suppress({long_schedule, Port}, fun() -> + WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), + lager:error("~s~n~p", [WarnMsg, erlang:port_info(Port)]), + publish(long_schedule, WarnMsg) + end, State); + +handle_info({monitor, Pid, large_heap, Info}, State) -> + suppress({large_heap, Pid}, fun() -> + WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), + lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + publish(large_heap, WarnMsg) + end, State); handle_info({monitor, SusPid, busy_port, Port}, State) -> - lager:error("busy_port: suspid = ~p, port = ~p", [process_info(SusPid, - [registered_name, memory, message_queue_len,heap_size,total_heap_size]), Port]), - {noreply, State}; + suppress({busy_port, Port}, fun() -> + WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), + lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + publish(busy_port, WarnMsg) + end, State); + +handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> + suppress({busy_dist_port, Port}, fun() -> + WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), + lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + publish(busy_dist_port, WarnMsg) + end, State); + +handle_info(reset, State) -> + {noreply, State#state{events = []}}; handle_info(Info, State) -> lager:error("Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{tref = TRef}) -> + timer:cancel(TRef), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. +suppress(Key, SuccFun, State = #state{events = Events}) -> + case lists:member(Key, Events) of + true -> + {noreply, State}; + false -> + SuccFun(), + {noreply, State#state{events = [Key|Events]}} + end. + +procinfo(Pid) -> + emqttd_vm:get_process_info(Pid) ++ emqttd_vm:get_process_gc(Pid). + +publish(Sysmon, WarnMsg) -> + Msg = emqttd_message:make(sysmon, topic(Sysmon), iolist_to_binary(WarnMsg)), + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). + +topic(Sysmon) -> + emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))). + From 063f59f846486489a326dd698ec5144cf2ec4471 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:07:05 +0800 Subject: [PATCH 08/10] fix process_info --- src/emqttd_vm.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/emqttd_vm.erl b/src/emqttd_vm.erl index 3836f0587..ab2daa01d 100644 --- a/src/emqttd_vm.erl +++ b/src/emqttd_vm.erl @@ -39,7 +39,9 @@ -export([get_process_list/0, get_process_info/0, + get_process_info/1, get_process_gc/0, + get_process_gc/1, get_process_group_leader_info/1, get_process_limit/0]). @@ -311,12 +313,12 @@ get_process_list(Pid) when is_pid(Pid) -> get_process_info() -> [get_process_info(Pid) || Pid <- processes()]. get_process_info(Pid) when is_pid(Pid) -> - [process_info(Pid, Key) || Key <- ?PROCESS_INFO]. + process_info(Pid, ?PROCESS_INFO). get_process_gc() -> [get_process_gc(Pid) || Pid <- processes()]. get_process_gc(Pid) when is_pid(Pid) -> - [process_info(Pid, Key) || Key <- ?PROCESS_GC]. + process_info(Pid, ?PROCESS_GC). get_process_group_leader_info(LeaderPid) when is_pid(LeaderPid) -> [{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO)]. From 1ce01bdf7aae530b14f4ed851ecacc376ddeb396 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 12 Oct 2015 21:07:19 +0800 Subject: [PATCH 09/10] space --- include/emqttd_cli.hrl | 1 - 1 file changed, 1 deletion(-) diff --git a/include/emqttd_cli.hrl b/include/emqttd_cli.hrl index c2dc1ddba..55d7d1739 100644 --- a/include/emqttd_cli.hrl +++ b/include/emqttd_cli.hrl @@ -32,4 +32,3 @@ -define(USAGE(CmdList), [?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]). - From 7a93f4b2a301b3e4e545e432ad94590a20873b36 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 12 Oct 2015 21:20:24 +0800 Subject: [PATCH 10/10] order --- rel/files/emqttd.config.development | 6 +++--- rel/files/emqttd.config.production | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index fed80e9c4..9b8eedb2d 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -249,13 +249,13 @@ %% Long GC {long_gc, 100}, + %% Long Schedule(ms) + {long_schedule, 50}, + %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. %% 8 * 1024 * 1024 {large_heap, 8388608}, - %% Long Schedule(ms) - {long_schedule, 50}, - %% Busy Port {busy_port, true}, diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 4d92dfb0f..639630672 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -242,13 +242,13 @@ %% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 {long_gc, false}, + %% Long Schedule(ms) + {long_schedule, 50}, + %% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. %% 8 * 1024 * 1024 {large_heap, 8388608}, - %% Long Schedule(ms) - {long_schedule, 50}, - %% Busy Port {busy_port, true},