This commit is contained in:
Feng Lee 2015-03-08 13:38:59 +08:00
parent 1fc9eb287d
commit c7c7b597c5
1 changed files with 18 additions and 7 deletions

View File

@ -29,7 +29,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(TAB, emqtt_client). -define(TABLE, emqtt_client).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% API Function Exports %% API Function Exports
@ -39,6 +39,8 @@
-export([lookup/1, register/2, unregister/2]). -export([lookup/1, register/2, unregister/2]).
-export([getstats/0]).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
@ -50,6 +52,8 @@
terminate/2, terminate/2,
code_change/3]). code_change/3]).
-record(state, {max = 0}).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% API Function Definitions %% API Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
@ -80,17 +84,20 @@ register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) -> unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
gen_server:cast(?SERVER, {unregister, ClientId, Pid}). gen_server:cast(?SERVER, {unregister, ClientId, Pid}).
getstats(?SERVER) ->
gen_server:call(?SERVER, getstats).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Definitions %% gen_server Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init([]) -> init([]) ->
%on one node %on one node
ets:new(?TAB, [set, named_table, protected]), ets:new(?TABLE, [set, named_table, protected]),
{ok, none}. {ok, none}.
handle_call({register, ClientId, Pid}, _From, State) -> handle_call({register, ClientId, Pid}, _From, State) ->
case ets:lookup(?TAB, ClientId) of case ets:lookup(?TABLE, ClientId) of
[{_, Pid, _}] -> [{_, Pid, _}] ->
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
ignore; ignore;
@ -103,14 +110,18 @@ handle_call({register, ClientId, Pid}, _From, State) ->
end, end,
{reply, ok, State}; {reply, ok, State};
handle_call(getstats, _From, State = #state{max = Max}) ->
Stats = [{total, ets:info(?TABLE, size)}, {max, Max}],
{reply, Stats, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast({unregister, ClientId, Pid}, State) -> handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(?TAB, ClientId) of case ets:lookup(?TABLE, ClientId) of
[{_, Pid, MRef}] -> [{_, Pid, MRef}] ->
erlang:demonitor(MRef), erlang:demonitor(MRef),
ets:delete(?TAB, ClientId); ets:delete(?TABLE, ClientId);
[_] -> [_] ->
ignore; ignore;
[] -> [] ->
@ -122,7 +133,7 @@ handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}), ets:match_delete(?TABLE, {{'_', DownPid, MRef}}),
{noreply, State}; {noreply, State};
handle_info(_Info, State) -> handle_info(_Info, State) ->
@ -135,5 +146,5 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
insert(ClientId, Pid) -> insert(ClientId, Pid) ->
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}). ets:insert(?TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}).