diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 182959df6..63ca9a047 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -80,9 +80,10 @@ clientid :: binary() | undefined, username :: binary() | undefined, ipaddress :: inet:ip_address(), + client_pid :: pid(), + client_mon :: reference(), clean_sess :: boolean(), - proto_ver :: 3 | 4, - client_pid :: pid() + proto_ver :: 3 | 4 }). -type mqtt_client() :: #mqtt_client{}. diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 1c109536e..1c3effa31 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -82,10 +82,10 @@ lookup(ClientId) when is_binary(ClientId) -> %% @doc Register clientId with pid. %% @end %%------------------------------------------------------------------------------ --spec register(ClientId :: binary()) -> ok. -register(ClientId) when is_binary(ClientId) -> +-spec register(Client :: mqtt_client()) -> ok. +register(Client = #mqtt_client{clientid = ClientId}) -> CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:call(CmPid, {register, ClientId, self()}, infinity). + gen_server:call(CmPid, {register, Client}, infinity). %%------------------------------------------------------------------------------ %% @doc Unregister clientId with pid. @@ -104,18 +104,18 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. -handle_call({register, ClientId, Pid}, _From, State) -> +handle_call({register, Client = #mqtt_client{clientid = ClientId, client_pid = Pid}}, _From, State) -> case ets:lookup(?CLIENT_TAB, ClientId) of - [{_, Pid, _}] -> + [#mqtt_client{client_pid = Pid}] -> lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; - [{_, OldPid, MRef}] -> + [#mqtt_client{client_pid = OldPid, client_mon = MRef}] -> lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), OldPid ! {stop, duplicate_id, Pid}, erlang:demonitor(MRef), - ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}); + ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}); [] -> - ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}) + ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}) end, {reply, ok, setstats(State)}; @@ -125,7 +125,7 @@ handle_call(Req, _From, State) -> handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(?CLIENT_TAB, ClientId) of - [{_, Pid, MRef}] -> + [#mqtt_client{client_pid = Pid, client_mon = MRef}] -> erlang:demonitor(MRef, [flush]), ets:delete(?CLIENT_TAB, ClientId); [_] -> @@ -138,8 +138,18 @@ handle_cast({unregister, ClientId, Pid}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, Reason}, State) -> + case ets:match_object(?CLIENT_TAB, {mqtt_client, '$1', '_', '_', DownPid, MRef, '_', '_'}) of + [] -> + ignore; + Clients -> + lists:foreach( + fun(Client = #mqtt_client{clientid = ClientId}) -> + ets:delete_object(?CLIENT_TAB, Client), + lager:error("Client ~s is Down: ~p", [ClientId, Reason]), + emqttd_broker:foreach_hooks(client_disconnected, [Reason, ClientId]) + end, Clients) + end, {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -158,3 +168,4 @@ code_change(_OldVsn, State, _Extra) -> setstats(State = #state{statsfun = StatsFun}) -> StatsFun(ets:info(?CLIENT_TAB, size)), State. + diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl index 53a338404..739fc08ec 100644 --- a/apps/emqttd/src/emqttd_cm_sup.erl +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -42,7 +42,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ets:new(emqttd_cm:table(), [set, named_table, public, + ets:new(emqttd_cm:table(), [set, named_table, public, {keypos, 2}, {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), diff --git a/apps/emqttd/src/emqttd_vm.erl b/apps/emqttd/src/emqttd_vm.erl index 217cf0092..6cc9f7c50 100644 --- a/apps/emqttd/src/emqttd_vm.erl +++ b/apps/emqttd/src/emqttd_vm.erl @@ -33,6 +33,87 @@ -export([loads/0]). +-define(SYSTEM_INFO, [ + allocated_areas, + allocator, + alloc_util_allocators, + build_type, + check_io, + compat_rel, + creation, + debug_compiled, + dist, + dist_ctrl, + driver_version, + elib_malloc, + dist_buf_busy_limit, + %fullsweep_after, % included in garbage_collection + garbage_collection, + %global_heaps_size, % deprecated + heap_sizes, + heap_type, + info, + kernel_poll, + loaded, + logical_processors, + logical_processors_available, + logical_processors_online, + machine, + %min_heap_size, % included in garbage_collection + %min_bin_vheap_size, % included in garbage_collection + modified_timing_level, + multi_scheduling, + multi_scheduling_blockers, + otp_release, + port_count, + process_count, + process_limit, + scheduler_bind_type, + scheduler_bindings, + scheduler_id, + schedulers, + schedulers_online, + smp_support, + system_version, + system_architecture, + threads, + thread_pool_size, + trace_control_word, + update_cpu_info, + version, + wordsize + ]). + +-define(SOCKET_OPTS, [ + active, + broadcast, + delay_send, + dontroute, + exit_on_close, + header, + keepalive, + nodelay, + packet, + packet_size, + read_packets, + recbuf, + reuseaddr, + send_timeout, + send_timeout_close, + sndbuf, + priority, + tos + ]). + + + +-export([loads/0, + get_system_info/0, + % get_statistics/0, + % get_process_info/0, + get_ports_info/0, + get_ets_info/0]). + timestamp() -> {MegaSecs, Secs, _MicroSecs} = os:timestamp(), MegaSecs * 1000000 + Secs. @@ -49,3 +130,142 @@ loads() -> ftos(F) -> [S] = io_lib:format("~.2f", [F]), S. +get_system_info() -> + [{Key, format_system_info(Key, get_system_info(Key))} || Key <- ?SYSTEM_INFO]. + +get_system_info(Key) -> + try erlang:system_info(Key) catch + error:badarg->undefined + end. + +%% conversion functions for erlang:system_info(Key) + +format_system_info(allocated_areas, List) -> + [convert_allocated_areas(Value) || Value <- List]; +format_system_info(allocator, {_,_,_,List}) -> + List; +format_system_info(dist_ctrl, List) -> + lists:map(fun({Node, Socket}) -> + {ok, Stats} = inet:getstat(Socket), + {Node, Stats} + end, List); +format_system_info(driver_version, Value) -> + list_to_binary(Value); +format_system_info(machine, Value) -> + list_to_binary(Value); +format_system_info(otp_release, Value) -> + list_to_binary(Value); +format_system_info(scheduler_bindings, Value) -> + tuple_to_list(Value); +format_system_info(system_version, Value) -> + list_to_binary(Value); +format_system_info(system_architecture, Value) -> + list_to_binary(Value); +format_system_info(version, Value) -> + list_to_binary(Value); +format_system_info(_, Value) -> + Value. + +convert_allocated_areas({Key, Value1, Value2}) -> + {Key, [Value1, Value2]}; +convert_allocated_areas({Key, Value}) -> + {Key, Value}. + + +get_ports_info()-> + [{pid_port_fun_to_atom(Port), get_port_info(Port)} || Port <- erlang:ports()]. + +get_port_info(Port) -> + Stat = get_socket_getstat(Port), + SockName = get_socket_sockname(Port), + Opts = get_socket_opts(Port), + Protocol = get_socket_protocol(Port), + Status = get_socket_status(Port), + Type = get_socket_type(Port), + + lists:flatten(lists:append([ + Stat, + SockName, + Opts, + Protocol, + Status, + Type + ])). + +get_socket_getstat(Socket) -> + case catch inet:getstat(Socket) of + {ok, Info} -> + Info; + _ -> + [] + end. + +get_socket_sockname(Socket) -> + case catch inet:sockname(Socket) of + {ok, {Ip, Port}} -> + [{ip, ip_to_binary(Ip)}, {port, Port}]; + _ -> + [] + end. + +ip_to_binary(Tuple) -> + iolist_to_binary(string:join(lists:map(fun integer_to_list/1, tuple_to_list(Tuple)), ".")). + + +get_socket_protocol(Socket) -> + case erlang:port_info(Socket, name) of + {name, "tcp_inet"} -> + [{protocol, tcp}]; + {name, "udp_inet"} -> + [{protocol, udp}]; + {name,"sctp_inet"} -> + [{protocol, sctp}]; + _ -> + [] + end. + +get_socket_status(Socket) -> + case catch prim_inet:getstatus(Socket) of + {ok, Status} -> + [{status, Status}]; + _ -> + [] + end. + +get_socket_type(Socket) -> + case catch prim_inet:gettype(Socket) of + {ok, Type} -> + [{type, tuple_to_list(Type)}]; + _ -> + [] + end. + +get_socket_opts(Socket) -> + [get_socket_opts(Socket, Key) || Key <- ?SOCKET_OPTS]. + +get_socket_opts(Socket, Key) -> + case catch inet:getopts(Socket, [Key]) of + {ok, Opt} -> + Opt; + _ -> + [] + end. + +get_ets_info() -> + [{Tab, get_ets_dets_info(ets, Tab)} || Tab <- ets:all()]. + +get_ets_dets_info(Type, Tab) -> + case Type:info(Tab) of + undefined -> []; + Entries when is_list(Entries) -> + [{Key, pid_port_fun_to_atom(Value)} || {Key, Value} <- Entries] + end. + +pid_port_fun_to_atom(Term) when is_pid(Term) -> + erlang:list_to_atom(pid_to_list(Term)); +pid_port_fun_to_atom(Term) when is_port(Term) -> + erlang:list_to_atom(erlang:port_to_list(Term)); +pid_port_fun_to_atom(Term) when is_function(Term) -> + erlang:list_to_atom(erlang:fun_to_list(Term)); +pid_port_fun_to_atom(Term) -> + Term.