diff --git a/apps/emqttd/src/emqttd_vm.erl b/apps/emqttd/src/emqttd_vm.erl index f4ce3cdd4..c083bfda2 100644 --- a/apps/emqttd/src/emqttd_vm.erl +++ b/apps/emqttd/src/emqttd_vm.erl @@ -26,88 +26,23 @@ %%%----------------------------------------------------------------------------- -module(emqttd_vm). +-define(UTIL_ALLOCATORS, [temp_alloc, + eheap_alloc, + binary_alloc, + ets_alloc, + driver_alloc, + sl_alloc, + ll_alloc, + fix_alloc, + std_alloc + ]). + + -author("Feng Lee "). --define(SYSTEM_INFO, [ - allocated_areas, - allocator, - alloc_util_allocators, - build_type, - check_io, - compat_rel, - creation, - debug_compiled, - dist, - dist_ctrl, - driver_version, - elib_malloc, - dist_buf_busy_limit, - %fullsweep_after, % included in garbage_collection - garbage_collection, - %global_heaps_size, % deprecated - heap_sizes, - heap_type, - info, - kernel_poll, - loaded, - logical_processors, - logical_processors_available, - logical_processors_online, - machine, - %min_heap_size, % included in garbage_collection - %min_bin_vheap_size, % included in garbage_collection - modified_timing_level, - multi_scheduling, - multi_scheduling_blockers, - otp_release, - port_count, - process_count, - process_limit, - scheduler_bind_type, - scheduler_bindings, - scheduler_id, - schedulers, - schedulers_online, - smp_support, - system_version, - system_architecture, - threads, - thread_pool_size, - trace_control_word, - update_cpu_info, - version, - wordsize - ]). - --define(SOCKET_OPTS, [ - active, - broadcast, - delay_send, - dontroute, - exit_on_close, - header, - keepalive, - nodelay, - packet, - packet_size, - read_packets, - recbuf, - reuseaddr, - send_timeout, - send_timeout_close, - sndbuf, - priority, - tos - ]). - - - -export([loads/0, - get_system_info/0, - % get_statistics/0, - % get_process_info/0, - get_ports_info/0, - get_ets_info/0]). + scheduler_usage/1, + get_memory/0]). loads() -> [{load1, ftos(cpu_sup:avg1()/256)}, @@ -117,142 +52,69 @@ loads() -> ftos(F) -> [S] = io_lib:format("~.2f", [F]), S. -get_system_info() -> - [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO]. +%%%% erlang vm scheduler_usage fun copied from recon +scheduler_usage(Interval) when is_integer(Interval) -> + %% We start and stop the scheduler_wall_time system flag + %% if it wasn't in place already. Usually setting the flag + %% should have a CPU impact(make it higher) only when under low usage. + FormerFlag = erlang:system_flag(scheduler_wall_time), + First = erlang:statistics(scheduler_wall_time), + timer:sleep(Interval), + Last = erlang:statistics(scheduler_wall_time), + erlang:system_flag(scheduler_wall_time, FormerFlag), + scheduler_usage_diff(First, Last). -get_system_info(Key) -> - try erlang:system_info(Key) catch - error:badarg->undefined - end. +scheduler_usage_diff(First, Last) -> + lists:map( + fun({{I, A0, T0},{I, A1, T1}}) ->{I, (A1 - A0)/(T1 - T0)}end, + lists:zip(lists:sort(First), lists:sort(Last)) + ). -%% conversion functions for erlang:system_info(Key) +get_memory()-> + [{Key, get_memory(Key, current)} || Key <- [used, allocated, unused]] ++ erlang:memory(). -format_system_info(allocated_areas, List) -> - [convert_allocated_areas(Value) || Value <- List]; -format_system_info(allocator, {_,_,_,List}) -> - List; -format_system_info(dist_ctrl, List) -> - lists:map(fun({Node, Socket}) -> - {ok, Stats} = inet:getstat(Socket), - {Node, Stats} - end, List); -format_system_info(driver_version, Value) -> - list_to_binary(Value); -format_system_info(machine, Value) -> - list_to_binary(Value); -format_system_info(otp_release, Value) -> - list_to_binary(Value); -format_system_info(scheduler_bindings, Value) -> - tuple_to_list(Value); -format_system_info(system_version, Value) -> - list_to_binary(Value); -format_system_info(system_architecture, Value) -> - list_to_binary(Value); -format_system_info(version, Value) -> - list_to_binary(Value); -format_system_info(_, Value) -> - Value. +get_memory(used, Keyword) -> + lists:sum(lists:map(fun({_, Prop}) -> + container_size(Prop, Keyword, blocks_size) + end, util_alloc())). +util_alloc()-> + alloc(?UTIL_ALLOCATORS). -convert_allocated_areas({Key, Value1, Value2}) -> - {Key, [Value1, Value2]}; -convert_allocated_areas({Key, Value}) -> - {Key, Value}. +alloc()-> + {_Mem, Allocs} = snapshot_int(), + Allocs. +alloc(Type) -> + [{{T, Instance}, Props} || {{T, Instance}, Props} <- alloc(), lists:member(T, Type)]. +snapshot_int() -> + {erlang:memory(), allocators()}. -get_ports_info()-> - [{pid_port_fun_to_atom(Port), get_port_info(Port)} || Port <- erlang:ports()]. +allocators() -> + UtilAllocators = erlang:system_info(alloc_util_allocators), + Allocators = [sys_alloc, mseg_alloc|UtilAllocators], + [{{A, N},lists:sort(proplists:deleted(versions, Props))} || + A <- Allocators, + Allocs <- [erlang:system_info({allocator, A})], + Allocs =/= false, + {_, N, Props} <- Allocs]. -get_port_info(Port) -> - Stat = get_socket_getstat(Port), - SockName = get_socket_sockname(Port), - Opts = get_socket_opts(Port), - Protocol = get_socket_protocol(Port), - Status = get_socket_status(Port), - Type = get_socket_type(Port), +container_size(Prop, Keyword, Container) -> + Sbcs = container_value(Prop, Keyword, sbcs, Container), + Mbcs = container_value(Prop, Keyword, mbcs, Container), + Sbcs+Mbcs. - lists:flatten(lists:append([ - Stat, - SockName, - Opts, - Protocol, - Status, - Type - ])). +container_value(Prop, Keyword, Type, Container) when is_atom(Keyword)-> + container_value(Prop, 2, Type, Container); +container_value(Props, Pos, mbcs = Type, Container) when is_integer(Pos)-> + Pool = case proplists:get_value(mbcs_pool, Props) of + PoolProps when PoolProps =/= undefined -> + element(Pos, lists:keyfind(Container, 1, PoolProps)); + _ -> 0 + end, + TypeProps = proplists:get_value(Type, Props), + Pool + element(Pos, lists:keyfind(Container, 1, TypeProps)); -get_socket_getstat(Socket) -> - case catch inet:getstat(Socket) of - {ok, Info} -> - Info; - _ -> - [] - end. +container_value(Props, Pos, Type, Container) -> + TypeProps = proplists:get_value(Type, Props), + element(Pos, lists:keyfind(Container, 1, TypeProps)). -get_socket_sockname(Socket) -> - case catch inet:sockname(Socket) of - {ok, {Ip, Port}} -> - [{ip, ip_to_binary(Ip)}, {port, Port}]; - _ -> - [] - end. - -ip_to_binary(Tuple) -> - iolist_to_binary(string:join(lists:map(fun integer_to_list/1, tuple_to_list(Tuple)), ".")). - - -get_socket_protocol(Socket) -> - case erlang:port_info(Socket, name) of - {name, "tcp_inet"} -> - [{protocol, tcp}]; - {name, "udp_inet"} -> - [{protocol, udp}]; - {name,"sctp_inet"} -> - [{protocol, sctp}]; - _ -> - [] - end. - -get_socket_status(Socket) -> - case catch prim_inet:getstatus(Socket) of - {ok, Status} -> - [{status, Status}]; - _ -> - [] - end. - -get_socket_type(Socket) -> - case catch prim_inet:gettype(Socket) of - {ok, Type} -> - [{type, tuple_to_list(Type)}]; - _ -> - [] - end. - -get_socket_opts(Socket) -> - [get_socket_opts(Socket, Key) || Key <- ?SOCKET_OPTS]. - -get_socket_opts(Socket, Key) -> - case catch inet:getopts(Socket, [Key]) of - {ok, Opt} -> - Opt; - _ -> - [] - end. - -get_ets_info() -> - [{Tab, get_ets_dets_info(ets, Tab)} || Tab <- ets:all()]. - -get_ets_dets_info(Type, Tab) -> - case Type:info(Tab) of - undefined -> []; - Entries when is_list(Entries) -> - [{Key, pid_port_fun_to_atom(Value)} || {Key, Value} <- Entries] - end. - -pid_port_fun_to_atom(Term) when is_pid(Term) -> - erlang:list_to_atom(pid_to_list(Term)); -pid_port_fun_to_atom(Term) when is_port(Term) -> - erlang:list_to_atom(erlang:port_to_list(Term)); -pid_port_fun_to_atom(Term) when is_function(Term) -> - erlang:list_to_atom(erlang:fun_to_list(Term)); -pid_port_fun_to_atom(Term) -> - Term.