Merge branch 'dev' of github.com:slimpp/emqtt into dev

This commit is contained in:
Feng Lee 2014-12-10 18:23:32 +08:00
commit 5fc497ec1a
14 changed files with 57 additions and 53 deletions

1
.gitignore vendored
View File

@ -10,3 +10,4 @@ rel/emqtt
.concrete/DEV_MODE
.rebar
test/ebin/*.beam
.exrc

View File

@ -26,10 +26,10 @@
-define(MQTT_SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, false}
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, false}
]).
listen(Listeners) when is_list(Listeners) ->
@ -43,3 +43,4 @@ listen({http, Port, Options}) ->
MFArgs = {emqtt_http, handle, []},
mochiweb:start_http(Port, Options, MFArgs).

View File

@ -42,7 +42,7 @@ check(undefined, _) -> false;
check(_, undefined) -> false;
check(Username, Password) when is_binary(Username) ->
check(Username, Password) when is_binary(Username), is_binary(Password) ->
PasswdHash = crypto:hash(md5, Password),
case mnesia:dirty_read(emqtt_user, Username) of
[#emqtt_user{passwdhash=PasswdHash}] -> true;
@ -50,7 +50,12 @@ check(Username, Password) when is_binary(Username) ->
end.
add(Username, Password) when is_binary(Username) and is_binary(Password) ->
mnesia:dirty_write(#emqtt_user{username=Username, passwdhash=crypto:hash(md5, Password)}).
mnesia:dirty_write(
#emqtt_user{
username=Username,
passwdhash=crypto:hash(md5, Password)
}
).
delete(Username) when is_binary(Username) ->
mnesia:dirty_delete(emqtt_user, Username).

View File

@ -26,7 +26,9 @@
-behaviour(gen_server).
-export([start_link/1, info/1, go/2, stop/2]).
-export([start_link/1,
info/1,
go/2]).
-export([init/1,
handle_call/3,
@ -71,11 +73,8 @@ info(Pid) ->
go(Pid, Sock) ->
gen_server:call(Pid, {go, Sock}).
stop(Pid, Error) ->
gen_server:cast(Pid, {stop, Error}).
init([Sock]) ->
{ok, #state{socket = Sock}}.
{ok, #state{socket = Sock}, 1000}.
handle_call({go, Sock}, _From, State=#state{socket = Sock}) ->
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
@ -103,14 +102,17 @@ handle_call(info, _From, #state{conn_name=ConnName,
handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) ->
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
stop({shutdown, duplicate_id}, State);
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info({route, Msg}, #state{socket = Sock, message_id=MsgId} = State) ->
handle_info(timeout, State) ->
stop({shutdown, timeout}, State);
handle_info({stop, duplicate_id}, State=#state{conn_name=ConnName, client_id=ClientId}) ->
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),
stop({shutdown, duplicate_id}, State);
handle_info({dispatch, Msg}, #state{socket = Sock, message_id=MsgId} = State) ->
#mqtt_msg{retain = Retain,
qos = Qos,
@ -157,7 +159,6 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State);
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
?ERROR("sock error: ~p~n", [Reason]),
{noreply, State};
handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
@ -171,24 +172,17 @@ handle_info(keep_alive_timeout, #state{keep_alive=KeepAlive}=State) ->
end;
handle_info(Info, State) ->
?ERROR("unext info :~p",[Info]),
?ERROR("badinfo :~p",[Info]),
{stop, {badinfo, Info}, State}.
terminate(_Reason, #state{client_id=ClientId, keep_alive=KeepAlive}) ->
ok = emqtt_registry:unregister(ClientId),
terminate(_Reason, #state{keep_alive=KeepAlive}) ->
emqtt_keep_alive:cancel(KeepAlive),
emqtt_cm:destroy(self()),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
throw_on_error(E, Thunk) ->
case Thunk() of
{error, Reason} -> throw({E, Reason});
{ok, Res} -> Res;
Res -> Res
end.
async_recv(Sock, Length, infinity) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, -1);
@ -261,7 +255,7 @@ process_request(?CONNECT,
{?CONNACK_CREDENTIALS, State};
true ->
?INFO("connect from clientid: ~p, ~p", [ClientId, AlivePeriod]),
ok = emqtt_registry:register(ClientId, self()),
emqtt_cm:create(ClientId, self()),
KeepAlive = emqtt_keep_alive:new(AlivePeriod*1500, keep_alive_timeout),
{?CONNACK_ACCEPT,
State #state{ will_msg = make_will_msg(Var),
@ -364,7 +358,6 @@ process_request(?UNSUBSCRIBE,
{ok, State};
process_request(?PINGREQ, #mqtt_frame{}, #state{socket=Sock, keep_alive=KeepAlive}=State) ->
%?INFO("PINGREQ...",[]),
%Keep alive timer
KeepAlive1 = emqtt_keep_alive:reset(KeepAlive),
send_frame(Sock, #mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}),
@ -400,7 +393,7 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
send_will_msg(#state{will_msg = undefined}) ->
ignore;
send_will_msg(#state{will_msg = WillMsg }) ->
emqtt_router:publish(WillMsg).
emqtt_pubsub:publish(WillMsg).
send_frame(Sock, Frame) ->
?INFO("send frame:~p", [Frame]),

View File

@ -66,13 +66,14 @@ lookup(ClientId) ->
-spec create(ClientId :: binary(), Pid :: pid()) -> ok.
create(ClientId, Pid) ->
case lookup(ClientId) of
Pid ->
ignore;
OldPid when is_pid(OldPid) ->
%%TODO: FIX STOP...
emqtt_client:stop(OldPid, duplicate_id);
OldPid ! {stop, duplicate_id},
ets:insert(emqtt_client, {ClientId, Pid});
undefined ->
ok
end,
ets:insert(emqtt_client, {ClientId, Pid}).
ets:insert(emqtt_client, {ClientId, Pid})
end.
-spec destroy(binary() | pid()) -> ok.
destroy(ClientId) when is_binary(ClientId) ->

View File

@ -22,6 +22,8 @@
-module(emqtt_http).
-author('feng@slimchat.io').
-include("emqtt.hrl").
-import(proplists, [get_value/2, get_value/3]).

View File

@ -19,8 +19,11 @@
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_keep_alive).
-author('feng@slimchat.io').
-export([new/2,
state/1,
activate/1,

View File

@ -1,16 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% Developer of the eMQTT Code is <ery.lee@gmail.com>
%% Copyright (c) 2012 Ery Lee. All rights reserved.
%%
-module(emqtt_lib).

View File

@ -22,6 +22,8 @@
-module(emqtt_monitor).
-author('feng@slimchat.io').
-include("emqtt_log.hrl").
-behavior(gen_server).

View File

@ -22,6 +22,8 @@
-module(emqtt_net).
-author('feng@slimchat.io').
-export([tcp_name/3, tcp_host/1, getaddr/2, port_to_listeners/1]).
-export([connection_string/2]).

View File

@ -22,6 +22,8 @@
-module(emqtt_pubsub).
-author('feng@slimchat.io').
-include("emqtt.hrl").
-include("emqtt_log.hrl").

View File

@ -22,6 +22,10 @@
-module(emqtt_retained).
-author('feng@slimchat.io').
%%TODO: FIXME Later...
%%
%% <<MQTT_V3.1_Protocol_Specific>>
@ -74,11 +78,10 @@ delete(Topic) ->
gen_server:cast(?MODULE, {delete, Topic}).
send(Topic, Client) ->
[Client ! {route, Msg} ||{_, Msg} <- lookup(Topic)].
[Client ! {dispatch, Msg} ||{_, Msg} <- lookup(Topic)].
init([]) ->
ets:new(retained_msg, [set, protected, named_table]),
?INFO("~p is started.", [?MODULE]),
{ok, #state{}}.
handle_call(Req, _From, State) ->

View File

@ -22,6 +22,8 @@
-module(emqtt_sup).
-author('feng@slimchat.io').
-include("emqtt.hrl").
-behaviour(supervisor).

View File

@ -19,8 +19,11 @@
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_topic).
-author('feng@slimchat.io').
-import(lists, [reverse/1]).
-import(string, [rchr/2, substr/2, substr/3]).