Merge pull request #301 from emqtt/dev-feng

Fix issue #284
This commit is contained in:
Feng Lee 2015-09-18 10:42:30 +08:00
commit 4c8b18389f
6 changed files with 19 additions and 58 deletions

View File

@ -141,7 +141,7 @@
%% PubSub
{pubsub, [
%% default should be scheduler numbers
%% {pool_size, 4}
%% {pool_size, 8}
]},
%% Bridge
{bridge, [

View File

@ -24,6 +24,5 @@
{runner_base_dir, "${RUNNER_SCRIPT_DIR%/*}"}.
{runner_etc_dir, "$RUNNER_BASE_DIR/etc"}.
{runner_log_dir, "$RUNNER_BASE_DIR/log"}.
{pipe_dir, "/tmp/$RUNNER_BASE_DIR/"}.
{pipe_dir, "/tmp/$RUNNER_SCRIPT/"}.
{runner_user, ""}.

View File

@ -52,7 +52,7 @@ init([]) ->
Name = {emqttd_pubsub, I},
gproc_pool:add_worker(pubsub, Name, I),
{Name, {emqttd_pubsub, start_link, [I, Opts]},
permanent, 5000, worker, [emqttd_pubsub]}
permanent, 10000, worker, [emqttd_pubsub]}
end, lists:seq(1, PoolSize)),
{ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -267,7 +267,7 @@ resume_session(Session = #mqtt_session{client_id = ClientId,
{badrpc, Reason} ->
lager:critical("Resume session ~s on remote node ~p failed for ~p",
[ClientId, Node, Reason]),
{error, list_to_atom("session_" ++ atom_to_list(Reason))}
{error, Reason}
end;
false ->
lager:critical("Session ~s died for node ~p down!", [ClientId, Node]),

View File

@ -89,9 +89,9 @@ init([]) ->
handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) ->
case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of
{ok, exists} ->
{ok, exists} ->
{reply, {error, existed}, State};
{ok, Trace} ->
{ok, Trace} ->
{reply, ok, State#state{trace_map = maps:put(Who, {Trace, LogFile}, TraceMap)}};
{error, Error} ->
{reply, {error, Error}, State}

View File

@ -27,6 +27,8 @@
-module(emqttd_vm).
-author('huangdan').
-export([schedulers/0]).
-export([microsecs/0]).
@ -39,7 +41,7 @@
get_process_info/0,
get_process_gc/0,
get_process_group_leader_info/1,
get_process_limit/0]).
get_process_limit/0]).
-export([get_ets_list/0,
get_ets_info/0,
@ -59,8 +61,7 @@
sl_alloc,
ll_alloc,
fix_alloc,
std_alloc
]).
std_alloc]).
-define(PROCESS_LIST, [initial_call,
reductions,
@ -139,8 +140,7 @@
trace_control_word,
update_cpu_info,
version,
wordsize
]).
wordsize]).
-define(SOCKET_OPTS, [active,
broadcast,
@ -175,8 +175,8 @@ microsecs() ->
(Mega * 1000000 + Sec) * 1000000 + Micro.
loads() ->
[{load1, ftos(cpu_sup:avg1()/256)},
{load5, ftos(cpu_sup:avg5()/256)},
[{load1, ftos(cpu_sup:avg1()/256)},
{load5, ftos(cpu_sup:avg5()/256)},
{load15, ftos(cpu_sup:avg15()/256)}].
get_system_info() ->
@ -306,24 +306,20 @@ get_process_list()->
[get_process_list(Pid) || Pid <- processes()].
get_process_list(Pid) when is_pid(Pid) ->
Info = [process_info(Pid, Key) || Key <- ?PROCESS_LIST],
[{pid, pid_port_fun_to_atom(Pid)}] ++ lists:flatten([convert_pid_info(Item) || Item <- Info]).
[{pid, Pid} | [process_info(Pid, Key) || Key <- ?PROCESS_LIST]].
get_process_info() ->
[get_process_info(Pid) || Pid <- processes()].
get_process_info(Pid) when is_pid(Pid) ->
ProcessInfo = [process_info(Pid, Key) || Key <- ?PROCESS_INFO],
lists:flatten([convert_pid_info(Item) || Item <- ProcessInfo]).
[process_info(Pid, Key) || Key <- ?PROCESS_INFO].
get_process_gc() ->
[get_process_gc(Pid) || Pid <- processes()].
get_process_gc(Pid) when is_pid(Pid) ->
GcInfo = [process_info(Pid, Key) || Key <- ?PROCESS_GC],
lists:flatten([convert_pid_info(E) || E <- GcInfo]).
[process_info(Pid, Key) || Key <- ?PROCESS_GC].
get_process_group_leader_info(LeaderPid) when is_pid(LeaderPid) ->
LeaderInfo = [{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO)],
lists:flatten([convert_pid_info(E) || E <- LeaderInfo]).
[{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO)].
get_process_limit() ->
erlang:system_info(process_limit).
@ -453,41 +449,7 @@ mapping([], Acc) ->
mapping([{owner, V}|Entries], Acc) when is_pid(V) ->
OwnerInfo = process_info(V),
Owner = proplists:get_value(registered_name, OwnerInfo, undefined),
mapping(Entries, [{owner, pid_port_fun_to_atom(Owner)}|Acc]);
mapping(Entries, [{owner, Owner}|Acc]);
mapping([{Key, Value}|Entries], Acc) ->
mapping(Entries, [{Key, pid_port_fun_to_atom(Value)}|Acc]).
mapping(Entries, [{Key, Value}|Acc]).
%ip_to_binary(Tuple) ->
% iolist_to_binary(string:join(lists:map(fun integer_to_list/1, tuple_to_list(Tuple)), ".")).
convert_pid_info({initial_call,{_M, F, _A}}) ->
{initial_call, F};
convert_pid_info({current_function, {M, F, A}}) ->
{current_function, list_to_atom(lists:concat([atom_to_list(M),":",atom_to_list(F),"/",integer_to_list(A)]))};
convert_pid_info({suspending, List}) ->
{suspending, [pid_port_fun_to_atom(E) || E <- List]};
convert_pid_info({binary, List}) ->
{binary,[tuple_to_list(E) || E <- List]};
convert_pid_info({Key, Term}) when is_pid(Term) or is_port(Term) or is_function(Term) ->
{Key, pid_port_fun_to_atom(Term)};
convert_pid_info(Item) ->
Item.
%convert_port_info({name, Name}) ->
% {name, list_to_binary(Name)};
%convert_port_info({links, List}) ->
% {links, [pid_port_fun_to_atom(Item) || Item <- List]};
%convert_port_info({connected, Pid}) ->
% erlang:process_info(Pid, registered_name);
%convert_port_info(Item) ->
% Item.
pid_port_fun_to_atom(Term) when is_pid(Term) ->
erlang:list_to_atom(pid_to_list(Term));
pid_port_fun_to_atom(Term) when is_port(Term) ->
erlang:list_to_atom(erlang:port_to_list(Term));
pid_port_fun_to_atom(Term) when is_function(Term) ->
erlang:list_to_atom(erlang:fun_to_list(Term));
pid_port_fun_to_atom(Term) ->
Term.