integrate with esockd...

This commit is contained in:
Feng Lee 2014-12-06 19:50:40 +08:00
parent 3a6ed9a876
commit d12c5d40b2
12 changed files with 40 additions and 1612 deletions

16
TODO Normal file
View File

@ -0,0 +1,16 @@
0.2.0
=====
1. project structure...
2. apps, deps, plugins,
3. rel...
4. mochiweb: http publish...
5 cluster...
6. support MQTT3.1.1...
7. python, java test code
????
{ok, {{simple_one_for_one_terminate, 0, 1},
[{client, {emqtt_client, start_link, []},
temporary, 5000, worker, [emqtt_client]}]}}.

View File

@ -20,14 +20,9 @@
emqtt_router,
emqtt_registry,
emqtt_sup,
file_handle_cache,
gen_server2,
priority_queue,
supervisor2,
tcp_acceptor,
tcp_acceptor_sup,
tcp_listener,
tcp_listener_sup
supervisor2
]},
{registered, [emqtt_auth,
emqtt_router,

View File

@ -1,41 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is eMQTT
%%
%% The Initial Developer of the Original Code is <ery.lee at gmail dot com>
%% Copyright (C) 2012 Ery Lee All Rights Reserved.
-module(emqtt_client_sup).
-export([start_link/0, start_client/1]).
-behaviour(supervisor2).
-export([init/1]).
start_link() ->
supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, {{simple_one_for_one_terminate, 0, 1},
[{client, {emqtt_client, start_link, []},
temporary, 5000, worker, [emqtt_client]}]}}.
start_client(Sock) ->
{ok, Client} = supervisor:start_child(?MODULE, []),
ok = gen_tcp:controlling_process(Sock, Client),
emqtt_client:go(Client, Sock),
%% see comment in rabbit_networking:start_client/2
gen_event:which_handlers(error_logger),
Client.

View File

View File

@ -16,9 +16,19 @@
-include("emqtt.hrl").
-include_lib("elog/include/elog.hrl").
-define(MQTT_SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, false}
]).
-export([spec/2, listener_started/3, listener_stopped/3]).
-export([start/1]).
start(Listeners) ->
todo.
spec({Listener, SockOpts}, Callback) ->
[tcp_listener_spec(emqtt_tcp_listener_sup, Address, SockOpts,

File diff suppressed because it is too large Load Diff

View File

@ -1,89 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
-behaviour(gen_server).
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {callback, sock, ref}).
%%--------------------------------------------------------------------
start_link(Callback, LSock) ->
gen_server:start_link(?MODULE, {Callback, LSock}, []).
%%--------------------------------------------------------------------
init({Callback, LSock}) ->
gen_server:cast(self(), accept),
{ok, #state{callback=Callback, sock=LSock}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
ok = file_handle_cache:obtain(),
accept(State);
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({inet_async, LSock, Ref, {ok, Sock}},
State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
%% patch up the socket so it looks like one we got from
%% gen_tcp:accept/1
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
%% handle
file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
ok = file_handle_cache:obtain(),
%% accept more
accept(State);
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
{stop, {accept_failed, Reason}, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
end.

View File

@ -1,43 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
-behaviour(supervisor).
-export([start_link/2]).
-export([init/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
start_link(Name, Callback) ->
supervisor:start_link({local,Name}, ?MODULE, Callback).
init(Callback) ->
{ok, {{simple_one_for_one, 10, 10},
[{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
transient, brutal_kill, worker, [tcp_acceptor]}]}}.

View File

@ -1,115 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
-include("emqtt.hrl").
-include_lib("elog/include/elog.hrl").
-behaviour(gen_server).
-export([start_link/8]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {sock, on_startup, on_shutdown, label}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/8 ::
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
integer(), atom(), mfargs(), mfargs(), string()) ->
rabbit_types:ok_pid_or_error()).
-endif.
%%--------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
OnStartup, OnShutdown, Label) ->
gen_server:start_link(
?MODULE, {IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
OnStartup, OnShutdown, Label}, []).
%%--------------------------------------------------------------------
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
{ok, LSock} ->
lists:foreach(fun (_) ->
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
?INFO("started ~s on ~s:~p~n",
[Label, ntoab(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}};
{error, Reason} ->
?ERROR("failed to start ~s on ~s:~p - ~p (~s)~n",
[Label, ntoab(IPAddress), Port,
Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
{ok, {IPAddress, Port}} = inet:sockname(LSock),
gen_tcp:close(LSock),
?ERROR("stopped ~s on ~s:~p~n",
[Label, ntoab(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
%% when IPv6 is enabled but not used (i.e. 99% of the time).
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) ->
inet_parse:ntoa(IP).
ntoab(IP) ->
Str = ntoa(IP),
case string:str(Str, ":") of
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.

View File

@ -1,79 +0,0 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
-behaviour(supervisor).
-export([start_link/7, start_link/8]).
-export([init/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/7 ::
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
mfargs(), mfargs(), mfargs(), string()) ->
rabbit_types:ok_pid_or_error()).
-spec(start_link/8 ::
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
mfargs(), mfargs(), mfargs(), integer(), string()) ->
rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, 1, Label).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount, Label) ->
supervisor:start_link(
?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount, Label}).
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount, Label}) ->
%% This is gross. The tcp_listener needs to know about the
%% tcp_acceptor_sup, and the only way I can think of accomplishing
%% that without jumping through hoops is to register the
%% tcp_acceptor_sup.
Name = tcp_name(tcp_acceptor_sup, IPAddress, Port),
{ok, {{one_for_all, 10, 10},
[{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,
[Name, AcceptCallback]},
transient, infinity, supervisor, [tcp_acceptor_sup]},
{tcp_listener, {tcp_listener, start_link,
[IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, Name,
OnStartup, OnShutdown, Label]},
transient, 16#ffffffff, worker, [tcp_listener]}]}}.
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
lists:flatten(
io_lib:format(
"~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))).

View File

@ -33,15 +33,16 @@
]},
{emqtt, [
{auth, {anonymous, []}}, %internal, anonymous
{listeners, [
{1883, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, true}
]}
]}
{listen, [
{mqtt, 1883, [
{max_conns, 1024},
{acceptor_pool, 2}
]},
{http, 8883, [
{max_conns, 512},
{acceptor_pool, 1}
]}
]}
]}
].

View File

@ -14,7 +14,7 @@
+A 32
## Increase number of concurrent ports/sockets
##-env ERL_MAX_PORTS 4096
-env ERL_MAX_PORTS 4096
## Tweak GC to run more often
##-env ERL_FULLSWEEP_AFTER 10