Merge branch 'master' into plus

This commit is contained in:
Feng 2016-06-05 12:51:35 +08:00
commit eaf254e919
21 changed files with 607 additions and 119 deletions

24
.gitmodules vendored
View File

@ -1,27 +1,3 @@
[submodule "plugins/emqttd_dashboard"]
path = plugins/emqttd_dashboard
url = https://github.com/emqtt/emqttd_dashboard.git
[submodule "plugins/emqttd_plugin_template"]
path = plugins/emqttd_plugin_template
url = https://github.com/emqtt/emqttd_plugin_template.git
[submodule "plugins/emqttd_plugin_pgsql"]
path = plugins/emqttd_plugin_pgsql
url = https://github.com/emqtt/emqttd_plugin_pgsql.git
[submodule "plugins/emqttd_plugin_mysql"]
path = plugins/emqttd_plugin_mysql
url = https://github.com/emqtt/emqttd_plugin_mysql.git
[submodule "plugins/emqttd_sockjs"]
path = plugins/emqttd_sockjs
url = https://github.com/emqtt/emqttd_sockjs.git
[submodule "plugins/emqttd_stomp"]
path = plugins/emqttd_stomp
url = https://github.com/emqtt/emqttd_stomp.git
[submodule "plugins/emqttd_recon"]
path = plugins/emqttd_recon
url = https://github.com/emqtt/emqttd_recon.git
[submodule "plugins/emqttd_plugin_redis"]
path = plugins/emqttd_plugin_redis
url = https://github.com/emqtt/emqttd_plugin_redis.git
[submodule "plugins/emqttd_reloader"]
path = plugins/emqttd_reloader
url = https://github.com/emqtt/emqttd_reloader.git

View File

@ -1,7 +1,10 @@
language: erlang
otp_release:
- 17.0
- 18.0
- 18.1
- 18.2.1
- 18.3
script:
- make

View File

