From 5beb38cd6877cf0b6b6f1ff5a36d0994f9046056 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 5 Jan 2015 13:04:53 +0800 Subject: [PATCH] misc fix --- apps/emqtt/include/emqtt_frame.hrl | 2 - apps/emqtt/src/emqtt.erl | 2 +- apps/emqtt/src/emqtt_client.erl | 4 +- apps/emqtt/src/emqtt_cm.erl | 59 ++++++++++++++++++++---------- apps/emqtt/src/emqtt_router.erl | 28 ++++++++++++++ 5 files changed, 70 insertions(+), 25 deletions(-) create mode 100644 apps/emqtt/src/emqtt_router.erl diff --git a/apps/emqtt/include/emqtt_frame.hrl b/apps/emqtt/include/emqtt_frame.hrl index bdaa087b7..f1ae3e7d5 100644 --- a/apps/emqtt/include/emqtt_frame.hrl +++ b/apps/emqtt/include/emqtt_frame.hrl @@ -25,8 +25,6 @@ -define(MQTT_PROTO_MAJOR, 3). -define(MQTT_PROTO_MINOR, 1). --define(CLIENT_ID_MAXLEN, 23). - %% frame types -define(CONNECT, 1). diff --git a/apps/emqtt/src/emqtt.erl b/apps/emqtt/src/emqtt.erl index d881106ad..0b57307e0 100644 --- a/apps/emqtt/src/emqtt.erl +++ b/apps/emqtt/src/emqtt.erl @@ -29,7 +29,7 @@ {packet, raw}, {reuseaddr, true}, {backlog, 512}, - {nodelay, false} + {nodelay, true} ]). listen(Listeners) when is_list(Listeners) -> diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index ef7a01d2d..46a97f807 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -173,9 +173,9 @@ handle_info(Info, State) -> ?ERROR("badinfo :~p",[Info]), {stop, {badinfo, Info}, State}. -terminate(_Reason, #state{keep_alive=KeepAlive}) -> +terminate(_Reason, #state{client_id = ClientId, keep_alive=KeepAlive}) -> emqtt_keep_alive:cancel(KeepAlive), - emqtt_cm:destroy(self()), + emqtt_cm:destroy(ClientId, self()), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index 9061a6800..d6809f92e 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -29,15 +29,15 @@ -define(SERVER, ?MODULE). +-define(TAB, emqtt_client). + %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ -export([start_link/0]). --export([create/2, - destroy/1, - lookup/1]). +-export([lookup/1, create/2, destroy/2]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -59,28 +59,17 @@ start_link() -> -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) -> case ets:lookup(emqtt_client, ClientId) of - [{_, Pid}] -> Pid; + [{_, Pid, _}] -> Pid; [] -> undefined end. -spec create(ClientId :: binary(), Pid :: pid()) -> ok. create(ClientId, Pid) -> - case lookup(ClientId) of - Pid -> - ignore; - OldPid when is_pid(OldPid) -> - OldPid ! {stop, duplicate_id}, - ets:insert(emqtt_client, {ClientId, Pid}); - undefined -> - ets:insert(emqtt_client, {ClientId, Pid}) - end. + gen_server:call(?SERVER, {create, ClientId, Pid}). --spec destroy(binary() | pid()) -> ok. -destroy(ClientId) when is_binary(ClientId) -> - ets:delete(emqtt_client, ClientId); - -destroy(Pid) when is_pid(Pid) -> - ets:match_delete(emqtt_client, {{'_', Pid}}). +-spec destroy(ClientId :: binary(), Pid :: pid()) -> ok. +destroy(ClientId, Pid) when is_binary(ClientId) -> + gen_server:cast(?SERVER, {destroy, ClientId, Pid}); %% ------------------------------------------------------------------ %% gen_server Function Definitions @@ -88,15 +77,45 @@ destroy(Pid) when is_pid(Pid) -> init(Args) -> %on one node - ets:new(emqtt_client, [named_table, public]), + ets:new(?TAB, [set, named_table, protected]), {ok, Args}. +handle_call({create, ClientId, Pid}, _From, State) -> + case ets:lookup(?TAB, ClientId) of + [{_, Pid, _}] -> + ?ERROR("client '~s' has been registered with ~p", [ClientId, Pid]), + ignore; + [{_, OldPid, MRef}] -> + OldPid ! {stop, duplicate_id}, + erlang:demonitor(MRef), + ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}); + [] -> + ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)}) + end. + {reply, ok, State}; + handle_call(_Request, _From, State) -> {reply, ok, State}. +handle_cast({destroy, ClientId, Pid}, State) when is_binary(ClientId) -> + case ets:lookup(?TAB, ClientId) of + [{_, Pid, MRef}] -> + erlang:demonitor(MRef), + ets:delete(?TAB, ClientId); + [_] -> + ignore; + [] -> + ?ERROR("cannot find client '~s' with ~p", [ClientId, Pid]) + end + {noreply, State}; + handle_cast(_Msg, State) -> {noreply, State}. +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> + ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}), + {noreply, State}. + handle_info(_Info, State) -> {noreply, State}. diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl new file mode 100644 index 000000000..98c7329d0 --- /dev/null +++ b/apps/emqtt/src/emqtt_router.erl @@ -0,0 +1,28 @@ +%%----------------------------------------------------------------------------- +%% Copyright (c) 2014, Feng Lee +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in all +%% copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%% SOFTWARE. +%%------------------------------------------------------------------------------ + +-module(emqtt_router). + +%%Router Chain--> +%%--->In +%%Out<--- +