From a34ae660dc57501ad3e7eb55c1ee97ef89d044e0 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Wed, 10 Dec 2014 15:10:50 +0800 Subject: [PATCH] fix router --- apps/emqtt/src/emqtt.erl | 9 +++--- apps/emqtt/src/emqtt_auth_internal.erl | 9 ++++-- apps/emqtt/src/emqtt_client.erl | 43 ++++++++++++-------------- apps/emqtt/src/emqtt_cm.erl | 11 ++++--- apps/emqtt/src/emqtt_lib.erl | 16 ---------- apps/emqtt/src/emqtt_retained.erl | 5 +-- 6 files changed, 40 insertions(+), 53 deletions(-) delete mode 100644 apps/emqtt/src/emqtt_lib.erl diff --git a/apps/emqtt/src/emqtt.erl b/apps/emqtt/src/emqtt.erl index 7931e8416..d881106ad 100644 --- a/apps/emqtt/src/emqtt.erl +++ b/apps/emqtt/src/emqtt.erl @@ -26,10 +26,10 @@ -define(MQTT_SOCKOPTS, [ binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, false} + {packet, raw}, + {reuseaddr, true}, + {backlog, 512}, + {nodelay, false} ]). listen(Listeners) when is_list(Listeners) -> @@ -43,3 +43,4 @@ listen({http, Port, Options}) -> MFArgs = {emqtt_http, handle, []}, mochiweb:start_http(Port, Options, MFArgs). + diff --git a/apps/emqtt/src/emqtt_auth_internal.erl b/apps/emqtt/src/emqtt_auth_internal.erl index d864dc0a3..0824fddde 100644 --- a/apps/emqtt/src/emqtt_auth_internal.erl +++ b/apps/emqtt/src/emqtt_auth_internal.erl @@ -40,7 +40,7 @@ check(undefined, _) -> false; check(_, undefined) -> false; -check(Username, Password) when is_binary(Username) -> +check(Username, Password) when is_binary(Username), is_binary(Password) -> PasswdHash = crypto:hash(md5, Password), case mnesia:dirty_read(emqtt_user, Username) of [#emqtt_user{passwdhash=PasswdHash}] -> true; @@ -48,7 +48,12 @@ check(Username, Password) when is_binary(Username) -> end. add(Username, Password) when is_binary(Username) and is_binary(Password) -> - mnesia:dirty_write(#emqtt_user{username=Username, passwdhash=crypto:hash(md5, Password)}). + mnesia:dirty_write( + #emqtt_user{ + username=Username, + passwdhash=crypto:hash(md5, Password) + } + ). delete(Username) when is_binary(Username) -> mnesia:dirty_delete(emqtt_user, Username). diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index c0cf6df51..d6b62cfe0 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -22,9 +22,13 @@ -module(emqtt_client). +-author('feng@slimchat.io'). + -behaviour(gen_server). --export([start_link/1, info/1, go/2, stop/2]). +-export([start_link/1, + info/1, + go/2]). -export([init/1, handle_call/3, @@ -69,11 +73,8 @@ info(Pid) -> go(Pid, Sock) -> gen_server:call(Pid, {go, Sock}). -stop(Pid, Error) -> - gen_server:cast(Pid, {stop, Error}). - init([Sock]) -> - {ok, #state{socket = Sock}}. + {ok, #state{socket = Sock}, 1000}. handle_call({go, Sock}, _From, State=#state{socket = Sock}) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), @@ -101,14 +102,17 @@ handle_call(info, _From, #state{conn_name=ConnName, handle_call(_Req, _From, State) -> {reply, ok, State}. -handle_cast({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) -> - ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), - stop({shutdown, duplicate_id}, State); - handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. -handle_info({route, Msg}, #state{socket = Sock, message_id=MsgId} = State) -> +handle_info(timeout, State) -> + stop({shutdown, timeout}, State); + +handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) -> + ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), + stop({shutdown, duplicate_id}, State); + +handle_info({dispatch, Msg}, #state{socket = Sock, message_id=MsgId} = State) -> #mqtt_msg{retain = Retain, qos = Qos, @@ -155,7 +159,6 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); handle_info({inet_reply, _Sock, {error, Reason}}, State) -> - ?ERROR("sock error: ~p~n", [Reason]), {noreply, State}; handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> @@ -169,24 +172,17 @@ handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) -> end; handle_info(Info, State) -> - ?ERROR("unext info :~p",[Info]), + ?ERROR("badinfo :~p",[Info]), {stop, {badinfo, Info}, State}. -terminate(_Reason, #state{client_id=ClientId, keep_alive=KeepAlive}) -> - ok = emqtt_registry:unregister(ClientId), +terminate(_Reason, #state{keep_alive=KeepAlive}) -> emqtt_keep_alive:cancel(KeepAlive), + emqtt_cm:destroy(self()), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -throw_on_error(E, Thunk) -> - case Thunk() of - {error, Reason} -> throw({E, Reason}); - {ok, Res} -> Res; - Res -> Res - end. - async_recv(Sock, Length, infinity) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, -1); @@ -259,7 +255,7 @@ process_request(?CONNECT, {?CONNACK_CREDENTIALS, State}; true -> ?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]), - ok = emqtt_registry:register(ClientId, self()), + emqtt_cm:create(ClientId, self()), KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout), {?CONNACK_ACCEPT, State #state{ will_msg = make_will_msg(Var), @@ -362,7 +358,6 @@ process_request(?UNSUBSCRIBE, {ok, State}; process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) -> - %?INFO("PINGREQ...",[]), %Keep alive timer KeepAlive1 = emqtt_keep_alive:reset(KeepAlive), send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}), @@ -398,7 +393,7 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain, send_will_msg(#state{will_msg = undefined}) -> ignore; send_will_msg(#state{will_msg = WillMsg }) -> - emqtt_router:publish(WillMsg). + emqtt_pubsub:publish(WillMsg). send_frame(Sock, Frame) -> ?INFO("send frame:~p", [Frame]), diff --git a/apps/emqtt/src/emqtt_cm.erl b/apps/emqtt/src/emqtt_cm.erl index 28f6ef240..9061a6800 100644 --- a/apps/emqtt/src/emqtt_cm.erl +++ b/apps/emqtt/src/emqtt_cm.erl @@ -66,13 +66,14 @@ lookup(ClientId) -> -spec create(ClientId :: binary(), Pid :: pid()) -> ok. create(ClientId, Pid) -> case lookup(ClientId) of + Pid -> + ignore; OldPid when is_pid(OldPid) -> - %%TODO: FIX STOP... - emqtt_client:stop(OldPid, duplicate_id); + OldPid ! {stop, duplicate_id}, + ets:insert(emqtt_client, {ClientId, Pid}); undefined -> - ok - end, - ets:insert(emqtt_client, {ClientId, Pid}). + ets:insert(emqtt_client, {ClientId, Pid}) + end. -spec destroy(binary() | pid()) -> ok. destroy(ClientId) when is_binary(ClientId) -> diff --git a/apps/emqtt/src/emqtt_lib.erl b/apps/emqtt/src/emqtt_lib.erl deleted file mode 100644 index 93925c66d..000000000 --- a/apps/emqtt/src/emqtt_lib.erl +++ /dev/null @@ -1,16 +0,0 @@ - -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% Developer of the eMQTT Code is -%% Copyright (c) 2012 Ery Lee. All rights reserved. -%% --module(emqtt_lib). - diff --git a/apps/emqtt/src/emqtt_retained.erl b/apps/emqtt/src/emqtt_retained.erl index fcb7f925f..fb57d223a 100644 --- a/apps/emqtt/src/emqtt_retained.erl +++ b/apps/emqtt/src/emqtt_retained.erl @@ -24,6 +24,8 @@ -author('feng@slimchat.io'). +%%TODO: FIXME Later... + %% %% <> @@ -76,11 +78,10 @@ delete(Topic) -> gen_server:cast(?MODULE, {delete, Topic}). send(Topic, Client) -> - [Client ! {route, Msg} ||{_, Msg} <- lookup(Topic)]. + [Client ! {dispatch, Msg} ||{_, Msg} <- lookup(Topic)]. init([]) -> ets:new(retained_msg, [set, protected, named_table]), - ?INFO("~p is started.", [?MODULE]), {ok, #state{}}. handle_call(Req, _From, State) ->