@ -5,7 +5,7 @@ emqttd is a massively scalable and clusterable MQTT V3.1/V3.1.1 broker written i
emqttd is fully open source and licensed under the Apache Version 2.0. emqttd implements both MQTT V3.1 and V3.1.1 protocol specifications, and supports WebSocket, STOMP, SockJS, CoAP and MQTT-SN at the same time.
emqttd requires Erlang R17+ to build.
emqttd requires Erlang R18+ to build since 1.1 release.
Demo Server: tcp://t.emqtt.io:1883
@ -37,7 +37,7 @@ The emqttd project is aimed to implement a scalable, distributed, extensible ope
* IpAddress Authentication
* Username and Password Authentication
* Access control based on IpAddress, ClientID, Username
* Authentication with LDAP, Redis, MySQL, PostgreSQL
* Authentication with LDAP, Redis, MySQL, PostgreSQL and HTTP API
* Cluster brokers on several servers
* Bridge brokers locally or remotely
* mosquitto, RSMB bridge
@ -65,9 +65,11 @@ Plugin | Desc
[emqttd_plugin_pgsql](https://github.com/emqtt/emqttd_plugin_pgsql) | PostgreSQL Authentication/ACL Plugin
[emqttd_plugin_redis](https://github.com/emqtt/emqttd_plugin_redis) | Redis Authentication/ACL Plugin
[emqttd_plugin_mongo](https://github.com/emqtt/emqttd_plugin_mongo) | MongoDB Authentication/ACL Plugin
[emqttd_auth_http](https://github.com/emqtt/emqttd_auth_http) | Authentication/ACL by HTTP API
[emqttd_stomp](https://github.com/emqtt/emqttd_stomp) | Stomp Protocol Plugin
[emqttd_sockjs](https://github.com/emqtt/emqttd_sockjs) | SockJS(Stomp) Plugin
[emqttd_recon](https://github.com/emqtt/emqttd_recon) | Recon Plugin
[emqttd_reloader](https://github.com/emqtt/emqttd_reloader) | Reloader Plugin
## Dashboard
@ -90,7 +92,7 @@ Download binary package for Linux, Mac and Freebsd from [http://emqtt.io/downloa
Installing on Ubuntu64, for example:
```sh
unzip emqttd-macosx-0.16.0-beta-20160216.zip && cd emqttd
unzip emqttd-ubuntu64-0.16.0-beta-20160216.zip && cd emqttd
# start console
./bin/emqttd console

View File

@ -5,3 +5,7 @@ or
http://emqttd-docs.rtfd.org
or
http://emqttd.io/docs

View File

@ -5,6 +5,133 @@
Changes
=======
.. _release_1.1.1:
-------------
Version 1.1.1
-------------
*Release Date: 2016-06-04*
Compatible with the Qos0 PUBREL packet (#575)
phpMqtt Client Compatibility (#572)
java.io.EOFException using paho java client (#551)
.. _release_1.1:
-----------
Version 1.1
-----------
*Release Date: 2016-06-01*
Highlights
----------
Upgrade eSockd library to 4.0 and Support IPv6
Support to listen on specific IP Address::
{mqtt, {"192.168.1.20", 1883}, [
...
]},
Add MongoDB, HTTP Authentication/ACL Plugins
Upgrade MySQL, PostgreSQL, Redis Plugins to support superuser authentication and avoid SQL Injection
Enhancements
------------
Allow human-friendly IP addresses (PR#395)
File operation error: emfile (#445)
emqttd_plugin_mongo not found in emqttd (#489)
emqttd_plugin_mongo Error While Loading in emqttd (#505)
Feature request: HTTP Authentication (#541)
Compatible with the Qos0 PUBREL packet (#575)
Bugfix
------
Bugfix: function_clause exception occurs when registering a duplicated authentication module (#542)
Bugfix: ./emqttd_top msg_q result: {"init terminating in do_boot",{undef,[{etop,start,[],[]},{init,start_it,1,[]},{init,start_em,1,[]}]}} (#557)
Tests
-----
111 common test cases.
Dashboard Plugin
----------------
WebSocket Page: Support 'Clean Session', Qos, Retained parameters (emqttd_dashboard#52)
Upgrade eSockd library to 4.0, Show OTP Release on Overview Page (emqttd_dashboard#61)
Changing dashboard credentials for username authentication (emqttd_dashboard#56)
Add './bin/emqttd_ctl admins' CLIsupport to add/delete admins
HTTP Auth Plugin
----------------
Authentication/ACL by HTTP API: https://github.com/emqtt/emqttd_auth_http
MongoDB Plugin
--------------
Upgrade Erlang MongoDB driver to v1.0.0
Support superuser authentication
Support ACL (emqttd_plugin_mongo#3)
MySQL Plugin
------------
Support superuser authentication
Use parameterized query to avoid SQL Injection
Postgre Plugin
--------------
Support superuser authentication
Use parameterized query to avoid SQL Injection
Redis Plugin
------------
Support superuser authentication
Support ClientId authentication by '%c' variable
Reloader Plugin
---------------
Reload modified modules during development automatically.
.. _release_1.0.3:
-------------
Version 1.0.3
-------------
*Release Date: 2016-05-23*
eSockd 3.2
MochiWeb 4.0.1
.. _release_1.0.2:
-------------

View File

@ -20,7 +20,7 @@ Show running status of the broker::
$ ./bin/emqttd_ctl status
Node 'emqttd@127.0.0.1' is started
emqttd 0.16.0 is running
emqttd 1.1 is running
.. _command_broker::
@ -756,3 +756,41 @@ mnesia
Show system_info of mnesia database.
------
admins
------
The 'admins' CLI is used to add/del admin account, which is registered by the dashboard plugin.
+------------------------------------+-----------------------------+
| admins add <Username> <Password> | Add admin account |
+------------------------------------+-----------------------------+
| admins passwd <Username> <Password>| Reset admin password |
+------------------------------------+-----------------------------+
| admins del <Username> | Delete admin account |
+------------------------------------+-----------------------------+
admins add
----------
Add admin account::
$ ./bin/emqttd_ctl admins add root public
ok
admins passwd
-------------
Reset password::
$ ./bin/emqttd_ctl admins passwd root private
ok
admins del
----------
Delete admin account::
$ ./bin/emqttd_ctl admins del root
ok

View File

@ -41,7 +41,7 @@ Features
* IpAddress Authentication
* Username and Password Authentication
* Access control based on IpAddress, ClientID, Username
* Authentication with LDAP, Redis, MySQL, PostgreSQL
* Authentication with LDAP, Redis, MySQL, PostgreSQL and HTTP API
* Cluster brokers on several servers
* Bridge brokers locally or remotely
* mosquitto, RSMB bridge
@ -63,7 +63,7 @@ Installing on Mac, for example:
.. code-block:: bash
unzip emqttd-macosx-0.16.0-beta-20160216.zip && cd emqttd
unzip emqttd-macosx-1.1-beta-20160601.zip && cd emqttd
# Start emqttd
./bin/emqttd start
@ -77,7 +77,7 @@ Installing on Mac, for example:
Installing from Source
----------------------
.. NOTE:: emqttd broker requires Erlang R17+ to build.
.. NOTE:: emqttd broker requires Erlang R18+ to build since 1.1 release.
.. code-block:: bash
@ -163,6 +163,8 @@ A plugin is an Erlang application to extend the emqttd broker.
+----------------------------+-----------------------------------+
| `emqttd_dashboard`_ | Web Dashboard |
+----------------------------+-----------------------------------+
| `emqttd_auth_http`_ | Authentication/ACL with HTTP API |
+----------------------------+-----------------------------------+
| `emqttd_plugin_mysql`_ | Authentication with MySQL |
+----------------------------+-----------------------------------+
| `emqttd_plugin_pgsql`_ | Authentication with PostgreSQL |
@ -289,6 +291,7 @@ GitHub: https://github.com/emqtt
.. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template
.. _emqttd_dashboard: https://github.com/emqtt/emqttd_dashboard
.. _emqttd_auth_http: https://github.com/emqtt/emqttd_auth_http
.. _emqttd_plugin_mysql: https://github.com/emqtt/emqttd_plugin_mysql
.. _emqttd_plugin_pgsql: https://github.com/emqtt/emqttd_plugin_pgsql
.. _emqttd_plugin_redis: https://github.com/emqtt/emqttd_plugin_redis

View File

@ -35,7 +35,7 @@ Download binary packages from: http://emqtt.io/downloads
The package name consists of platform, version and release time.
For example: emqttd-centos64-0.16.0-beta-20160216.zip
For example: emqttd-centos64-1.1-beta-20160601.zip
.. _install_on_linux:
@ -47,7 +47,7 @@ Download CentOS Package from: http://emqtt.io/downloads/centos, and then unzip:
.. code-block:: bash
unzip emqttd-centos64-0.16.0-beta-20160216.zip
unzip emqttd-centos64-1.1-beta-20160601.zip
Start the broker in console mode:
@ -80,7 +80,7 @@ If the broker is started successfully, console will print:
mqtt listen on 0.0.0.0:1883 with 16 acceptors.
mqtts listen on 0.0.0.0:8883 with 4 acceptors.
http listen on 0.0.0.0:8083 with 4 acceptors.
Erlang MQTT Broker 0.16.0 is running now
Erlang MQTT Broker 1.1 is running now
Eshell V6.4 (abort with ^G)
(emqttd@127.0.0.1)1>
@ -100,7 +100,7 @@ Check the running status of the broker:
$ ./bin/emqttd_ctl status
Node 'emqttd@127.0.0.1' is started
emqttd 0.16.0 is running
emqttd 1.1 is running
Or check the status by URL::

View File

@ -16,18 +16,24 @@ The plugins that emqtt project released:
+---------------------------+---------------------------+
| `emqttd_dashboard`_ | Web Dashboard |
+---------------------------+---------------------------+
| `emqttd_auth_http`_ | HTTP Auth/ACL Plugin |
+---------------------------+---------------------------+
| `emqttd_plugin_mysql`_ | MySQL Auth/ACL Plugin |
+---------------------------+---------------------------+
| `emqttd_plugin_pgsql`_ | PostgreSQL Auth/ACL Plugin|
+---------------------------+---------------------------+
| `emqttd_plugin_redis`_ | Redis Auth/ACL Plugin |
+---------------------------+---------------------------+
| `emqttd_plugin_mongo`_ | MongoDB Auth/ACL Plugin |
+---------------------------+---------------------------+
| `emqttd_stomp`_ | STOMP Protocol Plugin |
+---------------------------+---------------------------+
| `emqttd_sockjs`_ | STOMP over SockJS Plugin |
+---------------------------+---------------------------+
| `emqttd_recon`_ | Recon Plugin |
+---------------------------+---------------------------+
| `emqttd_reloader`_ | Reloader Plugin |
+---------------------------+---------------------------+
----------------------------------------
emqttd_plugin_template - Template Plugin
@ -81,10 +87,6 @@ emqttd_dashboard/etc/plugin.config:
[
{emqttd_dashboard, [
{default_admin, [
{login, "admin"},
{password, "public"}
]},
{listener,
{emqttd_dashboard, 18083, [
{acceptors, 4},
@ -93,6 +95,75 @@ emqttd_dashboard/etc/plugin.config:
]}
].
---------------------------------------
emqttd_auth_http - HTTP Auth/ACL Plugin
---------------------------------------
MQTT Authentication/ACL with HTTP API: https://github.com/emqtt/emqttd_auth_http
.. NOTE:: Supported in 1.1 release
Configure emqttd_auth_http/etc/plugin.config
--------------------------------------------
.. code:: erlang
[
{emqttd_auth_http, [
%% Variables: %u = username, %c = clientid, %a = ipaddress, %t = topic
{super_req, [
{method, post},
{url, "http://localhost:8080/mqtt/superuser"},
{params, [
{username, "%u"},
{clientid, "%c"}
]}
]},
{auth_req, [
{method, post},
{url, "http://localhost:8080/mqtt/auth"},
{params, [
{clientid, "%c"},
{username, "%u"},
{password, "%P"}
]}
]},
%% 'access' parameter: sub = 1, pub = 2
{acl_req, [
{method, post},
{url, "http://localhost:8080/mqtt/acl"},
{params, [
{access, "%A"},
{username, "%u"},
{clientid, "%c"},
{ipaddr, "%a"},
{topic, "%t"}
]}
]}
]}
].
HTTP API
--------
Return 200 if ok
Return 4xx if unauthorized
Load emqttd_auth_http plugin
----------------------------
.. code:: bash
./bin/emqttd_ctl plugins load emqttd_auth_http
-------------------------------------------
emqttd_plugin_mysql - MySQL Auth/ACL Plugin
-------------------------------------------
@ -109,6 +180,7 @@ MQTT User Table
`username` varchar(100) DEFAULT NULL,
`password` varchar(100) DEFAULT NULL,
`salt` varchar(20) DEFAULT NULL,
`is_superuser` tinyint(1) DEFAULT 0,
`created` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_username` (`username`)
@ -130,6 +202,14 @@ MQTT ACL Table
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
VALUES
(1,1,NULL,'$all',NULL,2,'#'),
(2,0,NULL,'$all',NULL,1,'$SYS/#'),
(3,0,NULL,'$all',NULL,1,'eq #'),
(5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
(6,1,'127.0.0.1',NULL,NULL,2,'#'),
(7,1,NULL,'dashboard',NULL,1,'$SYS/#');
Configure emqttd_plugin_mysql/etc/plugin.config
-----------------------------------------------
@ -140,11 +220,11 @@ Configure MySQL host, username, password and database:
[
{emqttd_plugin_mysql, [
{emqttd_plugin_mysql, [
{mysql_pool, [
%% ecpool options
{pool_size, 4},
{pool_size, 8},
{auto_reconnect, 3},
%% mysql options
@ -156,10 +236,15 @@ Configure MySQL host, username, password and database:
{encoding, utf8}
]},
%% select password only
%% Variables: %u = username, %c = clientid, %a = ipaddress
%% Superuser Query
{superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
%% Authentication Query: select password only
{authquery, "select password from mqtt_user where username = '%u' limit 1"},
%% hash algorithm: md5, sha, sha256, pbkdf2?
%% hash algorithm: plain, md5, sha, sha256, pbkdf2?
{password_hash, sha256},
%% select password with salt
@ -171,12 +256,15 @@ Configure MySQL host, username, password and database:
%% sha256 with salt suffix
%% {password_hash, {sha256, salt}},
%% comment this query, the acl will be disabled
{aclquery, "select * from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
%% '%a' = ipaddress, '%u' = username, '%c' = clientid
%% Comment this query, the acl will be disabled
{aclquery, "select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},
%% If no rules matched, return...
%% If no ACL rules matched, return...
{acl_nomatch, allow}
]}
]}
].
Load emqttd_plugin_mysql plugin
@ -199,6 +287,7 @@ MQTT User Table
CREATE TABLE mqtt_user (
id SERIAL primary key,
is_superuser boolean,
username character varying(100),
password character varying(100),
salt character varying(40)
@ -240,23 +329,29 @@ Configure host, username, password and database of PostgreSQL:
{emqttd_plugin_pgsql, [
{pgsql_pool, [
%% ecpool options
{pool_size, 4},
{auto_reconnect, 3},
%% ecpool options
{pool_size, 8},
{auto_reconnect, 3},
%% pgsql options
{host, "localhost"},
{port, 5432},
{username, "feng"},
{password, ""},
{database, "mqtt"},
{encoding, utf8}
%% pgsql options
{host, "localhost"},
{port, 5432},
{ssl, false},
{username, "feng"},
{password, ""},
{database, "mqtt"},
{encoding, utf8}
]},
%% select password only
%% Variables: %u = username, %c = clientid, %a = ipaddress
%% Superuser Query
{superquery, "select is_superuser from mqtt_user where username = '%u' limit 1"},
%% Authentication Query: select password only
{authquery, "select password from mqtt_user where username = '%u' limit 1"},
%% hash algorithm: md5, sha, sha256, pbkdf2?
%% hash algorithm: plain, md5, sha, sha256, pbkdf2?
{password_hash, sha256},
%% select password with salt
@ -288,7 +383,7 @@ Load emqttd_plugin_pgsql Plugin
emqttd_plugin_redis - Redis Auth/ACL Plugin
-------------------------------------------
MQTT Authentication, ACL with Redis.
MQTT Authentication, ACL with Redis: https://github.com/emqtt/emqttd_plugin_redis
Configure emqttd_plugin_redis/etc/plugin.config
-----------------------------------------------
@ -310,6 +405,11 @@ Configure emqttd_plugin_redis/etc/plugin.config
{password, ""}
]},
%% Variables: %u = username, %c = clientid
%% HMGET mqtt_user:%u is_superuser
{supercmd, ["HGET", "mqtt_user:%u", "is_superuser"]},
%% HMGET mqtt_user:%u password
{authcmd, ["HGET", "mqtt_user:%u", "password"]},
@ -322,18 +422,37 @@ Configure emqttd_plugin_redis/etc/plugin.config
%% If no rules matched, return...
{acl_nomatch, deny},
%% Store subscriptions to redis when SUBSCRIBE packets received.
{subcmd, ["HMSET", "mqtt_subs:%u"]},
%% Load Subscriptions form Redis when client connected.
{loadsub, ["HGETALL", "mqtt_subs:%u"]},
%% Remove subscriptions from redis when UNSUBSCRIBE packets received.
{unsubcmd, ["HDEL", "mqtt_subs:%u"]}
{subcmd, ["HGETALL", "mqtt_subs:%u"]}
]}
].
User HASH
---------
Set a 'user' hash with 'password' field, for example::
HSET mqtt_user:<username> is_superuser 1
HSET mqtt_user:<username> password "passwd"
ACL Rule SET
------------
The plugin uses a redis SET to store ACL rules::
SADD mqtt_acl:<username> "publish topic1"
SADD mqtt_acl:<username> "subscribe topic2"
SADD mqtt_acl:<username> "pubsub topic3"
Subscription HASH
-----------------
The plugin can store static subscriptions in a redis Hash::
HSET mqtt_subs:<username> topic1 0
HSET mqtt_subs:<username> topic2 1
HSET mqtt_subs:<username> topic3 2
Load emqttd_plugin_redis Plugin
-------------------------------
@ -341,6 +460,114 @@ Load emqttd_plugin_redis Plugin
./bin/emqttd_ctl plugins load emqttd_plugin_redis
---------------------------------------------
emqttd_plugin_mongo - MongoDB Auth/ACL Plugin
---------------------------------------------
MQTT Authentication, ACL with MongoDB: https://github.com/emqtt/emqttd_plugin_mongo
Configure emqttd_plugin_mongo/etc/plugin.config
-----------------------------------------------
.. code-block:: erlang
[
{emqttd_plugin_mongo, [
{mongo_pool, [
{pool_size, 8},
{auto_reconnect, 3},
%% Mongodb Driver Opts
{host, "localhost"},
{port, 27017},
%% {login, ""},
%% {password, ""},
{database, "mqtt"}
]},
%% Variables: %u = username, %c = clientid
%% Superuser Query
{superquery, [
{collection, "mqtt_user"},
{super_field, "is_superuser"},
{selector, {"username", "%u"}}
]},
%% Authentication Query
{authquery, [
{collection, "mqtt_user"},
{password_field, "password"},
%% Hash Algorithm: plain, md5, sha, sha256, pbkdf2?
{password_hash, sha256},
{selector, {"username", "%u"}}
]},
%% ACL Query: "%u" = username, "%c" = clientid
{aclquery, [
{collection, "mqtt_acl"},
{selector, {"username", "%u"}}
]},
%% If no ACL rules matched, return...
{acl_nomatch, deny}
]}
].
MongoDB Database
----------------
.. code-block::
use mqtt
db.createCollection("mqtt_user")
db.createCollection("mqtt_acl")
db.mqtt_user.ensureIndex({"username":1})
User Collection
---------------
.. code-block:: json
{
username: "user",
password: "password hash",
is_superuser: boolean (true, false),
created: "datetime"
}
For example::
db.mqtt_user.insert({username: "test", password: "password hash", is_superuser: false})
db.mqtt_user:insert({username: "root", is_superuser: true})
ACL Collection
--------------
.. code-block:: json
{
username: "username",
clientid: "clientid",
publish: ["topic1", "topic2", ...],
subscribe: ["subtop1", "subtop2", ...],
pubsub: ["topic/#", "topic1", ...]
}
For example::
db.mqtt_acl.insert({username: "test", publish: ["t/1", "t/2"], subscribe: ["user/%u", "client/%c"]})
db.mqtt_acl.insert({username: "admin", pubsub: ["#"]})
Load emqttd_plugin_mongo Plugin
-------------------------------
.. code-block:: bash
./bin/emqttd_ctl plugins load emqttd_plugin_mongo
-----------------------------
emqttd_stomp - STOMP Protocol
-----------------------------
@ -454,6 +681,29 @@ Recon CLI
recon node_stats #recon:node_stats(10, 1000)
recon remote_load Mod #recon:remote_load(Mod)
---------------------------------
emqttd_reloader - Reloader Plugin
---------------------------------
Erlang Module Reloader for Development
.. NOTE:: Don't load the plugin in production!
Load emqttd_reloader Plugin
---------------------------
.. code-block:: bash
./bin/emqttd_ctl plugins load emqttd_reloader
reload CLI
----------
.. code-block:: bash
./bin/emqttd_ctl reload
reload <Module> # Reload a Module
------------------------
Plugin Development Guide
@ -602,11 +852,15 @@ There will be a new CLI after the plugin loaded::
.. _emqttd_dashboard: https://github.com/emqtt/emqttd_dashboard
.. _emqttd_auth_http: https://github.com/emqtt/emqttd_auth_http
.. _emqttd_plugin_mysql: https://github.com/emqtt/emqttd_plugin_mysql
.. _emqttd_plugin_pgsql: https://github.com/emqtt/emqttd_plugin_pgsql
.. _emqttd_plugin_redis: https://github.com/emqtt/emqttd_plugin_redis
.. _emqttd_plugin_mongo: https://github.com/emqtt/emqttd_plugin_mongo
.. _emqttd_stomp: https://github.com/emqtt/emqttd_stomp
.. _emqttd_sockjs: https://github.com/emqtt/emqttd_sockjs
.. _emqttd_recon: https://github.com/emqtt/emqttd_recon
.. _emqttd_reloader: https://github.com/emqtt/emqttd_reloader
.. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template
.. _recon: http://ferd.github.io/recon/

View File

@ -16,7 +16,7 @@
{error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
%%{lager_console_backend, info},
{lager_console_backend, error},
%%NOTICE: Level >= error
%%{lager_emqtt_backend, error},
{lager_file_backend, [

View File

@ -10,21 +10,22 @@
stdlib,
sasl,
asn1,
syntax_tools,
ssl,
crypto,
syntax_tools,
ssl,
crypto,
eldap,
xmerl,
os_mon,
inets,
goldrush,
os_mon,
inets,
goldrush,
compiler,
runtime_tools,
lager,
{observer, load},
lager,
gen_logger,
gproc,
esockd,
mochiweb,
esockd,
mochiweb,
emqttd
]},
{rel, "start_clean", "",
@ -56,6 +57,7 @@
{app, inets, [{incl_cond, include}]},
{app, compiler, [{incl_cond, include}]},
{app, runtime_tools, [{incl_cond, include}]},
{app, observer, [{incl_cond, include}]},
{app, goldrush, [{incl_cond, include}]},
{app, gen_logger, [{incl_cond, include}]},
{app, lager, [{incl_cond, include}]},

View File

@ -1,7 +1,7 @@
{application, emqttd,
[
{description, "Erlang MQTT Broker"},
{vsn, "1.1"},
{vsn, "1.1.1"},
{id, "emqttd"},
{modules, []},
{registered, []},

View File

@ -19,7 +19,7 @@
-include("emqttd.hrl").
-type who() :: all | binary() |
{ipaddr, esockd_access:cidr()} |
{ipaddr, esockd_cidr:cidr_string()} |
{client, binary()} |
{user, binary()}.
@ -51,8 +51,7 @@ compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A) ->
compile(who, all) ->
all;
compile(who, {ipaddr, CIDR}) ->
{Start, End} = esockd_access:range(CIDR),
{ipaddr, {CIDR, Start, End}};
{ipaddr, esockd_cidr:parse(CIDR, true)};
compile(who, {client, all}) ->
{client, all};
compile(who, {client, ClientId}) ->
@ -107,9 +106,8 @@ match_who(#mqtt_client{username = Username}, {user, Username}) ->
true;
match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) ->
false;
match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, {_CDIR, Start, End}}) ->
I = esockd_access:atoi(IP),
I >= Start andalso I =< End;
match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR);
match_who(Client, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(Client, Who) andalso Allow

View File

@ -33,7 +33,7 @@
{backlog, 512},
{nodelay, true}]).
-type listener() :: {atom(), inet:port_number(), [esockd:option()]}.
-type listener() :: {atom(), esockd:listen_on(), [esockd:option()]}.
%%--------------------------------------------------------------------
%% Application callbacks
@ -172,22 +172,22 @@ start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners)).
%% Start mqtt listener
-spec(start_listener(listener()) -> any()).
start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts);
start_listener({mqtt, ListenOn, Opts}) -> start_listener(mqtt, ListenOn, Opts);
%% Start mqtt(SSL) listener
start_listener({mqtts, Port, Opts}) -> start_listener(mqtts, Port, Opts);
start_listener({mqtts, ListenOn, Opts}) -> start_listener(mqtts, ListenOn, Opts);
%% Start http listener
start_listener({http, Port, Opts}) ->
mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []});
start_listener({http, ListenOn, Opts}) ->
mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});
%% Start https listener
start_listener({https, Port, Opts}) ->
mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}).
start_listener({https, ListenOn, Opts}) ->
mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}).
start_listener(Protocol, Port, Opts) ->
start_listener(Protocol, ListenOn, Opts) ->
MFArgs = {emqttd_client, start_link, [emqttd:env(mqtt)]},
esockd:open(Protocol, Port, merge_sockopts(Opts), MFArgs).
esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
merge_sockopts(Options) ->
SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS,
@ -201,5 +201,6 @@ merge_sockopts(Options) ->
%% @doc Stop Listeners
stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners)).
stop_listener({Protocol, Port, _Opts}) -> esockd:close({Protocol, Port}).
%% @private
stop_listener({Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn).

View File

@ -108,9 +108,8 @@ load(Fd, {ok, Line}, Clients) when is_list(Line) ->
[#mqtt_auth_clientid{client_id = ClientId} | Clients];
[ClientId, IpAddr0] ->
IpAddr = string:strip(IpAddr0, right, $\n),
Range = esockd_access:range(IpAddr),
[#mqtt_auth_clientid{client_id = list_to_binary(ClientId),
ipaddr = {IpAddr, Range}}|Clients];
ipaddr = esockd_cidr:parse(IpAddr, true)} | Clients];
BadLine ->
lager:error("BadLine in clients.config: ~s", [BadLine]),
Clients
@ -123,11 +122,12 @@ load(Fd, eof, Clients) ->
check_clientid_only(ClientId, IpAddr) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
[] -> {error, clientid_not_found};
[#?AUTH_CLIENTID_TAB{ipaddr = undefined}] -> ok;
[#?AUTH_CLIENTID_TAB{ipaddr = {_, {Start, End}}}] ->
I = esockd_access:atoi(IpAddr),
case I >= Start andalso I =< End of
[] ->
{error, clientid_not_found};
[#?AUTH_CLIENTID_TAB{ipaddr = undefined}] ->
ok;
[#?AUTH_CLIENTID_TAB{ipaddr = CIDR}] ->
case esockd_cidr:match(IpAddr, CIDR) of
true -> ok;
false -> {error, wrong_ipaddr}
end

58
src/emqttd_base62.erl Normal file
View File

@ -0,0 +1,58 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_base62).
-export([encode/1, decode/1]).
%% @doc Encode an integer to base62 string
-spec(encode(non_neg_integer()) -> binary()).
encode(I) when is_integer(I) andalso I > 0 ->
list_to_binary(encode(I, [])).
encode(I, Acc) when I < 62 ->
[char(I) | Acc];
encode(I, Acc) ->
encode(I div 62, [char(I rem 62) | Acc]).
char(I) when I < 10 ->
$0 + I;
char(I) when I < 36 ->
$A + I - 10;
char(I) when I < 62 ->
$a + I - 36.
%% @doc Decode base62 string to an integer
-spec(decode(string() | binary()) -> integer()).
decode(B) when is_binary(B) ->
decode(binary_to_list(B));
decode(S) when is_list(S) ->
decode(S, 0).
decode([], I) ->
I;
decode([C|S], I) ->
decode(S, I * 62 + byte(C)).
byte(C) when $0 =< C andalso C =< $9 ->
C - $0;
byte(C) when $A =< C andalso C =< $Z ->
C - $A + 10;
byte(C) when $a =< C andalso C =< $z ->
C - $a + 36.

View File

@ -453,12 +453,12 @@ trace_off(Who, Name) ->
%%--------------------------------------------------------------------
%% @doc Listeners Command
listeners([]) ->
foreach(fun({{Protocol, Port}, Pid}) ->
foreach(fun({{Protocol, ListenOn}, Pid}) ->
Info = [{acceptors, esockd:get_acceptors(Pid)},
{max_clients, esockd:get_max_clients(Pid)},
{current_clients,esockd:get_current_clients(Pid)},
{shutdown_count, esockd:get_shutdown_count(Pid)}],
?PRINT("listener on ~s:~w~n", [Protocol, Port]),
?PRINT("listener on ~s:~s~n", [Protocol, esockd:to_string(ListenOn)]),
foreach(fun({Key, Val}) ->
?PRINT(" ~-16s: ~w~n", [Key, Val])
end, Info)

View File

@ -44,10 +44,10 @@ limit(Opts) ->
-> {ok, mqtt_packet()} | {error, any()} | {more, fun()}).
parse(<<>>, {none, Limit}) ->
{more, fun(Bin) -> parse(Bin, {none, Limit}) end};
parse(<<PacketType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
parse_remaining_len(Rest, #mqtt_packet_header{type = PacketType,
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
parse_remaining_len(Rest, #mqtt_packet_header{type = Type,
dup = bool(Dup),
qos = QoS,
qos = fixqos(Type, QoS),
retain = bool(Retain)}, Limit);
parse(Bin, Cont) -> Cont(Bin).
@ -136,14 +136,14 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
<<PacketId:16/big>> = FrameBin,
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
1 = Qos,
%% 1 = Qos,
<<PacketId:16/big>> = FrameBin,
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
{?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
<<PacketId:16/big>> = FrameBin,
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
{?SUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
1 = Qos,
%% 1 = Qos,
<<PacketId:16/big, Rest1/binary>> = FrameBin,
TopicTable = parse_topics(?SUBSCRIBE, Rest1, []),
wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId,
@ -153,7 +153,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
% wrap(Header, #mqtt_packet_suback{packet_id = PacketId,
% qos_table = parse_qos(Rest1, []) }, Rest);
{?UNSUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
1 = Qos,
%% 1 = Qos,
<<PacketId:16/big, Rest1/binary>> = FrameBin,
Topics = parse_topics(?UNSUBSCRIBE, Rest1, []),
wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId,
@ -218,3 +218,9 @@ bool(1) -> true.
protocol_name_approved(Ver, Name) ->
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
%% Fix Issue#575
fixqos(?PUBREL, 0) -> 1;
fixqos(?SUBSCRIBE, 0) -> 1;
fixqos(?UNSUBSCRIBE, 0) -> 1;
fixqos(_Type, QoS) -> QoS.

View File

@ -117,8 +117,7 @@ call(SM, Req) ->
init([Pool, Id]) ->
?GPROC_POOL(join, Pool, Id),
{ok, #state{pool = Pool, id = Id,
monitors = dict:new()}}.
{ok, #state{pool = Pool, id = Id, monitors = dict:new()}}.
prioritise_call(_Msg, _From, _Len, _State) ->
1.
@ -175,7 +174,7 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
[_Sess] -> ok
end
end),
{noreply, erase_monitor(MRef, State)};
{noreply, erase_monitor(MRef, State), hibernate};
error ->
lager:error("MRef of session ~p not found", [DownPid]),
{noreply, State}

View File

@ -118,14 +118,14 @@ check_acl(_) ->
compile_rule(_) ->
{allow, {'and', [{ipaddr, {"127.0.0.1", _I, _I}},
{allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {'or', [{ipaddr, {"127.0.0.1", _I, _I}},
{allow, {'or', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {ipaddr, {"127.0.0.1", _I, _I}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
{allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}),
{allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}),

View File

@ -28,9 +28,11 @@
-define(PQ, priority_queue).
-define(BASE62, emqttd_base62).
all() -> [{group, guid}, {group, opts},
{group, ?PQ}, {group, time},
{group, node}].
{group, node}, {group, base62}].
groups() ->
[{guid, [], [guid_gen]},
@ -38,7 +40,8 @@ groups() ->
{?PQ, [], [priority_queue_plen,
priority_queue_out2]},
{time, [], [time_now_to_]},
{node, [], [node_is_aliving, node_parse_name]}].
{node, [], [node_is_aliving, node_parse_name]},
{base62, [], [base62_encode]}].
%%--------------------------------------------------------------------
%% emqttd_guid
@ -144,3 +147,17 @@ node_parse_name(_) ->
'a@127.0.0.1' = emqttd_node:parse_name("a@127.0.0.1"),
'b@127.0.0.1' = emqttd_node:parse_name("b").
%%--------------------------------------------------------------------
%% base62 encode decode
%%--------------------------------------------------------------------
base62_encode(_) ->
10 = ?BASE62:decode(?BASE62:encode(10)),
100 = ?BASE62:decode(?BASE62:encode(100)),
9999 = ?BASE62:decode(?BASE62:encode(9999)),
65535 = ?BASE62:decode(?BASE62:encode(65535)),
<<X:128/unsigned-big-integer>> = emqttd_guid:gen(),
<<Y:128/unsigned-big-integer>> = emqttd_guid:gen(),
X = ?BASE62:decode(?BASE62:encode(X)),
Y = ?BASE62:decode(?BASE62:encode(Y)).