add bin/ and some files

This commit is contained in:
erylee 2012-12-21 17:33:21 +08:00
parent e1e7d4e421
commit efc1d9f424
34 changed files with 5553 additions and 312 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
lib
ebin/*
ebin
*.log

15
LICENSE Normal file
View File

@ -0,0 +1,15 @@
The emqtt server is licensed under the MPL.
The files below copied from rabbitmq and licensed under LICENSE-MPL-RabbitMQ:
credit_flow.erl
file_handle_cache.erl
gen_server2.erl
priority_queue.erl
supervisor2.erl
tcp_acceptor.erl
tcp_acceptor_sup.erl
tcp_listener.erl
tcp_listener_sup.erl
If you have any questions regarding licensing, please contact ery.lee@gmail.com

455
LICENSE-MPL-RabbitMQ Normal file
View File

@ -0,0 +1,455 @@
MOZILLA PUBLIC LICENSE
Version 1.1
---------------
1. Definitions.
1.0.1. "Commercial Use" means distribution or otherwise making the
Covered Code available to a third party.
1.1. "Contributor" means each entity that creates or contributes to
the creation of Modifications.
1.2. "Contributor Version" means the combination of the Original
Code, prior Modifications used by a Contributor, and the Modifications
made by that particular Contributor.
1.3. "Covered Code" means the Original Code or Modifications or the
combination of the Original Code and Modifications, in each case
including portions thereof.
1.4. "Electronic Distribution Mechanism" means a mechanism generally
accepted in the software development community for the electronic
transfer of data.
1.5. "Executable" means Covered Code in any form other than Source
Code.
1.6. "Initial Developer" means the individual or entity identified
as the Initial Developer in the Source Code notice required by Exhibit
A.
1.7. "Larger Work" means a work which combines Covered Code or
portions thereof with code not governed by the terms of this License.
1.8. "License" means this document.
1.8.1. "Licensable" means having the right to grant, to the maximum
extent possible, whether at the time of the initial grant or
subsequently acquired, any and all of the rights conveyed herein.
1.9. "Modifications" means any addition to or deletion from the
substance or structure of either the Original Code or any previous
Modifications. When Covered Code is released as a series of files, a
Modification is:
A. Any addition to or deletion from the contents of a file
containing Original Code or previous Modifications.
B. Any new file that contains any part of the Original Code or
previous Modifications.
1.10. "Original Code" means Source Code of computer software code
which is described in the Source Code notice required by Exhibit A as
Original Code, and which, at the time of its release under this
License is not already Covered Code governed by this License.
1.10.1. "Patent Claims" means any patent claim(s), now owned or
hereafter acquired, including without limitation, method, process,
and apparatus claims, in any patent Licensable by grantor.
1.11. "Source Code" means the preferred form of the Covered Code for
making modifications to it, including all modules it contains, plus
any associated interface definition files, scripts used to control
compilation and installation of an Executable, or source code
differential comparisons against either the Original Code or another
well known, available Covered Code of the Contributor's choice. The
Source Code can be in a compressed or archival form, provided the
appropriate decompression or de-archiving software is widely available
for no charge.
1.12. "You" (or "Your") means an individual or a legal entity
exercising rights under, and complying with all of the terms of, this
License or a future version of this License issued under Section 6.1.
For legal entities, "You" includes any entity which controls, is
controlled by, or is under common control with You. For purposes of
this definition, "control" means (a) the power, direct or indirect,
to cause the direction or management of such entity, whether by
contract or otherwise, or (b) ownership of more than fifty percent
(50%) of the outstanding shares or beneficial ownership of such
entity.
2. Source Code License.
2.1. The Initial Developer Grant.
The Initial Developer hereby grants You a world-wide, royalty-free,
non-exclusive license, subject to third party intellectual property
claims:
(a) under intellectual property rights (other than patent or
trademark) Licensable by Initial Developer to use, reproduce,
modify, display, perform, sublicense and distribute the Original
Code (or portions thereof) with or without Modifications, and/or
as part of a Larger Work; and
(b) under Patents Claims infringed by the making, using or
selling of Original Code, to make, have made, use, practice,
sell, and offer for sale, and/or otherwise dispose of the
Original Code (or portions thereof).
(c) the licenses granted in this Section 2.1(a) and (b) are
effective on the date Initial Developer first distributes
Original Code under the terms of this License.
(d) Notwithstanding Section 2.1(b) above, no patent license is
granted: 1) for code that You delete from the Original Code; 2)
separate from the Original Code; or 3) for infringements caused
by: i) the modification of the Original Code or ii) the
combination of the Original Code with other software or devices.
2.2. Contributor Grant.
Subject to third party intellectual property claims, each Contributor
hereby grants You a world-wide, royalty-free, non-exclusive license
(a) under intellectual property rights (other than patent or
trademark) Licensable by Contributor, to use, reproduce, modify,
display, perform, sublicense and distribute the Modifications
created by such Contributor (or portions thereof) either on an
unmodified basis, with other Modifications, as Covered Code
and/or as part of a Larger Work; and
(b) under Patent Claims infringed by the making, using, or
selling of Modifications made by that Contributor either alone
and/or in combination with its Contributor Version (or portions
of such combination), to make, use, sell, offer for sale, have
made, and/or otherwise dispose of: 1) Modifications made by that
Contributor (or portions thereof); and 2) the combination of
Modifications made by that Contributor with its Contributor
Version (or portions of such combination).
(c) the licenses granted in Sections 2.2(a) and 2.2(b) are
effective on the date Contributor first makes Commercial Use of
the Covered Code.
(d) Notwithstanding Section 2.2(b) above, no patent license is
granted: 1) for any code that Contributor has deleted from the
Contributor Version; 2) separate from the Contributor Version;
3) for infringements caused by: i) third party modifications of
Contributor Version or ii) the combination of Modifications made
by that Contributor with other software (except as part of the
Contributor Version) or other devices; or 4) under Patent Claims
infringed by Covered Code in the absence of Modifications made by
that Contributor.
3. Distribution Obligations.
3.1. Application of License.
The Modifications which You create or to which You contribute are
governed by the terms of this License, including without limitation
Section 2.2. The Source Code version of Covered Code may be
distributed only under the terms of this License or a future version
of this License released under Section 6.1, and You must include a
copy of this License with every copy of the Source Code You
distribute. You may not offer or impose any terms on any Source Code
version that alters or restricts the applicable version of this
License or the recipients' rights hereunder. However, You may include
an additional document offering the additional rights described in
Section 3.5.
3.2. Availability of Source Code.
Any Modification which You create or to which You contribute must be
made available in Source Code form under the terms of this License
either on the same media as an Executable version or via an accepted
Electronic Distribution Mechanism to anyone to whom you made an
Executable version available; and if made available via Electronic
Distribution Mechanism, must remain available for at least twelve (12)
months after the date it initially became available, or at least six
(6) months after a subsequent version of that particular Modification
has been made available to such recipients. You are responsible for
ensuring that the Source Code version remains available even if the
Electronic Distribution Mechanism is maintained by a third party.
3.3. Description of Modifications.
You must cause all Covered Code to which You contribute to contain a
file documenting the changes You made to create that Covered Code and
the date of any change. You must include a prominent statement that
the Modification is derived, directly or indirectly, from Original
Code provided by the Initial Developer and including the name of the
Initial Developer in (a) the Source Code, and (b) in any notice in an
Executable version or related documentation in which You describe the
origin or ownership of the Covered Code.
3.4. Intellectual Property Matters
(a) Third Party Claims.
If Contributor has knowledge that a license under a third party's
intellectual property rights is required to exercise the rights
granted by such Contributor under Sections 2.1 or 2.2,
Contributor must include a text file with the Source Code
distribution titled "LEGAL" which describes the claim and the
party making the claim in sufficient detail that a recipient will
know whom to contact. If Contributor obtains such knowledge after
the Modification is made available as described in Section 3.2,
Contributor shall promptly modify the LEGAL file in all copies
Contributor makes available thereafter and shall take other steps
(such as notifying appropriate mailing lists or newsgroups)
reasonably calculated to inform those who received the Covered
Code that new knowledge has been obtained.
(b) Contributor APIs.
If Contributor's Modifications include an application programming
interface and Contributor has knowledge of patent licenses which
are reasonably necessary to implement that API, Contributor must
also include this information in the LEGAL file.
(c) Representations.
Contributor represents that, except as disclosed pursuant to
Section 3.4(a) above, Contributor believes that Contributor's
Modifications are Contributor's original creation(s) and/or
Contributor has sufficient rights to grant the rights conveyed by
this License.
3.5. Required Notices.
You must duplicate the notice in Exhibit A in each file of the Source
Code. If it is not possible to put such notice in a particular Source
Code file due to its structure, then You must include such notice in a
location (such as a relevant directory) where a user would be likely
to look for such a notice. If You created one or more Modification(s)
You may add your name as a Contributor to the notice described in
Exhibit A. You must also duplicate this License in any documentation
for the Source Code where You describe recipients' rights or ownership
rights relating to Covered Code. You may choose to offer, and to
charge a fee for, warranty, support, indemnity or liability
obligations to one or more recipients of Covered Code. However, You
may do so only on Your own behalf, and not on behalf of the Initial
Developer or any Contributor. You must make it absolutely clear than
any such warranty, support, indemnity or liability obligation is
offered by You alone, and You hereby agree to indemnify the Initial
Developer and every Contributor for any liability incurred by the
Initial Developer or such Contributor as a result of warranty,
support, indemnity or liability terms You offer.
3.6. Distribution of Executable Versions.
You may distribute Covered Code in Executable form only if the
requirements of Section 3.1-3.5 have been met for that Covered Code,
and if You include a notice stating that the Source Code version of
the Covered Code is available under the terms of this License,
including a description of how and where You have fulfilled the
obligations of Section 3.2. The notice must be conspicuously included
in any notice in an Executable version, related documentation or
collateral in which You describe recipients' rights relating to the
Covered Code. You may distribute the Executable version of Covered
Code or ownership rights under a license of Your choice, which may
contain terms different from this License, provided that You are in
compliance with the terms of this License and that the license for the
Executable version does not attempt to limit or alter the recipient's
rights in the Source Code version from the rights set forth in this
License. If You distribute the Executable version under a different
license You must make it absolutely clear that any terms which differ
from this License are offered by You alone, not by the Initial
Developer or any Contributor. You hereby agree to indemnify the
Initial Developer and every Contributor for any liability incurred by
the Initial Developer or such Contributor as a result of any such
terms You offer.
3.7. Larger Works.
You may create a Larger Work by combining Covered Code with other code
not governed by the terms of this License and distribute the Larger
Work as a single product. In such a case, You must make sure the
requirements of this License are fulfilled for the Covered Code.
4. Inability to Comply Due to Statute or Regulation.
If it is impossible for You to comply with any of the terms of this
License with respect to some or all of the Covered Code due to
statute, judicial order, or regulation then You must: (a) comply with
the terms of this License to the maximum extent possible; and (b)
describe the limitations and the code they affect. Such description
must be included in the LEGAL file described in Section 3.4 and must
be included with all distributions of the Source Code. Except to the
extent prohibited by statute or regulation, such description must be
sufficiently detailed for a recipient of ordinary skill to be able to
understand it.
5. Application of this License.
This License applies to code to which the Initial Developer has
attached the notice in Exhibit A and to related Covered Code.
6. Versions of the License.
6.1. New Versions.
Netscape Communications Corporation ("Netscape") may publish revised
and/or new versions of the License from time to time. Each version
will be given a distinguishing version number.
6.2. Effect of New Versions.
Once Covered Code has been published under a particular version of the
License, You may always continue to use it under the terms of that
version. You may also choose to use such Covered Code under the terms
of any subsequent version of the License published by Netscape. No one
other than Netscape has the right to modify the terms applicable to
Covered Code created under this License.
6.3. Derivative Works.
If You create or use a modified version of this License (which you may
only do in order to apply it to code which is not already Covered Code
governed by this License), You must (a) rename Your license so that
the phrases "Mozilla", "MOZILLAPL", "MOZPL", "Netscape",
"MPL", "NPL" or any confusingly similar phrase do not appear in your
license (except to note that your license differs from this License)
and (b) otherwise make it clear that Your version of the license
contains terms which differ from the Mozilla Public License and
Netscape Public License. (Filling in the name of the Initial
Developer, Original Code or Contributor in the notice described in
Exhibit A shall not of themselves be deemed to be modifications of
this License.)
7. DISCLAIMER OF WARRANTY.
COVERED CODE IS PROVIDED UNDER THIS LICENSE ON AN "AS IS" BASIS,
WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING,
WITHOUT LIMITATION, WARRANTIES THAT THE COVERED CODE IS FREE OF
DEFECTS, MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR NON-INFRINGING.
THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE COVERED CODE
IS WITH YOU. SHOULD ANY COVERED CODE PROVE DEFECTIVE IN ANY RESPECT,
YOU (NOT THE INITIAL DEVELOPER OR ANY OTHER CONTRIBUTOR) ASSUME THE
COST OF ANY NECESSARY SERVICING, REPAIR OR CORRECTION. THIS DISCLAIMER
OF WARRANTY CONSTITUTES AN ESSENTIAL PART OF THIS LICENSE. NO USE OF
ANY COVERED CODE IS AUTHORIZED HEREUNDER EXCEPT UNDER THIS DISCLAIMER.
8. TERMINATION.
8.1. This License and the rights granted hereunder will terminate
automatically if You fail to comply with terms herein and fail to cure
such breach within 30 days of becoming aware of the breach. All
sublicenses to the Covered Code which are properly granted shall
survive any termination of this License. Provisions which, by their
nature, must remain in effect beyond the termination of this License
shall survive.
8.2. If You initiate litigation by asserting a patent infringement
claim (excluding declatory judgment actions) against Initial Developer
or a Contributor (the Initial Developer or Contributor against whom
You file such action is referred to as "Participant") alleging that:
(a) such Participant's Contributor Version directly or indirectly
infringes any patent, then any and all rights granted by such
Participant to You under Sections 2.1 and/or 2.2 of this License
shall, upon 60 days notice from Participant terminate prospectively,
unless if within 60 days after receipt of notice You either: (i)
agree in writing to pay Participant a mutually agreeable reasonable
royalty for Your past and future use of Modifications made by such
Participant, or (ii) withdraw Your litigation claim with respect to
the Contributor Version against such Participant. If within 60 days
of notice, a reasonable royalty and payment arrangement are not
mutually agreed upon in writing by the parties or the litigation claim
is not withdrawn, the rights granted by Participant to You under
Sections 2.1 and/or 2.2 automatically terminate at the expiration of
the 60 day notice period specified above.
(b) any software, hardware, or device, other than such Participant's
Contributor Version, directly or indirectly infringes any patent, then
any rights granted to You by such Participant under Sections 2.1(b)
and 2.2(b) are revoked effective as of the date You first made, used,
sold, distributed, or had made, Modifications made by that
Participant.
8.3. If You assert a patent infringement claim against Participant
alleging that such Participant's Contributor Version directly or
indirectly infringes any patent where such claim is resolved (such as
by license or settlement) prior to the initiation of patent
infringement litigation, then the reasonable value of the licenses
granted by such Participant under Sections 2.1 or 2.2 shall be taken
into account in determining the amount or value of any payment or
license.
8.4. In the event of termination under Sections 8.1 or 8.2 above,
all end user license agreements (excluding distributors and resellers)
which have been validly granted by You or any distributor hereunder
prior to termination shall survive termination.
9. LIMITATION OF LIABILITY.
UNDER NO CIRCUMSTANCES AND UNDER NO LEGAL THEORY, WHETHER TORT
(INCLUDING NEGLIGENCE), CONTRACT, OR OTHERWISE, SHALL YOU, THE INITIAL
DEVELOPER, ANY OTHER CONTRIBUTOR, OR ANY DISTRIBUTOR OF COVERED CODE,
OR ANY SUPPLIER OF ANY OF SUCH PARTIES, BE LIABLE TO ANY PERSON FOR
ANY INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES OF ANY
CHARACTER INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF GOODWILL,
WORK STOPPAGE, COMPUTER FAILURE OR MALFUNCTION, OR ANY AND ALL OTHER
COMMERCIAL DAMAGES OR LOSSES, EVEN IF SUCH PARTY SHALL HAVE BEEN
INFORMED OF THE POSSIBILITY OF SUCH DAMAGES. THIS LIMITATION OF
LIABILITY SHALL NOT APPLY TO LIABILITY FOR DEATH OR PERSONAL INJURY
RESULTING FROM SUCH PARTY'S NEGLIGENCE TO THE EXTENT APPLICABLE LAW
PROHIBITS SUCH LIMITATION. SOME JURISDICTIONS DO NOT ALLOW THE
EXCLUSION OR LIMITATION OF INCIDENTAL OR CONSEQUENTIAL DAMAGES, SO
THIS EXCLUSION AND LIMITATION MAY NOT APPLY TO YOU.
10. U.S. GOVERNMENT END USERS.
The Covered Code is a "commercial item," as that term is defined in
48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer
software" and "commercial computer software documentation," as such
terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent with 48
C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4 (June 1995),
all U.S. Government End Users acquire Covered Code with only those
rights set forth herein.
11. MISCELLANEOUS.
This License represents the complete agreement concerning subject
matter hereof. If any provision of this License is held to be
unenforceable, such provision shall be reformed only to the extent
necessary to make it enforceable. This License shall be governed by
California law provisions (except to the extent applicable law, if
any, provides otherwise), excluding its conflict-of-law provisions.
With respect to disputes in which at least one party is a citizen of,
or an entity chartered or registered to do business in the United
States of America, any litigation relating to this License shall be
subject to the jurisdiction of the Federal Courts of the Northern
District of California, with venue lying in Santa Clara County,
California, with the losing party responsible for costs, including
without limitation, court costs and reasonable attorneys' fees and
expenses. The application of the United Nations Convention on
Contracts for the International Sale of Goods is expressly excluded.
Any law or regulation which provides that the language of a contract
shall be construed against the drafter shall not apply to this
License.
12. RESPONSIBILITY FOR CLAIMS.
As between Initial Developer and the Contributors, each party is
responsible for claims and damages arising, directly or indirectly,
out of its utilization of rights under this License and You agree to
work with Initial Developer and Contributors to distribute such
responsibility on an equitable basis. Nothing herein is intended or
shall be deemed to constitute any admission of liability.
13. MULTIPLE-LICENSED CODE.
Initial Developer may designate portions of the Covered Code as
"Multiple-Licensed". "Multiple-Licensed" means that the Initial
Developer permits you to utilize portions of the Covered Code under
Your choice of the NPL or the alternative licenses, if any, specified
by the Initial Developer in the file described in Exhibit A.
EXHIBIT A -Mozilla Public License.
``The contents of this file are subject to the Mozilla Public License
Version 1.1 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.mozilla.org/MPL/
Software distributed under the License is distributed on an "AS IS"
basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
License for the specific language governing rights and limitations
under the License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
use the text of this Exhibit A rather than the text found in the
Original Code Source Code for Your Modifications.]

View File

@ -1,10 +1,10 @@
all: compile
run: compile
erl -pa ebin -pa lib/rabbitlib/ebin -config etc/emqtt.config -s emqtt_app start
compile: deps
rebar compile
./rebar compile
deps:
rebar get-deps
./rebar get-deps
clean:
./rebar clean

View File

@ -1,4 +1,32 @@
emqtt
=====
erlang mqtt broker
erlang mqtt broker based on rabbitmq network layer.
compile
=======
require erlang R15B+ and git client
make
start
======
./bin/emqtt console
./bin/emqtt start
stop
====
./bin/emqtt stop
status
======
./bin/emqtt_ctl status
logs
====
log/*

4
TODO
View File

@ -1,2 +1,2 @@
1. Topic Trie
2. MQTT frame parse
OK 1. Topic Trie
OK 2. MQTT frame parse

179
bin/emqtt Executable file
View File

@ -0,0 +1,179 @@
#!/bin/bash
# -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et
RUNNER_SCRIPT_DIR=$(cd ${0%/*} && pwd)
RUNNER_BASE_DIR=${RUNNER_SCRIPT_DIR%/*}
RUNNER_ETC_DIR=$RUNNER_BASE_DIR/etc
RUNNER_LOG_DIR=$RUNNER_BASE_DIR/log
# Note the trailing slash on $PIPE_DIR/
PIPE_DIR=/tmp/$RUNNER_BASE_DIR/
RUNNER_USER=
# Make sure this script is running as the appropriate user
if [ ! -z "$RUNNER_USER" ] && [ `whoami` != "$RUNNER_USER" ]; then
exec sudo -u $RUNNER_USER -i $0 $@
fi
# Make sure CWD is set to runner base dir
cd $RUNNER_BASE_DIR
# Make sure log directory exists
mkdir -p $RUNNER_LOG_DIR
# Identify the script name
SCRIPT=`basename $0`
# Parse out release and erts info
ERLANG_BASE_DIR=/usr/local/lib/erlang
START_ERL=`cat $ERLANG_BASE_DIR/releases/start_erl.data`
ERTS_VSN=${START_ERL% *}
APP_VSN=${START_ERL#* }
VMARGS_PATH="$RUNNER_ETC_DIR/emqtt.args"
CONFIG_PATH="$RUNNER_ETC_DIR/emqtt.config"
# Extract the target node name from node.args
NAME_ARG=`egrep '^-s?name' $VMARGS_PATH`
if [ -z "$NAME_ARG" ]; then
echo "emqtt.args needs to have either -name or -sname parameter."
exit 1
fi
# Extract the target cookie
COOKIE_ARG=`grep '^-setcookie' $VMARGS_PATH`
if [ -z "$COOKIE_ARG" ]; then
echo "emqtt.args needs to have a -setcookie parameter."
exit 1
fi
# Add ERTS bin dir to our path
ERTS_PATH=$ERLANG_BASE_DIR/erts-$ERTS_VSN/bin
# Setup command to control the node
NODETOOL="escript $RUNNER_BASE_DIR/bin/nodetool $NAME_ARG $COOKIE_ARG"
# Check the first argument for instructions
case "$1" in
start)
# Make sure there is not already a node running
RES=`$NODETOOL ping`
if [ "$RES" = "pong" ]; then
echo "Node is already running!"
exit 1
fi
HEART_COMMAND="$RUNNER_BASE_DIR/bin/$SCRIPT start"
export HEART_COMMAND
mkdir -p $PIPE_DIR
shift # remove $1
$ERTS_PATH/run_erl -daemon $PIPE_DIR $RUNNER_LOG_DIR "exec $RUNNER_BASE_DIR/bin/$SCRIPT console $@" 2>&1
;;
stop)
# Wait for the node to completely stop...
case `uname -s` in
Linux|Darwin|FreeBSD|DragonFly|NetBSD|OpenBSD)
# PID COMMAND
PID=`ps ax -o pid= -o command=|\
grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'`
;;
SunOS)
# PID COMMAND
PID=`ps -ef -o pid= -o args=|\
grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'`
;;
CYGWIN*)
# UID PID PPID TTY STIME COMMAND
PID=`ps -efW|grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $2}'`
;;
esac
$NODETOOL stop
ES=$?
if [ "$ES" -ne 0 ]; then
exit $ES
fi
while `kill -0 $PID 2>/dev/null`;
do
sleep 1
done
;;
restart)
## Restart the VM without exiting the process
$NODETOOL restart
ES=$?
if [ "$ES" -ne 0 ]; then
exit $ES
fi
;;
reboot)
## Restart the VM completely (uses heart to restart it)
$NODETOOL reboot
ES=$?
if [ "$ES" -ne 0 ]; then
exit $ES
fi
;;
ping)
## See if the VM is alive
$NODETOOL ping
ES=$?
if [ "$ES" -ne 0 ]; then
exit $ES
fi
;;
attach)
# Make sure a node IS running
RES=`$NODETOOL ping`
ES=$?
if [ "$ES" -ne 0 ]; then
echo "Node is not running!"
exit $ES
fi
shift
exec $ERTS_PATH/to_erl $PIPE_DIR
;;
console)
# .boot file typically just $SCRIPT (ie, the app name)
# however, for debugging, sometimes start_clean.boot is useful:
#case "$1" in
# console) BOOTFILE=$SCRIPT ;;
# console_clean) BOOTFILE=start_clean ;;
#esac
# Setup beam-required vars
ERL_LIBS=$RUNNER_BASE_DIR/lib
ROOTDIR=$RUNNER_BASE_DIR
BINDIR=$ERTS_PATH
EMU=beam
PROGNAME=`echo $0 | sed 's/.*\\///'`
CMD="$ERTS_PATH/erl -pa $RUNNER_BASE_DIR/ebin -config $CONFIG_PATH -args_file $VMARGS_PATH -- ${1+"$@"}"
export ERL_LIBS
export EMU
export ROOTDIR
export BINDIR
export PROGNAME
# Dump environment info for logging purposes
echo "Exec: $CMD"
echo "Root: $ROOTDIR"
# Log the startup
logger -t "$SCRIPT[$$]" "Starting up"
# Start the VM
exec $CMD
;;
*)
echo "Usage: $SCRIPT {start|stop|restart|reboot|ping|console||attach}"
exit 1
;;
esac
exit 0

123
bin/emqtt_ctl Executable file
View File

@ -0,0 +1,123 @@
#!/bin/bash
RUNNER_SCRIPT_DIR=$(cd ${0%/*} && pwd)
RUNNER_BASE_DIR=${RUNNER_SCRIPT_DIR%/*}
RUNNER_ETC_DIR=$RUNNER_BASE_DIR/etc
RUNNER_BIN_DIR=$RUNNER_BASE_DIR/bin
RUNNER_LOG_DIR=$RUNNER_BASE_DIR/log
RUNNER_EBIN_DIR=$RUNNER_BASE_DIR/ebin
RUNNER_USER=
# Make sure CWD is set to runner base dir
cd $RUNNER_BASE_DIR
# Extract the target node name from node.args
NAME_ARG=`grep '\-[s]*name' $RUNNER_ETC_DIR/emqtt.args`
if [ -z "$NAME_ARG" ]; then
echo "emqtt.args needs to have either -name or -sname parameter."
exit 1
fi
# Learn how to specify node name for connection from remote nodes
echo "$NAME_ARG" | grep '^-sname' > /dev/null 2>&1
if [ "X$?" = "X0" ]; then
NAME_PARAM="-sname"
NAME_HOST=""
else
NAME_PARAM="-name"
echo "$NAME_ARG" | grep '@.*' > /dev/null 2>&1
if [ "X$?" = "X0" ]; then
NAME_HOST=`echo "${NAME_ARG}" | sed -e 's/.*\(@.*\)$/\1/'`
else
NAME_HOST=""
fi
fi
# Extract the target cookie
COOKIE_ARG=`grep '\-setcookie' $RUNNER_ETC_DIR/emqtt.args`
if [ -z "$COOKIE_ARG" ]; then
echo "emqtt.args needs to have a -setcookie parameter."
exit 1
fi
# Identify the script name
SCRIPT=`basename $0`
# Parse out release and erts info
ERLANG_BASE_DIR=/usr/local/lib/erlang
START_ERL=`cat $ERLANG_BASE_DIR/releases/start_erl.data`
ERTS_VSN=${START_ERL% *}
APP_VSN=${START_ERL#* }
# Add ERTS bin dir to our path
ERTS_PATH=$ERLANG_BASE_DIR/erts-$ERTS_VSN/bin
# Setup command to control the node
NODETOOL="$ERTS_PATH/escript $RUNNER_BIN_DIR/nodetool $NAME_ARG $COOKIE_ARG"
# Check the first argument for instructions
case "$1" in
status)
if [ $# -ne 1 ]; then
echo "Usage: $SCRIPT status"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "Node is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl status $@
;;
cluster_info)
if [ $# -ne 1 ]; then
echo "Usage: $SCRIPT cluster_info"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "Node is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl cluster_info $@
;;
cluster)
if [ $# -ne 2 ]; then
echo "Usage: $SCRIPT cluster <Node>"
exit 1
fi
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqtt is not running!"
exit 1
fi
shift
$NODETOOL rpc emqtt_ctl cluster $@
;;
*)
echo "Usage: $SCRIPT"
echo " status #query emqtt status"
echo " cluster_info #query cluster nodes"
echo " cluster <Node> #cluster node"
exit 1
;;
esac

138
bin/nodetool Executable file
View File

@ -0,0 +1,138 @@
%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ft=erlang ts=4 sw=4 et
%% -------------------------------------------------------------------
%%
%% nodetool: Helper Script for interacting with live nodes
%%
%% -------------------------------------------------------------------
main(Args) ->
ok = start_epmd(),
%% Extract the args
{RestArgs, TargetNode} = process_args(Args, [], undefined),
%% See if the node is currently running -- if it's not, we'll bail
case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
{true, pong} ->
ok;
{_, pang} ->
io:format("Node ~p not responding to pings.\n", [TargetNode]),
halt(1)
end,
case RestArgs of
["ping"] ->
%% If we got this far, the node already responsed to a ping, so just dump
%% a "pong"
io:format("pong\n");
["stop"] ->
io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]);
["restart"] ->
io:format("~p\n", [rpc:call(TargetNode, init, restart, [], 60000)]);
["reboot"] ->
io:format("~p\n", [rpc:call(TargetNode, init, reboot, [], 60000)]);
["rpc", Module, Function | RpcArgs] ->
case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
RpcArgs, 60000) of
ok ->
ok;
{badrpc, Reason} ->
io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
halt(1);
_ ->
halt(1)
end;
["rpcterms", Module, Function, ArgsAsString] ->
case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
consult(ArgsAsString), 60000) of
{badrpc, Reason} ->
io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
halt(1);
Other ->
io:format("~p\n", [Other])
end;
Other ->
io:format("Other: ~p\n", [Other]),
io:format("Usage: nodetool {ping|stop|restart|reboot}\n")
end,
net_kernel:stop().
process_args([], Acc, TargetNode) ->
{lists:reverse(Acc), TargetNode};
process_args(["-setcookie", Cookie | Rest], Acc, TargetNode) ->
erlang:set_cookie(node(), list_to_atom(Cookie)),
process_args(Rest, Acc, TargetNode);
process_args(["-name", TargetName | Rest], Acc, _) ->
ThisNode = append_node_suffix(TargetName, "_ctrl_"),
{ok, _} = net_kernel:start([ThisNode, longnames]),
process_args(Rest, Acc, nodename(TargetName));
process_args(["-sname", TargetName | Rest], Acc, _) ->
ThisNode = append_node_suffix(TargetName, "_ctrl_"),
{ok, _} = net_kernel:start([ThisNode, shortnames]),
process_args(Rest, Acc, nodename(TargetName));
process_args([Arg | Rest], Acc, Opts) ->
process_args(Rest, [Arg | Acc], Opts).
start_epmd() ->
[] = os:cmd(epmd_path() ++ " -daemon"),
ok.
epmd_path() ->
ErtsBinDir = filename:dirname(escript:script_name()),
Name = "epmd",
case os:find_executable(Name, ErtsBinDir) of
false ->
case os:find_executable(Name) of
false ->
io:format("Could not find epmd.~n"),
halt(1);
GlobalEpmd ->
GlobalEpmd
end;
Epmd ->
Epmd
end.
nodename(Name) ->
case string:tokens(Name, "@") of
[_Node, _Host] ->
list_to_atom(Name);
[Node] ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
list_to_atom(lists:concat([Node, "@", Host]))
end.
append_node_suffix(Name, Suffix) ->
case string:tokens(Name, "@") of
[Node, Host] ->
list_to_atom(lists:concat([Node, Suffix, os:getpid(), "@", Host]));
[Node] ->
list_to_atom(lists:concat([Node, Suffix, os:getpid()]))
end.
%%
%% Given a string or binary, parse it into a list of terms, ala file:consult/0
%%
consult(Str) when is_list(Str) ->
consult([], Str, []);
consult(Bin) when is_binary(Bin)->
consult([], binary_to_list(Bin), []).
consult(Cont, Str, Acc) ->
case erl_scan:tokens(Cont, Str, 0) of
{done, Result, Remaining} ->
case Result of
{ok, Tokens, _} ->
{ok, Term} = erl_parse:parse_term(Tokens),
consult([], Remaining, [Term | Acc]);
{eof, _Other} ->
lists:reverse(Acc);
{error, Info, _} ->
{error, Info}
end;
{more, Cont1} ->
consult(Cont1, eof, Acc)
end.

28
etc/emqtt.args Normal file
View File

@ -0,0 +1,28 @@
## Name of the node
-sname emqtt
## Cookie for distributed erlang
-setcookie emqttsecret
#-boot start_sasl
-s emqtt start
## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
## (Disabled by default..use with caution!)
##-heart
-smp auto
## Enable kernel poll and a few async threads
+K true
+A 64
+P 10000
## Increase number of concurrent ports/sockets
-env ERL_MAX_PORTS 4096
-env ERL_MAX_ETS_TABLES 1400
## Tweak GC to run more often
-env ERL_FULLSWEEP_AFTER 10

View File

@ -10,16 +10,27 @@
{mnesia, [
{dir, "var/mnesia"}
]},
{lager, [
{error_logger_redirect, false},
{crash_log, "log/emqtt_crash.log"},
{handlers, [
{lager_console_backend, info},
{lager_file_backend, [
{"log/emqtt_error.log", error, 10485760, "$D0", 5},
{"log/emqtt_info.log", info, 10485760, "$D0", 5}
]}
]}
]},
{emqtt, [
{tcp_listeners, [1883]},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, true},
{linger, {true, 0}},
{exit_on_close, false}]}
{listeners, [
{1883, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, true}
]}
]}
]}
].

View File

@ -1,6 +1,25 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%%
%% The Initial Developer of the Original Code is ery.lee@gmail.com
%% Copyright (c) 2012 Ery Lee. All rights reserved.
%%
%% ---------------------------------
%% banner
%% ---------------------------------
-define(COPYRIGHT, "Copyright (C) 2012 Ery Lee.").
-define(COPYRIGHT, "Copyright (C) 2007-2012 VMware, Inc.").
-define(LICENSE_MESSAGE, "Licensed under the MPL.").
-define(PROTOCOL_VERSION, "MQTT/3.1").
-define(ERTS_MINIMUM, "5.6.3").
@ -9,3 +28,66 @@
-record(subscriber, {topic, pid}).
%% ---------------------------------
%% Logging mechanism
-define(PRINT(Format, Args),
io:format(Format, Args)).
-define(PRINT_MSG(Msg),
io:format(Msg)).
-define(DEBUG(Format, Args),
lager:debug(Format, Args)).
-define(DEBUG_TRACE(Dest, Format, Args),
lager:debug(Dest, Format, Args)).
-define(DEBUG_MSG(Msg),
lager:debug(Msg)).
-define(INFO(Format, Args),
lager:info(Format, Args)).
-define(INFO_TRACE(Dest, Format, Args),
lager:info(Dest, Format, Args)).
-define(INFO_MSG(Msg),
lager:info(Msg)).
-define(WARN(Format, Args),
lager:warning(Format, Args)).
-define(WARN_TRACE(Dest, Format, Args),
lager:warning(Dest, Format, Args)).
-define(WARN_MSG(Msg),
lager:warning(Msg)).
-define(WARNING(Format, Args),
lager:warning(Format, Args)).
-define(WARNING_TRACE(Dest, Format, Args),
lager:warning(Dest, Format, Args)).
-define(WARNING_MSG(Msg),
lager:warning(Msg)).
-define(ERROR(Format, Args),
lager:error(Format, Args)).
-define(ERROR_TRACE(Dest, Format, Args),
lager:error(Dest, Format, Args)).
-define(ERROR_MSG(Msg),
lager:error(Msg)).
-define(CRITICAL(Format, Args),
lager:critical(Format, Args)).
-define(CRITICAL_TRACE(Dest, Format, Args),
lager:critical(Dest, Format, Args)).
-define(CRITICAL_MSG(Msg),
lager:critical(Msg)).

View File

@ -1,8 +1,11 @@
{erl_opts, [debug_info, {parse_transform, lager_transform}]}.
{erl_opts, [{i, "include"}]}.
{lib_dirs,["lib"]}.
{deps_dir, ["lib"]}.
{deps, [
{'rabbitlib', ".*", {git, "git://github.com/emqtt/rabbitlib.git", {branch, "master"}}}
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}
]}.

3
run
View File

@ -1,3 +0,0 @@
#!/bin/bash
erl -pa ebin -pa lib/rabbitlib/ebin -config etc/emqtt.config -s emqtt_app start

BIN
src/.emqtt_listener.erl.swp Normal file

Binary file not shown.

11
src/emqtt.erl Normal file
View File

@ -0,0 +1,11 @@
-module(emqtt).
-export([start/0]).
start() ->
ok = application:start(sasl),
mnesia:create_schema([node()]),
mnesia:start(),
lager:start(),
ok = application:start(emqtt).

View File

@ -1,32 +1,20 @@
-module(emqtt_app).
-export([start/0]).
-include("emqtt.hrl").
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
-define(APPS, [sasl, mnesia, emqtt]).
start() ->
[start_app(App) || App <- ?APPS].
start_app(mnesia) ->
mnesia:create_schema([node()]),
mnesia:start();
start_app(App) ->
application:start(App).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
{ok, Sup} = emqtt_sup:start_link(),
emqtt_networking:boot(),
{ok, Sup}.
{ok, Listeners} = application:get_env(listeners),
emqtt_sup:start_link(Listeners).
stop(_State) ->
ok.

View File

@ -179,7 +179,7 @@ run_socket(State = #state{ socket = Sock }) ->
control_throttle(State = #state{ connection_state = Flow,
conserve = Conserve }) ->
case {Flow, Conserve orelse credit_flow:blocked()} of
case {Flow, Conserve} of
{running, true} -> State #state{ connection_state = blocked };
{blocked, false} -> run_socket(State #state{
connection_state = running });

View File

@ -1,6 +1,6 @@
-module(emqtt_client_sup).
-export([start_link/0]).
-export([start_link/0, start_client/1]).
-behaviour(supervisor2).
@ -14,3 +14,13 @@ init([]) ->
[{client, {emqtt_client, start_link, []},
temporary, 5000, worker, [emqtt_client]}]}}.
start_client(Sock) ->
{ok, Client} = supervisor:start_child(?MODULE, []),
ok = gen_tcp:controlling_process(Sock, Client),
emqtt_client:go(Client, Sock),
%% see comment in rabbit_networking:start_client/2
gen_event:which_handlers(error_logger),
Client.

28
src/emqtt_ctl.erl Normal file
View File

@ -0,0 +1,28 @@
-module(emqtt_ctl).
-include("emqtt.hrl").
-compile(export_all).
status() ->
{InternalStatus, _ProvidedStatus} = init:get_status(),
?PRINT("Node ~p is ~p.", [node(), InternalStatus]),
case lists:keysearch(wifioss, 1, application:which_applications()) of
false ->
?PRINT_MSG("wifioss is not running~n");
{value,_Version} ->
?PRINT_MSG("wifioss is running~n")
end.
cluster_info() ->
Nodes = [node()|nodes()],
?PRINT("cluster nodes: ~p~n", [Nodes]).
cluster(Node) ->
case net_adm:ping(list_to_atom(Node)) of
pong ->
?PRINT("cluster with ~p successfully.~n", [Node]);
pang ->
?PRINT("failed to cluster with ~p~n", [Node])
end.

52
src/emqtt_listener.erl Normal file
View File

@ -0,0 +1,52 @@
-module(emqtt_listener).
-include("emqtt.hrl").
-export([spec/2, listener_started/3, listener_stopped/3]).
spec({Listener, SockOpts}, Callback) ->
[tcp_listener_spec(emqtt_tcp_listener_sup, Address, SockOpts,
mqtt, "TCP Listener", Callback) || Address <- tcp_listener_addresses(Listener)].
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
Protocol, Label, OnConnect) ->
{emqtt_net:tcp_name(NamePrefix, IPAddress, Port),
{tcp_listener_sup, start_link,
[IPAddress, Port, [Family | SocketOpts],
{?MODULE, listener_started, [Protocol]},
{?MODULE, listener_stopped, [Protocol]},
OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}.
tcp_listener_addresses(Port) when is_integer(Port) ->
tcp_listener_addresses_auto(Port);
tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
tcp_listener_addresses_auto(Port);
tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
tcp_listener_addresses({Host, Port, auto});
tcp_listener_addresses({Host, Port, Family0})
when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
[{IPAddress, Port, Family} ||
{IPAddress, Family} <- emqtt_net:getaddr(Host, Family0)];
tcp_listener_addresses({_Host, Port, _Family0}) ->
?ERROR("invalid port ~p - not 0..65535~n", [Port]),
throw({error, {invalid_port, Port}}).
tcp_listener_addresses_auto(Port) ->
lists:append([tcp_listener_addresses(Listener) ||
Listener <- emqtt_net:port_to_listeners(Port)]).
%--------------------------------------------
%TODO: callback
%--------------------------------------------
listener_started(Protocol, IPAddress, Port) ->
%% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
%% We need the host so we can distinguish multiple instances of the above
%% in a cluster.
?INFO("tcp listener started: ~p ~p:~p", [Protocol, IPAddress, Port]).
listener_stopped(Protocol, IPAddress, Port) ->
?INFO("tcp listener stopped: ~p ~p:~p", [Protocol, IPAddress, Port]).

View File

@ -1,9 +1,137 @@
-module(emqtt_net).
-export([tcp_name/3, tcp_host/1, getaddr/2, port_to_listeners/1]).
-export([tune_buffer_size/1, connection_string/2]).
-include_lib("kernel/include/inet.hrl").
-define(FIRST_TEST_BIND_PORT, 10000).
%%--------------------------------------------------------------------
%% inet_parse:address takes care of ip string, like "0.0.0.0"
%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
%% and runs 'inet_gethost' port process for dns lookups.
%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
getaddr(Host, Family) ->
case inet_parse:address(Host) of
{ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
{error, _} -> gethostaddr(Host, Family)
end.
gethostaddr(Host, auto) ->
Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
[] -> host_lookup_error(Host, Lookups);
IPs -> IPs
end;
gethostaddr(Host, Family) ->
case inet:getaddr(Host, Family) of
{ok, IPAddress} -> [{IPAddress, Family}];
{error, Reason} -> host_lookup_error(Host, Reason)
end.
host_lookup_error(Host, Reason) ->
error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
throw({error, {invalid_host, Host, Reason}}).
resolve_family({_,_,_,_}, auto) -> inet;
resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
resolve_family(_, F) -> F.
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
%%
%% * Those which treat IPv4 addresses as a special kind of IPv6 address
%% ("Single stack")
%% - Linux by default, Windows Vista and later
%% - We also treat any (hypothetical?) IPv6-only machine the same way
%% * Those which consider IPv6 and IPv4 to be completely separate things
%% ("Dual stack")
%% - OpenBSD, Windows XP / 2003, Linux if so configured
%% * Those which do not support IPv6.
%% - Ancient/weird OSes, Linux if so configured
%%
%% How to reconfigure Linux to test this:
%% Single stack (default):
%% echo 0 > /proc/sys/net/ipv6/bindv6only
%% Dual stack:
%% echo 1 > /proc/sys/net/ipv6/bindv6only
%% IPv4 only:
%% add ipv6.disable=1 to GRUB_CMDLINE_LINUX_DEFAULT in /etc/default/grub then
%% sudo update-grub && sudo reboot
%%
%% This matters in (and only in) the case where the sysadmin (or the
%% app descriptor) has only supplied a port and we wish to bind to
%% "all addresses". This means different things depending on whether
%% we're single or dual stack. On single stack binding to "::"
%% implicitly includes all IPv4 addresses, and subsequently attempting
%% to bind to "0.0.0.0" will fail. On dual stack, binding to "::" will
%% only bind to IPv6 addresses, and we need another listener bound to
%% "0.0.0.0" for IPv4. Finally, on IPv4-only systems we of course only
%% want to bind to "0.0.0.0".
%%
%% Unfortunately it seems there is no way to detect single vs dual stack
%% apart from attempting to bind to the port.
port_to_listeners(Port) ->
IPv4 = {"0.0.0.0", Port, inet},
IPv6 = {"::", Port, inet6},
case ipv6_status(?FIRST_TEST_BIND_PORT) of
single_stack -> [IPv6];
ipv6_only -> [IPv6];
dual_stack -> [IPv6, IPv4];
ipv4_only -> [IPv4]
end.
ipv6_status(TestPort) ->
IPv4 = [inet, {ip, {0,0,0,0}}],
IPv6 = [inet6, {ip, {0,0,0,0,0,0,0,0}}],
case gen_tcp:listen(TestPort, IPv6) of
{ok, LSock6} ->
case gen_tcp:listen(TestPort, IPv4) of
{ok, LSock4} ->
%% Dual stack
gen_tcp:close(LSock6),
gen_tcp:close(LSock4),
dual_stack;
%% Checking the error here would only let us
%% distinguish single stack IPv6 / IPv4 vs IPv6 only,
%% which we figure out below anyway.
{error, _} ->
gen_tcp:close(LSock6),
case gen_tcp:listen(TestPort, IPv4) of
%% Single stack
{ok, LSock4} -> gen_tcp:close(LSock4),
single_stack;
%% IPv6-only machine. Welcome to the future.
{error, eafnosupport} -> ipv6_only; %% Linux
{error, eprotonosupport}-> ipv6_only; %% FreeBSD
%% Dual stack machine with something already
%% on IPv4.
{error, _} -> ipv6_status(TestPort + 1)
end
end;
%% IPv4-only machine. Welcome to the 90s.
{error, eafnosupport} -> %% Linux
ipv4_only;
{error, eprotonosupport} -> %% FreeBSD
ipv4_only;
%% Port in use
{error, _} ->
ipv6_status(TestPort + 1)
end.
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
lists:flatten(
io_lib:format(
"~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))).
tune_buffer_size(Sock) ->
case getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
@ -14,16 +142,15 @@ tune_buffer_size(Sock) ->
connection_string(Sock, Direction) ->
case socket_ends(Sock, Direction) of
{ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
{ok, format(
"~s:~p -> ~s:~p",
[maybe_ntoab(FromAddress), FromPort,
maybe_ntoab(ToAddress), ToPort])};
{ok, lists:flatten(
io_lib:format(
"~s:~p -> ~s:~p",
[maybe_ntoab(FromAddress), FromPort,
maybe_ntoab(ToAddress), ToPort]))};
Error ->
Error
end.
format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
socket_ends(Sock, Direction) ->
{From, To} = sock_funs(Direction),
case {From(Sock), To(Sock)} of
@ -84,3 +211,5 @@ hostname() ->
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> Hostname
end.

View File

@ -1,257 +0,0 @@
-module(emqtt_networking).
-export([boot/0]).
-export([start_tcp_listener/1, stop_tcp_listener/1, tcp_host/1, ntoab/1]).
%callback.
-export([tcp_listener_started/3, tcp_listener_stopped/3, start_client/1]).
-include_lib("kernel/include/inet.hrl").
-define(FIRST_TEST_BIND_PORT, 10000).
boot() ->
{ok, TcpListeners} = application:get_env(tcp_listeners),
[ok = start_tcp_listener(Listener) || Listener <- TcpListeners].
start_tcp_listener(Listener) ->
start_listener(Listener, emqtt, "TCP Listener",
{?MODULE, start_client, []}).
start_listener(Listener, Protocol, Label, OnConnect) ->
[start_listener0(Address, Protocol, Label, OnConnect) ||
Address <- tcp_listener_addresses(Listener)],
ok.
start_listener0(Address, Protocol, Label, OnConnect) ->
Spec = tcp_listener_spec(emqtt_tcp_listener_sup, Address, tcp_opts(),
Protocol, Label, OnConnect),
case supervisor:start_child(emqtt_sup, Spec) of
{ok, _} -> ok;
{error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
exit({could_not_start_tcp_listener,
{ntoa(IPAddress), Port}})
end.
stop_tcp_listener(Listener) ->
[stop_tcp_listener0(Address) ||
Address <- tcp_listener_addresses(Listener)],
ok.
stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = tcp_name(emqtt_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(emqtt_sup, Name),
ok = supervisor:delete_child(emqtt_sup, Name).
tcp_listener_addresses(Port) when is_integer(Port) ->
tcp_listener_addresses_auto(Port);
tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
tcp_listener_addresses_auto(Port);
tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
tcp_listener_addresses({Host, Port, auto});
tcp_listener_addresses({Host, Port, Family0})
when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
[{IPAddress, Port, Family} ||
{IPAddress, Family} <- getaddr(Host, Family0)];
tcp_listener_addresses({_Host, Port, _Family0}) ->
error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
throw({error, {invalid_port, Port}}).
tcp_listener_addresses_auto(Port) ->
lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
Protocol, Label, OnConnect) ->
{tcp_name(NamePrefix, IPAddress, Port),
{tcp_listener_sup, start_link,
[IPAddress, Port, [Family | SocketOpts],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}.
tcp_listener_started(Protocol, IPAddress, Port) ->
%% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
%% We need the host so we can distinguish multiple instances of the above
%% in a cluster.
error_logger:info_msg("tcp listener started: ~p ~p:~p", [Protocol, IPAddress, Port]).
tcp_listener_stopped(Protocol, IPAddress, Port) ->
error_logger:info_msg("tcp listener stopped: ~p ~p:~p", [Protocol, IPAddress, Port]).
start_client(Sock) ->
{ok, Client} = supervisor:start_child(emqtt_client_sup, []),
ok = gen_tcp:controlling_process(Sock, Client),
emqtt_client:go(Client, Sock),
%% see comment in rabbit_networking:start_client/2
gen_event:which_handlers(error_logger),
Client.
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
hostname();
tcp_host({0,0,0,0,0,0,0,0}) ->
hostname();
tcp_host(IPAddress) ->
case inet:gethostbyaddr(IPAddress) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> ntoa(IPAddress)
end.
hostname() ->
{ok, Hostname} = inet:gethostname(),
case inet:gethostbyname(Hostname) of
{ok, #hostent{h_name = Name}} -> Name;
{error, _Reason} -> Hostname
end.
tcp_opts() ->
{ok, Opts} = application:get_env(emqtt, tcp_listen_options),
Opts.
%% inet_parse:address takes care of ip string, like "0.0.0.0"
%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
%% and runs 'inet_gethost' port process for dns lookups.
%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
getaddr(Host, Family) ->
case inet_parse:address(Host) of
{ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
{error, _} -> gethostaddr(Host, Family)
end.
gethostaddr(Host, auto) ->
Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
[] -> host_lookup_error(Host, Lookups);
IPs -> IPs
end;
gethostaddr(Host, Family) ->
case inet:getaddr(Host, Family) of
{ok, IPAddress} -> [{IPAddress, Family}];
{error, Reason} -> host_lookup_error(Host, Reason)
end.
host_lookup_error(Host, Reason) ->
error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
throw({error, {invalid_host, Host, Reason}}).
resolve_family({_,_,_,_}, auto) -> inet;
resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
resolve_family(_, F) -> F.
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
%%
%% * Those which treat IPv4 addresses as a special kind of IPv6 address
%% ("Single stack")
%% - Linux by default, Windows Vista and later
%% - We also treat any (hypothetical?) IPv6-only machine the same way
%% * Those which consider IPv6 and IPv4 to be completely separate things
%% ("Dual stack")
%% - OpenBSD, Windows XP / 2003, Linux if so configured
%% * Those which do not support IPv6.
%% - Ancient/weird OSes, Linux if so configured
%%
%% How to reconfigure Linux to test this:
%% Single stack (default):
%% echo 0 > /proc/sys/net/ipv6/bindv6only
%% Dual stack:
%% echo 1 > /proc/sys/net/ipv6/bindv6only
%% IPv4 only:
%% add ipv6.disable=1 to GRUB_CMDLINE_LINUX_DEFAULT in /etc/default/grub then
%% sudo update-grub && sudo reboot
%%
%% This matters in (and only in) the case where the sysadmin (or the
%% app descriptor) has only supplied a port and we wish to bind to
%% "all addresses". This means different things depending on whether
%% we're single or dual stack. On single stack binding to "::"
%% implicitly includes all IPv4 addresses, and subsequently attempting
%% to bind to "0.0.0.0" will fail. On dual stack, binding to "::" will
%% only bind to IPv6 addresses, and we need another listener bound to
%% "0.0.0.0" for IPv4. Finally, on IPv4-only systems we of course only
%% want to bind to "0.0.0.0".
%%
%% Unfortunately it seems there is no way to detect single vs dual stack
%% apart from attempting to bind to the port.
port_to_listeners(Port) ->
IPv4 = {"0.0.0.0", Port, inet},
IPv6 = {"::", Port, inet6},
case ipv6_status(?FIRST_TEST_BIND_PORT) of
single_stack -> [IPv6];
ipv6_only -> [IPv6];
dual_stack -> [IPv6, IPv4];
ipv4_only -> [IPv4]
end.
ipv6_status(TestPort) ->
IPv4 = [inet, {ip, {0,0,0,0}}],
IPv6 = [inet6, {ip, {0,0,0,0,0,0,0,0}}],
case gen_tcp:listen(TestPort, IPv6) of
{ok, LSock6} ->
case gen_tcp:listen(TestPort, IPv4) of
{ok, LSock4} ->
%% Dual stack
gen_tcp:close(LSock6),
gen_tcp:close(LSock4),
dual_stack;
%% Checking the error here would only let us
%% distinguish single stack IPv6 / IPv4 vs IPv6 only,
%% which we figure out below anyway.
{error, _} ->
gen_tcp:close(LSock6),
case gen_tcp:listen(TestPort, IPv4) of
%% Single stack
{ok, LSock4} -> gen_tcp:close(LSock4),
single_stack;
%% IPv6-only machine. Welcome to the future.
{error, eafnosupport} -> ipv6_only; %% Linux
{error, eprotonosupport}-> ipv6_only; %% FreeBSD
%% Dual stack machine with something already
%% on IPv4.
{error, _} -> ipv6_status(TestPort + 1)
end
end;
%% IPv4-only machine. Welcome to the 90s.
{error, eafnosupport} -> %% Linux
ipv4_only;
{error, eprotonosupport} -> %% FreeBSD
ipv4_only;
%% Port in use
{error, _} ->
ipv6_status(TestPort + 1)
end.
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) ->
inet_parse:ntoa(IP).
ntoab(IP) ->
Str = ntoa(IP),
case string:str(Str, ":") of
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).

View File

@ -1,6 +1,7 @@
-module(emqtt_router).
-include("emqtt.hrl").
-include("emqtt_frame.hrl").
-export([start_link/0]).
@ -44,8 +45,8 @@ delete(Sub) when is_record(Sub, subscriber) ->
gen_server:cast(?MODULE, {delete, Sub}).
init([]) ->
Res = ets:new(subscriber, [bag, protected, named_table, {keypos, 2}]),
error_logger:info_msg("emqtt_router is started: ~p~n", [Res]),
ets:new(subscriber, [bag, named_table, {keypos, 2}]),
?INFO_MSG("emqtt_router is started."),
{ok, #state{}}.
handle_call({insert, Sub}, _From, State) ->

View File

@ -5,7 +5,7 @@
-behaviour(supervisor).
%% API
-export([start_link/0]).
-export([start_link/1]).
%% Supervisor callbacks
-export([init/1]).
@ -17,17 +17,25 @@
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_link(Listeners) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Listeners]).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
init([Listeners]) ->
{ok, { {one_for_all, 5, 10}, [
?CHILD(emqtt_topic, worker),
?CHILD(emqtt_router, worker),
?CHILD(emqtt_client_sup, supervisor)
]} }.
| listener_children(Listeners) ]}
}.
listener_children(Listeners) ->
lists:append([emqtt_listener:spec(Listener,
{emqtt_client_sup, start_client, []}) || Listener <- Listeners]).

View File

@ -50,7 +50,7 @@ init([]) ->
{record_name, topic},
{ram_copies, [node()]},
{attributes, record_info(fields, topic)}]),
error_logger:info_msg("emqtt_topic is started."),
?INFO_MSG("emqtt_topic is started."),
{ok, #state{}}.
handle_call({insert, Topic}, _From, State) ->

1227
src/file_handle_cache.erl Normal file

File diff suppressed because it is too large Load Diff

1234
src/gen_server2.erl Normal file

File diff suppressed because it is too large Load Diff

194
src/priority_queue.erl Normal file
View File

@ -0,0 +1,194 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
%% queues, except that a) there is an in/3 that takes a priority, and
%% b) we have only implemented the core API we need.
%%
%% Priorities should be integers - the higher the value the higher the
%% priority - but we don't actually check that.
%%
%% in/2 inserts items with priority 0.
%%
%% We optimise the case where a priority queue is being used just like
%% an ordinary queue. When that is the case we represent the priority
%% queue as an ordinary queue. We could just call into the 'queue'
%% module for that, but for efficiency we implement the relevant
%% functions directly in here, thus saving on inter-module calls and
%% eliminating a level of boxing.
%%
%% When the queue contains items with non-zero priorities, it is
%% represented as a sorted kv list with the inverted Priority as the
%% key and an ordinary queue as the value. Here again we use our own
%% ordinary queue implemention for efficiency, often making recursive
%% calls into the same function knowing that ordinary queues represent
%% a base case.
-module(priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
out/1, join/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([q/0]).
-type(q() :: pqueue()).
-type(priority() :: integer() | 'infinity').
-type(squeue() :: {queue, [any()], [any()]}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
-spec(is_queue/1 :: (any()) -> boolean()).
-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
%%----------------------------------------------------------------------------
new() ->
{queue, [], []}.
is_queue({queue, R, F}) when is_list(R), is_list(F) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
lists:all(fun ({infinity, Q}) -> is_queue(Q);
({P, Q}) -> is_integer(P) andalso is_queue(Q)
end, Queues);
is_queue(_) ->
false.
is_empty({queue, [], []}) ->
true;
is_empty(_) ->
false.
len({queue, R, F}) when is_list(R), is_list(F) ->
length(R) + length(F);
len({pqueue, Queues}) ->
lists:sum([len(Q) || {_, Q} <- Queues]).
to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
{0, V} <- to_list(Q)].
in(Item, Q) ->
in(Item, 0, Q).
in(X, 0, {queue, [_] = In, []}) ->
{queue, [X], In};
in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
{queue, [X|In], Out};
in(X, Priority, _Q = {queue, [], []}) ->
in(X, Priority, {pqueue, []});
in(X, Priority, Q = {queue, _, _}) ->
in(X, Priority, {pqueue, [{0, Q}]});
in(X, Priority, {pqueue, Queues}) ->
P = maybe_negate_priority(Priority),
{pqueue, case lists:keysearch(P, 1, Queues) of
{value, {_, Q}} ->
lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
false when P == infinity ->
[{P, {queue, [X], []}} | Queues];
false ->
case Queues of
[{infinity, InfQueue} | Queues1] ->
[{infinity, InfQueue} |
lists:keysort(1, [{P, {queue, [X], []}} | Queues1])];
_ ->
lists:keysort(1, [{P, {queue, [X], []}} | Queues])
end
end}.
out({queue, [], []} = Q) ->
{empty, Q};
out({queue, [V], []}) ->
{{value, V}, {queue, [], []}};
out({queue, [Y|In], []}) ->
[V|Out] = lists:reverse(In, []),
{{value, V}, {queue, [Y], Out}};
out({queue, In, [V]}) when is_list(In) ->
{{value,V}, r2f(In)};
out({queue, In,[V|Out]}) when is_list(In) ->
{{value, V}, {queue, In, Out}};
out({pqueue, [{P, Q} | Queues]}) ->
{R, Q1} = out(Q),
NewQ = case is_empty(Q1) of
true -> case Queues of
[] -> {queue, [], []};
[{0, OnlyQ}] -> OnlyQ;
[_|_] -> {pqueue, Queues}
end;
false -> {pqueue, [{P, Q1} | Queues]}
end,
{R, NewQ}.
join(A, {queue, [], []}) ->
A;
join({queue, [], []}, B) ->
B;
join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
{queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
join(A = {queue, _, _}, {pqueue, BPQ}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ),
Post1 = case Post of
[] -> [ {0, A} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
_ -> [ {0, A} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, B = {queue, _, _}) ->
{Pre, Post} =
lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ),
Post1 = case Post of
[] -> [ {0, B} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
_ -> [ {0, B} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, {pqueue, BPQ}) ->
{pqueue, merge(APQ, BPQ, [])}.
merge([], BPQ, Acc) ->
lists:reverse(Acc, BPQ);
merge(APQ, [], Acc) ->
lists:reverse(Acc, APQ);
merge([{P, A}|As], [{P, B}|Bs], Acc) ->
merge(As, Bs, [ {P, join(A, B)} | Acc ]);
merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As, Bs, [ {PA, A} | Acc ]);
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P.

1232
src/supervisor2.erl Normal file

File diff suppressed because it is too large Load Diff

89
src/tcp_acceptor.erl Normal file
View File

@ -0,0 +1,89 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
-behaviour(gen_server).
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {callback, sock, ref}).
%%--------------------------------------------------------------------
start_link(Callback, LSock) ->
gen_server:start_link(?MODULE, {Callback, LSock}, []).
%%--------------------------------------------------------------------
init({Callback, LSock}) ->
gen_server:cast(self(), accept),
{ok, #state{callback=Callback, sock=LSock}}.
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
ok = file_handle_cache:obtain(),
accept(State);
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({inet_async, LSock, Ref, {ok, Sock}},
State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
%% patch up the socket so it looks like one we got from
%% gen_tcp:accept/1
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
%% handle
file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
ok = file_handle_cache:obtain(),
%% accept more
accept(State);
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
{stop, {accept_failed, Reason}, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
end.

43
src/tcp_acceptor_sup.erl Normal file
View File

@ -0,0 +1,43 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
-behaviour(supervisor).
-export([start_link/2]).
-export([init/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
start_link(Name, Callback) ->
supervisor:start_link({local,Name}, ?MODULE, Callback).
init(Callback) ->
{ok, {{simple_one_for_one, 10, 10},
[{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
transient, brutal_kill, worker, [tcp_acceptor]}]}}.

113
src/tcp_listener.erl Normal file
View File

@ -0,0 +1,113 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
-behaviour(gen_server).
-export([start_link/8]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {sock, on_startup, on_shutdown, label}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/8 ::
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
integer(), atom(), mfargs(), mfargs(), string()) ->
rabbit_types:ok_pid_or_error()).
-endif.
%%--------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
OnStartup, OnShutdown, Label) ->
gen_server:start_link(
?MODULE, {IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
OnStartup, OnShutdown, Label}, []).
%%--------------------------------------------------------------------
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
{ok, LSock} ->
lists:foreach(fun (_) ->
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
error_logger:info_msg(
"started ~s on ~s:~p~n",
[Label, ntoab(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
"failed to start ~s on ~s:~p - ~p (~s)~n",
[Label, ntoab(IPAddress), Port,
Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
{ok, {IPAddress, Port}} = inet:sockname(LSock),
gen_tcp:close(LSock),
error_logger:info_msg("stopped ~s on ~s:~p~n",
[Label, ntoab(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
%% when IPv6 is enabled but not used (i.e. 99% of the time).
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) ->
inet_parse:ntoa(IP).
ntoab(IP) ->
Str = ntoa(IP),
case string:str(Str, ":") of
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.

79
src/tcp_listener_sup.erl Normal file
View File

@ -0,0 +1,79 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
-behaviour(supervisor).
-export([start_link/7, start_link/8]).
-export([init/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/7 ::
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
mfargs(), mfargs(), mfargs(), string()) ->
rabbit_types:ok_pid_or_error()).
-spec(start_link/8 ::
(inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
mfargs(), mfargs(), mfargs(), integer(), string()) ->
rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, 1, Label).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount, Label) ->
supervisor:start_link(
?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount, Label}).
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount, Label}) ->
%% This is gross. The tcp_listener needs to know about the
%% tcp_acceptor_sup, and the only way I can think of accomplishing
%% that without jumping through hoops is to register the
%% tcp_acceptor_sup.
Name = tcp_name(tcp_acceptor_sup, IPAddress, Port),
{ok, {{one_for_all, 10, 10},
[{tcp_acceptor_sup, {tcp_acceptor_sup, start_link,
[Name, AcceptCallback]},
transient, infinity, supervisor, [tcp_acceptor_sup]},
{tcp_listener, {tcp_listener, start_link,
[IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, Name,
OnStartup, OnShutdown, Label]},
transient, 16#ffffffff, worker, [tcp_listener]}]}}.
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
lists:flatten(
io_lib:format(
"~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))).