diff --git a/.gitmodules b/.gitmodules index fd65dc85a..dc7660343 100644 --- a/.gitmodules +++ b/.gitmodules @@ -22,18 +22,6 @@ [submodule "plugins/emqttd_plugin_redis"] path = plugins/emqttd_plugin_redis url = https://github.com/emqtt/emqttd_plugin_redis.git -[submodule "plugins/emqplus_backend_redis"] - path = plugins/emqplus_backend_redis - url = git@github.com:emqplus/emqplus_backend_redis.git -[submodule "plugins/emqplus_backend_mongo"] - path = plugins/emqplus_backend_mongo - url = git@github.com:emqplus/emqplus_backend_mongo.git -[submodule "plugins/emqplus_backend_mysql"] - path = plugins/emqplus_backend_mysql - url = git@github.com:emqplus/emqplus_backend_mysql.git -[submodule "plugins/emqplus_backend_pgsql"] - path = plugins/emqplus_backend_pgsql - url = git@github.com:emqplus/emqplus_backend_pgsql.git -[submodule "plugins/emqplus_backend_cassa"] - path = plugins/emqplus_backend_cassa - url = git@github.com:emqplus/emqplus_backend_cassa.git +[submodule "plugins/emqttd_reloader"] + path = plugins/emqttd_reloader + url = https://github.com/emqtt/emqttd_reloader.git diff --git a/Makefile b/Makefile index c1a173c07..2e02db60e 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,8 @@ DIST = $(BASE_DIR)/rel/$(APP) all: compile -submods: - @git submodule update --init +# submods: +# @git submodule update --init compile: deps @$(REBAR) compile @@ -35,18 +35,7 @@ edoc: rel: compile @cd rel && $(REBAR) generate -f -plugins: - @for plugin in ./plugins/* ; do \ - if [ -d $${plugin} ]; then \ - mkdir -p $(DIST)/$${plugin}/ ; \ - cp -R $${plugin}/ebin $(DIST)/$${plugin}/ ; \ - [ -d "$${plugin}/priv" ] && cp -R $${plugin}/priv $(DIST)/$${plugin}/ ; \ - [ -d "$${plugin}/etc" ] && cp -R $${plugin}/etc $(DIST)/$${plugin}/ ; \ - echo "$${plugin} copied" ; \ - fi \ - done - -dist: rel plugins +dist: rel PLT = $(BASE_DIR)/.emqttd_dialyzer.plt APPS = erts kernel stdlib sasl crypto ssl os_mon syntax_tools \ @@ -63,4 +52,3 @@ build_plt: compile dialyzer: compile dialyzer -Wno_return --plt $(PLT) deps/*/ebin ./ebin plugins/*/ebin - diff --git a/docs/source/bridge.rst b/docs/source/bridge.rst index f2552f6bd..5ff0e3c12 100644 --- a/docs/source/bridge.rst +++ b/docs/source/bridge.rst @@ -33,7 +33,7 @@ Create a bridge that forwards all the 'sensor/#' messages from emqttd1 to emqttd 1. Start Brokers ................ -.. code:: console +.. code-block:: bash cd emqttd1/ && ./bin/emqttd start cd emqttd2/ && ./bin/emqttd start @@ -41,7 +41,7 @@ Create a bridge that forwards all the 'sensor/#' messages from emqttd1 to emqttd 2. Create bridge: emqttd1--sensor/#-->emqttd2 ............................................. -.. code:: console +.. code-block:: bash $ cd emqttd1 && ./bin/emqttd_ctl bridges start emqttd2@127.0.0.1 sensor/# @@ -54,7 +54,7 @@ Create a bridge that forwards all the 'sensor/#' messages from emqttd1 to emqttd 3. Test the bridge ................... -.. code:: console +.. code-block:: bash #emqttd2 mosquitto_sub -t sensor/# -p 2883 -d @@ -65,7 +65,7 @@ Create a bridge that forwards all the 'sensor/#' messages from emqttd1 to emqttd 4. Delete the bridge ..................... -.. code:: console +.. code-block:: bash ./bin/emqttd_ctl bridges stop emqttd2@127.0.0.1 sensor/# @@ -73,7 +73,7 @@ Create a bridge that forwards all the 'sensor/#' messages from emqttd1 to emqttd emqttd Bridge CLI ----------------- -.. code:: console +.. code-block:: bash #query bridges ./bin/emqttd_ctl bridges list diff --git a/docs/source/changes.rst b/docs/source/changes.rst index d16836319..96f29fa03 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -5,6 +5,16 @@ Changes ======= +.. _release_1.0.1: + +------------- +Version 1.0.1 +------------- + +*Release Date: 2016-04-16* + +PR#515 - Fix '$queue' pubsub, add 'pubsub_queue' test and update docs + .. _release_1.0: ------------------------------------ @@ -343,7 +353,7 @@ Benchmark 3.1G memory and 50+ CPU/core: -.. code:: console +.. code-block:: bash Connections: 250K Subscribers: 250K diff --git a/docs/source/cluster.rst b/docs/source/cluster.rst index 5f049f675..8ebd992ba 100644 --- a/docs/source/cluster.rst +++ b/docs/source/cluster.rst @@ -31,7 +31,7 @@ An erlang runtime system called 'node' is identified by a unique name like email Suppose we start four Erlang nodes on localhost: -.. code:: console +.. code-block:: bash erl -name node1@127.0.0.1 erl -name node2@127.0.0.1 @@ -54,7 +54,7 @@ epmd epmd(Erlang Port Mapper Daemon) is a daemon service that is responsible for mapping node names to machine addresses(TCP sockets). The daemon is started automatically on every host where an Erlang node started. -.. code:: console +.. code-block:: bash (node1@127.0.0.1)6> net_adm:names(). {ok,[{"node1",62740}, @@ -79,7 +79,7 @@ Cluster Design The cluster architecture of emqttd broker is based on distrubuted Erlang/OTP and Mnesia database. -The cluster design could be summarized by the following two rules:: +The cluster design could be summarized by the following two rules: 1. When a MQTT client SUBSCRIBE a Topic on a node, the node will tell all the other nodes in the cluster: I subscribed a Topic. @@ -94,7 +94,7 @@ Finally there will be a global route table(Topic -> Node) that replicated to all Topic Trie and Route Table -------------------------- -Every node in the cluster will store a topic trie and route table in mnesia database. +Every node in the cluster will store a topic trie and route table in mnesia database. Suppose that we create subscriptions: @@ -144,22 +144,26 @@ Suppose client1 PUBLISH a message to the topic 't/a', the message Route and Deli Cluster Setup ------------- -Suppose we deploy two nodes cluster on host1, host2: +Suppose we deploy two nodes cluster on s1.emqtt.io, s2.emqtt.io: -+----------------+-----------+---------------------+ -| Node | Host | IP and Port | -+----------------+-----------+---------------------+ -| emqttd@host1 | host1 | 192.168.1.10:1883 | -+----------------+-----------+---------------------+ -| emqttd@host2 | host2 | 192.168.1.20:1883 | -+----------------+-----------+---------------------+ ++--------------------------+-----------------+---------------------+ +| Node | Host(FQDN) | IP and Port | ++--------------------------+-----------------+---------------------+ +| emqttd@s1.emqtt.io or | s1.emqtt.io | 192.168.0.10:1883 | +| emqttd@192.168.0.10 | | | ++--------------------------+-----------------+---------------------+ +| emqttd@s2.emqtt.io or | s2.emqtt.io | 192.168.0.20:1883 | +| emqttd@192.168.0.20 | | | ++--------------------------+-----------------+---------------------+ -emqttd@host1 setting --------------------- +.. WARNING:: The node name is Name@Host, where Host is IP address or the fully qualified host name. + +emqttd@s1.emqtt.io setting +-------------------------- emqttd/etc/vm.args:: - -name emqttd@host1 + -name emqttd@s1.emqtt.io or @@ -167,12 +171,12 @@ emqttd/etc/vm.args:: .. WARNING:: The name cannot be changed after node joined the cluster. -emqttd@host2 setting --------------------- +emqttd@s2.emqtt.io setting +-------------------------- emqttd/etc/vm.args:: - -name emqttd@host2 + -name emqttd@s2.emqtt.io or @@ -181,25 +185,25 @@ emqttd/etc/vm.args:: Join the cluster ---------------- -Start the two broker nodes, and 'cluster join ' on emqttd@host2:: +Start the two broker nodes, and 'cluster join ' on emqttd@s2.emqtt.io:: - $ ./bin/emqttd_ctl cluster join emqttd@host1 + $ ./bin/emqttd_ctl cluster join emqttd@s1.emqtt.io Join the cluster successfully. - Cluster status: [{running_nodes,['emqttd@host1','emqttd@host2']}] + Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}] -Or 'cluster join' on emqttd@host1:: +Or 'cluster join' on emqttd@s1.emqtt.io:: - $ ./bin/emqttd_ctl cluster join emqttd@host2 + $ ./bin/emqttd_ctl cluster join emqttd@s2.emqtt.io Join the cluster successfully. - Cluster status: [{running_nodes,['emqttd@host1','emqttd@host2']}] + Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}] Query the cluster status:: $ ./bin/emqttd_ctl cluster status - Cluster status: [{running_nodes,['emqttd@host1','emqttd@host2']}] + Cluster status: [{running_nodes,['emqttd@s1.emqtt.io','emqttd@s2.emqtt.io']}] Leave the cluster ----------------- @@ -210,14 +214,13 @@ Two ways to leave the cluster: 2. remove: remove other nodes from the cluster -emqttd@host2 node tries to leave the cluster:: +emqttd@s2.emqtt.io node tries to leave the cluster:: $ ./bin/emqttd_ctl cluster leave -Or remove emqttd@host2 node from the cluster on emqttd@host1:: - - $ ./bin/emqttd_ctl cluster remove emqttd@host2 +Or remove emqttd@s2.emqtt.io node from the cluster on emqttd@s1.emqtt.io:: + $ ./bin/emqttd_ctl cluster remove emqttd@s2.emqtt.io -------------------- Session across Nodes @@ -242,7 +245,9 @@ The Firewall If there is a firewall between clustered nodes, the cluster requires to open 4369 port used by epmd daemon, and a port segment for nodes' communication. -Configure the port segment in etc/emqttd.config, for example:: +Configure the port segment in etc/emqttd.config, for example: + +.. code-block:: erlang [{kernel, [ ... @@ -268,5 +273,3 @@ Consistent Hash and DHT ----------------------- Consistent Hash and DHT are popular in the design of NoSQL databases. Cluster of emqttd broker could support 10 million size of global routing table now. We could use the Consistent Hash or DHT to partition the routing table, and evolve the cluster to larger size. - - diff --git a/docs/source/commands.rst b/docs/source/commands.rst index 27242ca15..3aff69d5b 100644 --- a/docs/source/commands.rst +++ b/docs/source/commands.rst @@ -444,7 +444,7 @@ List, load or unload plugins of emqttd broker. +---------------------------+-------------------------+ | plugins load | Load Plugin | +---------------------------+-------------------------+ -| plugins unload | Unload (Plugin) | +| plugins unload | Unload (Plugin) | +---------------------------+-------------------------+ plugins list @@ -746,7 +746,7 @@ listener parameters: | current_clients | Count of current clients | +-----------------+--------------------------------------+ | shutdown_count | Statistics of client shutdown reason | -+-----------------+---------------------------------------+ ++----------------+---------------------------------------+ .. _command_mnesia:: diff --git a/docs/source/config.rst b/docs/source/config.rst index db056e2ef..d964205d0 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -30,10 +30,13 @@ etc/vm.args Configure and Optimize Erlang VM:: ##------------------------------------------------------------------------- - ## Name of the node + ## Name of the node: Name@Host ##------------------------------------------------------------------------- -name emqttd@127.0.0.1 + # or + #-name emqttd@localhost. + ## Cookie for distributed erlang -setcookie emqttdsecretcookie @@ -86,7 +89,7 @@ The two most important parameters in etc/vm.args: +-------+---------------------------------------------------------------------------+ | +P | Max number of Erlang proccesses. A MQTT client consumes two proccesses. | -| | The value should be larger than max_clients * 2 | +| | The value should be larger than max_clients * 2 | +-------+---------------------------------------------------------------------------+ | +Q | Max number of Erlang Ports. A MQTT client consumes one port. | | | The value should be larger than max_clients. | @@ -110,7 +113,7 @@ File Syntax The file users the standard Erlang config syntax, consists of a list of erlang applications and their environments. -.. code:: erlang +.. code-block:: erlang [{kernel, [ {start_timer, true}, @@ -136,13 +139,17 @@ The file adopts Erlang Term Syntax: Log Level and File ------------------ -Logger of emqttd broker is implemented by 'lager' application:: +Logger of emqttd broker is implemented by 'lager' application: + +.. code-block:: erlang {lager, [ ... ]}, -Configure log handlers:: +Configure log handlers: + +.. code-block:: erlang {handlers, [ {lager_console_backend, info}, @@ -169,7 +176,9 @@ Configure log handlers:: emqttd Application ------------------ -The MQTT broker is implemented by erlang 'emqttd' application:: +The MQTT broker is implemented by erlang 'emqttd' application: + +.. code-block:: erlang {emqttd, [ %% Authentication and Authorization @@ -208,14 +217,16 @@ Pluggable Authentication The emqttd broker supports pluggable authentication mechanism with a list of modules and plugins. -The broker provides Username, ClientId, LDAP and anonymous authentication modules by default:: +The broker provides Username, ClientId, LDAP and anonymous authentication modules by default: + +.. code-block:: erlang %% Authetication. Anonymous Default {auth, [ %% Authentication with username, password %% Add users: ./bin/emqttd_ctl users add Username Password %% {username, [{"test", "public"}]}, - + %% Authentication with clientid % {clientid, [{password, no}, {file, "etc/clients.config"}]}, @@ -235,7 +246,7 @@ The broker provides Username, ClientId, LDAP and anonymous authentication module {anonymous, []} ]}, -The modules enabled at the same time compose an authentication chain: +The modules enabled at the same time compose an authentication chain:: ---------------- ---------------- ------------- Client --> | Username | -ignore-> | ClientID | -ignore-> | Anonymous | @@ -243,13 +254,13 @@ The modules enabled at the same time compose an authentication chain: | | | \|/ \|/ \|/ allow | deny allow | deny allow | deny - + .. NOTE:: There are also MySQL、PostgreSQL、Redis、MongoDB Authentication Plugins. Username Authentication ....................... -.. code:: erlang +.. code-block:: erlang {username, [{client1, "passwd1"}, {client2, "passwd2"}]}, @@ -266,7 +277,7 @@ Two ways to configure users: ClientID Authentication ....................... -.. code:: erlang +.. code-block:: erlang {clientid, [{password, no}, {file, "etc/clients.config"}]}, @@ -279,7 +290,7 @@ Configure ClientIDs in etc/clients.config:: LDAP Authentication ................... -.. code:: erlang +.. code-block:: erlang {ldap, [ {servers, ["localhost"]}, @@ -304,7 +315,9 @@ Allow any client to connect to the broker:: ACL --- -Enable the default ACL module:: +Enable the default ACL module: + +.. code-block:: erlang {acl, [ %% Internal ACL module @@ -314,7 +327,7 @@ Enable the default ACL module:: MQTT Packet and ClientID ------------------------ -.. code:: +.. code-block:: erlang {packet, [ @@ -328,7 +341,7 @@ MQTT Packet and ClientID MQTT Client Idle Timeout ------------------------ -.. code:: +.. code-block:: erlang {client, [ %% Socket is connected, but no 'CONNECT' packet received @@ -338,7 +351,7 @@ MQTT Client Idle Timeout MQTT Session ------------ -.. code:: +.. code-block:: erlang {session, [ %% Max number of QoS 1 and 2 messages that can be “in flight” at one time. @@ -388,7 +401,9 @@ The message queue of session stores: 2. Pending messages for inflight window is full -Queue parameters:: +Queue parameters: + +.. code-block:: erlang {queue, [ %% simple | priority @@ -428,7 +443,7 @@ Queue parameters:: Sys Interval of Broker ----------------------- -.. code:: +.. code-block:: erlang %% System interval of publishing $SYS messages {sys_interval, 60}, @@ -436,7 +451,7 @@ Sys Interval of Broker Retained messages ----------------- -.. code:: +.. code-block:: erlang {retained, [ %% Expired after seconds, never expired if 0 @@ -452,12 +467,12 @@ Retained messages PubSub and Router ----------------- -.. code:: erlang +.. code-block:: erlang {pubsub, [ %% PubSub Pool {pool_size, 8}, - + %% Subscription: true | false {subscription, true}, @@ -468,7 +483,7 @@ PubSub and Router Bridge Parameters ----------------- -.. code:: erlang +.. code-block:: erlang {bridge, [ %% Bridge Queue Size @@ -484,9 +499,11 @@ Enable Modules 'presence' module will publish presence message to $SYS topic when a client connected or disconnected:: - {presence, [{qos, 0}]}, + {presence, [{qos, 0}]}, -'subscription' module forces the client to subscribe some topics when connected to the broker:: +'subscription' module forces the client to subscribe some topics when connected to the broker: + +.. code-block:: erlang %% Subscribe topics automatically when client connected {subscription, [ @@ -500,7 +517,9 @@ Enable Modules {"$Q/client/$c", 1} ]} -'rewrite' module supports to rewrite the topic path:: +'rewrite' module supports to rewrite the topic path: + +.. code-block:: erlang %% Rewrite rules {rewrite, [{file, "etc/rewrite.config"}]} @@ -508,7 +527,7 @@ Enable Modules Plugins Folder -------------- -.. code:: erlang +.. code-block:: erlang {plugins, [ %% Plugin App Library Dir @@ -536,7 +555,7 @@ The TCP Ports occupied by emqttd broker by default: | 8083 | MQTT(WebSocket), HTTP API Port | +-----------+-----------------------------------+ -.. code:: erlang +.. code-block:: erlang {listeners, [ @@ -641,7 +660,9 @@ Listener Parameters: etc/acl.config -------------- -The 'etc/acl.config' is the default ACL config for emqttd broker. The rules by default:: +The 'etc/acl.config' is the default ACL config for emqttd broker. The rules by default: + +.. code-block:: erlang %% Allow 'dashboard' to subscribe '$SYS/#' {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. @@ -657,21 +678,23 @@ The 'etc/acl.config' is the default ACL config for emqttd broker. The rules by d An ACL rule is an Erlang tuple. The Access control module of emqttd broker matches the rule one by one from top to bottom:: - --------- --------- --------- + --------- --------- --------- Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | --> Default --------- --------- --------- | | | match match match \|/ \|/ \|/ allow | deny allow | deny allow | deny - + .. _config_rewrite: ------------------ etc/clients.config ------------------ -Enable ClientId Authentication in 'etc/emqttd.config':: +Enable ClientId Authentication in 'etc/emqttd.config': + +.. code-block:: erlang {auth, [ %% Authentication with clientid @@ -688,7 +711,9 @@ Configure all allowed ClientIDs, IP Addresses in etc/clients.config:: etc/rewrite.config ------------------ -The Rewrite Rules for emqttd_mod_rewrite:: +The Rewrite Rules for emqttd_mod_rewrite: + +.. code-block:: erlang {topic, "x/#", [ {rewrite, "^x/y/(.+)$", "z/y/$1"}, @@ -698,4 +723,3 @@ The Rewrite Rules for emqttd_mod_rewrite:: {topic, "y/+/z/#", [ {rewrite, "^y/(.+)/z/(.+)$", "y/z/$2"} ]}. - diff --git a/docs/source/design.rst b/docs/source/design.rst index 6e57d69d8..a9d081cc2 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -32,19 +32,19 @@ System Layers ------------- 1. Connection Layer - + Handle TCP and WebSocket connections, encode/decode MQTT packets. 2. Session Layer - + Process MQTT PUBLISH/SUBSCRIBE Packets received from client, and deliver MQTT messages to client. - + 3. PubSub Layer - + Dispatch MQTT messages to subscribers in a node. 4. Routing(Distributed) Layer - + Route MQTT messages among clustered nodes. ---------------- @@ -93,7 +93,7 @@ A MQTT session will store the subscriptions and inflight messages in memory: 1. The Client’s subscriptions. -2. Inflight qos1/2 messages sent to the client but unacked, QoS 2 messages which +2. Inflight qos1/2 messages sent to the client but unacked, QoS 2 messages which have been sent to the Client, but have not been completely acknowledged. 3. Inflight qos2 messages received from client and waiting for PUBREL. QoS 2 @@ -191,7 +191,9 @@ Authentication and ACL The emqttd broker supports an extensible authentication/ACL mechanism, which is implemented by emqttd_access_control, emqttd_auth_mod and emqttd_acl_mod modules. -emqttd_access_control module provides two APIs that help register/unregister auth or ACL module:: +emqttd_access_control module provides two APIs that help register/unregister auth or ACL module: + +.. code-block:: erlang register_mod(auth | acl, atom(), list()) -> ok | {error, any()}. @@ -200,7 +202,9 @@ emqttd_access_control module provides two APIs that help register/unregister aut Authentication Bahaviour ------------------------- -The emqttd_auth_mod defines an Erlang behaviour for authentication module:: +The emqttd_auth_mod defines an Erlang behaviour for authentication module: + +.. code-block:: erlang -module(emqttd_auth_mod). @@ -243,7 +247,9 @@ The authentication modules implemented by default: Authorization(ACL) ------------------ -The emqttd_acl_mod defines an Erlang behavihour for ACL module:: +The emqttd_acl_mod defines an Erlang behavihour for ACL module: + +.. code-block:: erlang -module(emqttd_acl_mod). @@ -273,7 +279,9 @@ The emqttd_acl_mod defines an Erlang behavihour for ACL module:: -endif. -emqttd_acl_internal implements the default ACL based on etc/acl.config file:: +emqttd_acl_internal implements the default ACL based on etc/acl.config file: + +.. code-block:: erlang %%%----------------------------------------------------------------------------- %%% @@ -360,7 +368,7 @@ Hook Implementation The hook APIs defined in emqttd module: -.. code:: erlang +.. code-block:: erlang -module(emqttd). @@ -376,7 +384,7 @@ The hook APIs defined in emqttd module: And implemented in emqttd_hook module: -.. code:: erlang +.. code-block:: erlang -module(emqttd_hook). @@ -398,12 +406,12 @@ Hook Usage The `emqttd_plugin_template`_ project provides the examples for hook usage: -.. code:: erlang +.. code-block:: erlang -module(emqttd_plugin_template). -export([load/1, unload/0]). - + -export([on_message_publish/2, on_message_delivered/3, on_message_acked/3]). load(Env) -> @@ -439,7 +447,9 @@ Plugin is a normal erlang application that can be started/stopped dynamically by emqttd_plugins Module --------------------- -The plugin mechanism is implemented by emqttd_plugins module:: +The plugin mechanism is implemented by emqttd_plugins module: + +.. code-block:: erlang -module(emqttd_plugins). @@ -468,4 +478,3 @@ http://github.com/emqtt/emqttd_plugin_template .. _eSockd: https://github.com/emqtt/esockd .. _Chain-of-responsibility_pattern: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern .. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template/blob/master/src/emqttd_plugin_template.erl - diff --git a/docs/source/getstarted.rst b/docs/source/getstarted.rst index 37e608d8b..e22f853d0 100644 --- a/docs/source/getstarted.rst +++ b/docs/source/getstarted.rst @@ -61,7 +61,7 @@ Download binary package from: http://emqtt.io/downloads. Installing on Mac, for example: -.. code:: console +.. code-block:: bash unzip emqttd-macosx-0.16.0-beta-20160216.zip && cd emqttd @@ -79,7 +79,7 @@ Installing from Source .. NOTE:: emqttd broker requires Erlang R17+ to build. -.. code:: console +.. code-block:: bash git clone https://github.com/emqtt/emqttd.git @@ -132,7 +132,9 @@ Modules Configure the 'auth', 'module' paragraph in 'etc/emqttd.config' to enable a module. -Enable 'emqttd_auth_username' module:: +Enable 'emqttd_auth_username' module: + +.. code-block:: erlang {access, [ %% Authetication. Anonymous Default @@ -142,7 +144,9 @@ Enable 'emqttd_auth_username' module:: ... -Enable 'emqttd_mod_presence' module:: +Enable 'emqttd_mod_presence' module: + +.. code-block:: erlang {modules, [ %% Client presence management module. @@ -195,7 +199,7 @@ We need tune the OS Kernel, TCP Stack, Erlang VM and emqttd broker for one milli Linux Kernel Parameters ----------------------- -.. code:: +.. code-block:: bash # 2M: sysctl -w fs.file-max=2097152 @@ -208,7 +212,7 @@ Linux Kernel Parameters TCP Stack Parameters -------------------- -.. code:: +.. code-block:: bash # backlog sysctl -w net.core.somaxconn=65536 @@ -232,7 +236,9 @@ emqttd/etc/vm.args:: emqttd broker ------------- -emqttd/etc/emqttd.config:: +emqttd/etc/emqttd.config: + +.. code-block:: erlang {mqtt, 1883, [ %% Size of acceptor pool @@ -254,7 +260,7 @@ emqttd/etc/emqttd.config:: Test Client ----------- -.. code:: +.. code-block:: bash sysctl -w net.ipv4.ip_local_port_range="500 65535" echo 1000000 > /proc/sys/fs/nr_open @@ -290,4 +296,3 @@ GitHub: https://github.com/emqtt .. _emqttd_stomp: https://github.com/emqtt/emqttd_stomp .. _emqttd_sockjs: https://github.com/emqtt/emqttd_sockjs .. _emqttd_recon: https://github.com/emqtt/emqttd_recon - diff --git a/docs/source/guide.rst b/docs/source/guide.rst index bc7963f1e..c732734f4 100644 --- a/docs/source/guide.rst +++ b/docs/source/guide.rst @@ -13,7 +13,9 @@ The emqttd broker supports to authenticate MQTT clients with ClientID, Username/ The authentication is provided by a list of extended modules, or MySQL, PostgreSQL and Redis Plugins. -Enable an authentication module in etc/emqttd.config:: +Enable an authentication module in etc/emqttd.config: + +.. code-block:: erlang %% Authentication and Authorization {access, [ @@ -21,7 +23,7 @@ Enable an authentication module in etc/emqttd.config:: {auth, [ %% Authentication with username, password %{username, []}, - + %% Authentication with clientid %{clientid, [{password, no}, {file, "etc/clients.config"}]}, @@ -86,7 +88,7 @@ Two ways to add users: ClientId -------- -.. code:: erlang +.. code-block:: erlang {clientid, [{password, no}, {file, "etc/clients.config"}]}, @@ -99,7 +101,7 @@ Configure ClientIDs in etc/clients.config:: LDAP ---- -.. code:: erlang +.. code-block:: erlang {ldap, [ {servers, ["localhost"]}, @@ -122,7 +124,9 @@ Allow any client to connect to the broker:: MySQL ----- -Authenticate against MySQL database. Support we create a mqtt_user table:: +Authenticate against MySQL database. Support we create a mqtt_user table: + +.. code-block:: sql CREATE TABLE `mqtt_user` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, @@ -134,7 +138,9 @@ Authenticate against MySQL database. Support we create a mqtt_user table:: UNIQUE KEY `mqtt_username` (`username`) ) ENGINE=MyISAM DEFAULT CHARSET=utf8; -Configure the 'authquery' and 'password_hash' in emqttd_plugin_mysql/etc/plugin.config:: +Configure the 'authquery' and 'password_hash' in emqttd_plugin_mysql/etc/plugin.config: + +.. code-block:: erlang [ @@ -161,7 +167,9 @@ Load the plugin:: PostgreSQL ---------- -Authenticate against PostgreSQL database. Create a mqtt_user table:: +Authenticate against PostgreSQL database. Create a mqtt_user table: + +.. code-block:: sql CREATE TABLE mqtt_user ( id SERIAL primary key, @@ -170,7 +178,9 @@ Authenticate against PostgreSQL database. Create a mqtt_user table:: salt character varying(40) ); -Configure the 'authquery' and 'password_hash' in emqttd_plugin_pgsql/etc/plugin.config:: +Configure the 'authquery' and 'password_hash' in emqttd_plugin_pgsql/etc/plugin.config: + +.. code-block:: erlang [ @@ -183,7 +193,7 @@ Configure the 'authquery' and 'password_hash' in emqttd_plugin_pgsql/etc/plugin. %% hash algorithm: md5, sha, sha256, pbkdf2? {password_hash, sha256}, - + ... ]} @@ -198,7 +208,9 @@ Redis Authenticate against Redis. MQTT users could be stored in redis HASH, the key is "mqtt_user:". -Configure 'authcmd' and 'password_hash' in emqttd_plugin_redis/etc/plugin.config:: +Configure 'authcmd' and 'password_hash' in emqttd_plugin_redis/etc/plugin.config: + +.. code-block:: erlang [ {emqttd_plugin_redis, [ @@ -232,7 +244,7 @@ The ACL rules define:: Access Control Module of emqttd broker will match the rules one by one:: - --------- --------- --------- + --------- --------- --------- Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | --> Default --------- --------- --------- | | | @@ -245,14 +257,18 @@ Internal The default ACL of emqttd broker is implemented by an 'internal' module. -Enable the 'internal' ACL module in etc/emqttd.config:: +Enable the 'internal' ACL module in etc/emqttd.config: + +.. code-block:: erlang {acl, [ %% Internal ACL module {internal, [{file, "etc/acl.config"}, {nomatch, allow}]} ]} -The ACL rules of 'internal' module are defined in 'etc/acl.config' file:: +The ACL rules of 'internal' module are defined in 'etc/acl.config' file: + +.. code-block:: erlang %% Allow 'dashboard' to subscribe '$SYS/#' {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. @@ -269,7 +285,9 @@ The ACL rules of 'internal' module are defined in 'etc/acl.config' file:: MySQL ----- -ACL against MySQL database. The mqtt_acl table and default data:: +ACL against MySQL database. The mqtt_acl table and default data: + +.. code-block:: sql CREATE TABLE `mqtt_acl` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, @@ -291,7 +309,9 @@ ACL against MySQL database. The mqtt_acl table and default data:: (6,1,'127.0.0.1',NULL,NULL,2,'#'), (7,1,NULL,'dashboard',NULL,1,'$SYS/#'); -Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_mysql/etc/plugin.config:: +Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_mysql/etc/plugin.config: + +.. code-block:: erlang [ @@ -311,7 +331,9 @@ Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_mysql/etc/plugin.config: PostgreSQL ---------- -ACL against PostgreSQL database. The mqtt_acl table and default data:: +ACL against PostgreSQL database. The mqtt_acl table and default data: + +.. code-block:: sql CREATE TABLE mqtt_acl ( id SERIAL primary key, @@ -332,7 +354,9 @@ ACL against PostgreSQL database. The mqtt_acl table and default data:: (6,1,'127.0.0.1',NULL,NULL,2,'#'), (7,1,NULL,'dashboard',NULL,1,'$SYS/#'); -Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_pgsql/etc/plugin.config:: +Configure 'aclquery' and 'acl_nomatch' in emqttd_plugin_pgsql/etc/plugin.config: + +.. code-block:: erlang [ @@ -357,7 +381,9 @@ Redis ACL against Redis. We store ACL rules for each MQTT client in a Redis List by defualt. The key is "mqtt_acl:", the value is a list of "publish ", "subscribe " or "pubsub ". -Configure 'aclcmd' and 'acl_nomatch' in emqttd_plugin_redis/etc/plugin.config:: +Configure 'aclcmd' and 'acl_nomatch' in emqttd_plugin_redis/etc/plugin.config: + +.. code-block:: erlang [ {emqttd_plugin_redis, [ @@ -394,7 +420,9 @@ For example, we use mosquitto_sub/pub commands:: MQTT V3.1.1 Protocol Specification: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html -MQTT Listener of emqttd broker is configured in etc/emqttd.config:: +MQTT Listener of emqttd broker is configured in etc/emqttd.config: + +.. code-block:: erlang {mqtt, 1883, [ %% Size of acceptor pool @@ -423,7 +451,9 @@ MQTT Listener of emqttd broker is configured in etc/emqttd.config:: ]} ]}, -MQTT(SSL) Listener, Default Port is 8883:: +MQTT(SSL) Listener, Default Port is 8883: + +.. code-block:: erlang {mqtts, 8883, [ %% Size of acceptor pool @@ -492,7 +522,9 @@ The Dashboard plugin provides a test page for WebSocket:: http://127.0.0.1:18083/websocket.html -Listener of WebSocket and HTTP Publish API is configured in etc/emqttd.config:: +Listener of WebSocket and HTTP Publish API is configured in etc/emqttd.config: + +.. code-block:: erlang %% HTTP and WebSocket Listener {http, 8083, [ @@ -525,7 +557,7 @@ For emqttd broker is clustered, the $SYS topic path is started with:: $SYS/brokers/emqttd@host2/uptime -.. NOTE:: The broker only allows clients from localhost to subscribe $SYS topics by default. +.. NOTE:: The broker only allows clients from localhost to subscribe $SYS topics by default. Sys Interval of publishing $SYS messages, could be configured in etc/emqttd.config:: @@ -569,11 +601,11 @@ The topic path started with: $SYS/brokers/${node}/clients/ Properties of 'connected' Payload:: - ipaddress: "127.0.0.1", - username: "test", - session: false, - protocol: 3, - connack: 0, + ipaddress: "127.0.0.1", + username: "test", + session: false, + protocol: 3, + connack: 0, ts: 1432648482 Properties of 'disconnected' Payload:: @@ -614,7 +646,7 @@ Subscriptions +---------------------+---------------------------------------------+ | Topic | Description | +---------------------+---------------------------------------------+ -| subscriptions/count | Count of current subscriptions | +| subscriptions/count | Count of current subscriptions | +---------------------+---------------------------------------------+ | subscriptions/max | Max number of subscriptions | +---------------------+---------------------------------------------+ @@ -756,4 +788,3 @@ Stop a Trace:: .. _emqttd_plugin_mysql: https://github.com/emqtt/emqttd_plugin_mysql .. _emqttd_plugin_pgsql: https://github.com/emqtt/emqttd_plugin_pgsql .. _emqttd_plugin_redis: https://github.com/emqtt/emqttd_plugin_redis - diff --git a/docs/source/install.rst b/docs/source/install.rst index b03e12f73..9939d0c07 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -45,19 +45,19 @@ Installing on Linux Download CentOS Package from: http://emqtt.io/downloads/centos, and then unzip: -.. code:: console +.. code-block:: bash unzip emqttd-centos64-0.16.0-beta-20160216.zip -Start the broker in console mode:: +Start the broker in console mode: -.. code:: console +.. code-block:: bash cd emqttd && ./bin/emqttd console If the broker is started successfully, console will print: -.. code:: console +.. code-block:: bash starting emqttd on node 'emqttd@127.0.0.1' emqttd ctl is starting...[done] @@ -88,7 +88,7 @@ CTRL+C to close the console and stop the broker. Start the broker in daemon mode: -.. code:: console +.. code-block:: bash ./bin/emqttd start @@ -96,7 +96,7 @@ The boot logs in log/emqttd_sasl.log file. Check the running status of the broker: -.. code:: console +.. code-block:: bash $ ./bin/emqttd_ctl status Node 'emqttd@127.0.0.1' is started @@ -132,7 +132,7 @@ Download Mac Package from: http://emqtt.io/downloads/macosx Configure 'lager' log level in 'etc/emqttd.config', all MQTT messages recevied/sent will be printed on console: -.. code:: erlang +.. code-block:: erlang {lager, [ ... @@ -196,7 +196,7 @@ Could use apt-get on Ubuntu, yum on CentOS/RedHat and brew on Mac to install Erl When all dependencies are ready, clone the emqttd project from github.com and build: -.. code:: console +.. code-block:: bash git clone https://github.com/emqtt/emqttd.git @@ -226,7 +226,7 @@ TCP Ports Used The TCP ports used can be configured in etc/emqttd.config: -.. code:: erlang +.. code-block:: erlang {listeners, [ {mqtt, 1883, [ @@ -275,7 +275,7 @@ Two important parameters in etc/vm.args: The maximum number of allowed MQTT clients: -.. code:: erlang +.. code-block:: erlang {listeners, [ {mqtt, 1883, [ @@ -295,7 +295,7 @@ The maximum number of allowed MQTT clients: /etc/init.d/emqttd ------------------- -.. code:: shell +.. code-block:: bash #!/bin/sh # diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 286ad1940..7d42a9514 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -75,7 +75,9 @@ The Web Dashboard for emqttd broker. The plugin will be loaded automatically whe Configure Dashboard ------------------- -emqttd_dashboard/etc/plugin.config:: +emqttd_dashboard/etc/plugin.config: + +.. code-block:: erlang [ {emqttd_dashboard, [ @@ -100,7 +102,7 @@ MQTT Authentication, ACL with MySQL database. MQTT User Table --------------- -.. code:: sql +.. code-block:: sql CREATE TABLE `mqtt_user` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, @@ -115,7 +117,7 @@ MQTT User Table MQTT ACL Table -------------- -.. code:: sql +.. code-block:: sql CREATE TABLE `mqtt_acl` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, @@ -132,7 +134,9 @@ MQTT ACL Table Configure emqttd_plugin_mysql/etc/plugin.config ----------------------------------------------- -Configure MySQL host, username, password and database:: +Configure MySQL host, username, password and database: + +.. code-block:: erlang [ @@ -178,7 +182,7 @@ Configure MySQL host, username, password and database:: Load emqttd_plugin_mysql plugin ------------------------------- -.. code:: +.. code-block:: bash ./bin/emqttd_ctl plugins load emqttd_plugin_mysql @@ -191,7 +195,7 @@ MQTT Authentication, ACL with PostgreSQL Database. MQTT User Table --------------- -.. code:: sql +.. code-block:: sql CREATE TABLE mqtt_user ( id SERIAL primary key, @@ -203,7 +207,7 @@ MQTT User Table MQTT ACL Table -------------- -.. code:: sql +.. code-block:: sql CREATE TABLE mqtt_acl ( id SERIAL primary key, @@ -227,7 +231,9 @@ MQTT ACL Table Configure emqttd_plugin_pgsql/etc/plugin.config ----------------------------------------------- -Configure host, username, password and database of PostgreSQL:: +Configure host, username, password and database of PostgreSQL: + +.. code-block:: erlang [ @@ -274,7 +280,7 @@ Configure host, username, password and database of PostgreSQL:: Load emqttd_plugin_pgsql Plugin ------------------------------- -.. code:: shell +.. code-block:: bash ./bin/emqttd_ctl plugins load emqttd_plugin_pgsql @@ -287,7 +293,7 @@ MQTT Authentication, ACL with Redis. Configure emqttd_plugin_redis/etc/plugin.config ----------------------------------------------- -.. code:: erlang +.. code-block:: erlang [ {emqttd_plugin_redis, [ @@ -331,7 +337,7 @@ Configure emqttd_plugin_redis/etc/plugin.config Load emqttd_plugin_redis Plugin ------------------------------- -.. code:: console +.. code-block:: bash ./bin/emqttd_ctl plugins load emqttd_plugin_redis @@ -346,7 +352,7 @@ Configure emqttd_stomp/etc/plugin.config .. NOTE:: Default Port for STOMP Protocol: 61613 -.. code:: erlang +.. code-block:: erlang [ {emqttd_stomp, [ @@ -378,7 +384,7 @@ Configure emqttd_stomp/etc/plugin.config Load emqttd_stomp Plugin ------------------------ -.. code:: +.. code-block:: bash ./bin/emqttd_ctl plugins load emqttd_stomp @@ -394,7 +400,7 @@ emqttd_sockjs plugin enables web browser to connect to emqttd broker and communi Configure emqttd_sockjs ----------------------- -.. code:: erlang +.. code-block:: erlang [ {emqttd_sockjs, [ @@ -411,7 +417,7 @@ Load emqttd_sockjs Plugin .. NOTE:: emqttd_stomp Plugin required. -.. code:: console +.. code-block:: bash ./bin/emqttd_ctl plugins load emqttd_stomp @@ -431,14 +437,14 @@ The plugin loads `recon`_ library on a running emqttd broker. Recon libray helps Load emqttd_recon Plugin ------------------------ -.. code:: console +.. code-block:: bash ./bin/emqttd_ctl plugins load emqttd_recon Recon CLI --------- -.. code:: console +.. code-block:: bash ./bin/emqttd_ctl recon @@ -473,7 +479,7 @@ Register Auth/ACL Modules emqttd_auth_demo.erl - demo authentication module: -.. code:: erlang +.. code-block:: erlang -module(emqttd_auth_demo). @@ -494,7 +500,7 @@ emqttd_auth_demo.erl - demo authentication module: emqttd_acl_demo.erl - demo ACL module: -.. code:: erlang +.. code-block:: erlang -module(emqttd_acl_demo). @@ -517,7 +523,7 @@ emqttd_acl_demo.erl - demo ACL module: emqttd_plugin_template_app.erl - Register the auth/ACL modules: -.. code:: erlang +.. code-block:: erlang ok = emqttd_access_control:register_mod(auth, emqttd_auth_demo, []), ok = emqttd_access_control:register_mod(acl, emqttd_acl_demo, []), @@ -547,9 +553,11 @@ The plugin could register callbacks for hooks. The hooks will be run by the brok | message.acked | Run when a message(qos1/2) is acked | +------------------------+---------------------------------------+ | client.disconnected | Run when a client is disconnnected | -+----------------------- +---------------------------------------+ ++------------------------+---------------------------------------+ -emqttd_plugin_template.erl for example:: +emqttd_plugin_template.erl for example: + +.. code-block:: erlang %% Called when the plugin application start load(Env) -> @@ -568,7 +576,7 @@ Register CLI Modules emqttd_cli_demo.erl: -.. code:: erlang +.. code-block:: erlang -module(emqttd_cli_demo). @@ -584,7 +592,7 @@ emqttd_cli_demo.erl: emqttd_plugin_template_app.erl - register the CLI module to emqttd broker: -.. code:: erlang +.. code-block:: erlang emqttd_ctl:register_cmd(cmd, {emqttd_cli_demo, cmd}, []). @@ -602,4 +610,3 @@ There will be a new CLI after the plugin loaded:: .. _emqttd_recon: https://github.com/emqtt/emqttd_recon .. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template .. _recon: http://ferd.github.io/recon/ - diff --git a/docs/source/tune.rst b/docs/source/tune.rst index 9407eccac..8cdc3c64a 100644 --- a/docs/source/tune.rst +++ b/docs/source/tune.rst @@ -106,7 +106,9 @@ Tuning and optimize the Erlang VM in etc/vm.args file:: emqttd broker ------------- -Tune the acceptor pool, max_clients limit and sockopts for TCP listener in etc/emqttd.config:: +Tune the acceptor pool, max_clients limit and sockopts for TCP listener in etc/emqttd.config: + +.. code-block:: erlang {mqtt, 1883, [ %% Size of acceptor pool @@ -141,4 +143,3 @@ emqtt_benchmark --------------- Test tool for concurrent connections: http://github.com/emqtt/emqtt_benchmark - diff --git a/include/emqttd.hrl b/include/emqttd.hrl index cb849d246..43f9e36b4 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -125,7 +125,7 @@ sys = false :: boolean(), %% $SYS flag payload :: binary(), %% Payload timestamp :: erlang:timestamp(), %% os:timestamp - extra = [] :: list() + extra = [] :: list() }). -type mqtt_message() :: #mqtt_message{}. diff --git a/rel/files/loaded_plugins b/rel/files/loaded_plugins index 68ba6a41d..e69de29bb 100644 --- a/rel/files/loaded_plugins +++ b/rel/files/loaded_plugins @@ -1 +0,0 @@ -emqttd_dashboard. diff --git a/rel/files/vm.args b/rel/files/vm.args index 245e369c6..27c743997 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -1,7 +1,13 @@ ##------------------------------------------------------------------------- -## Name of the node +## Name of the emqttd node: Name@Host +## +## NOTICE: The Host should be IP address or the fully qualified host name. +## The short hostname cannot work! ##------------------------------------------------------------------------- + -name emqttd@127.0.0.1 +# or +#-name emqttd@localhost. ## Cookie for distributed erlang -setcookie emqttdsecretcookie @@ -34,7 +40,7 @@ ## Valid range is 1-2097151. Default is 1024. ## +zdbbl 8192 -## CPU Schedulers +## Set scheduler bind type. ## +sbt db ##------------------------------------------------------------------------- diff --git a/src/emqttd.app.src b/src/emqttd.app.src index fe6bf3dad..8e102ccc6 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "1.0"}, + {vsn, "1.0.1"}, {id, "emqttd"}, {modules, []}, {registered, []}, diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index d41bf26de..bcc68e5cf 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -368,7 +368,7 @@ vm([]) -> vm(["all"]); vm(["all"]) -> - [vm([Name]) || Name <- ["load", "memory", "process", "io"]]; + [vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]]; vm(["load"]) -> [?PRINT("cpu/~-20s: ~s~n", [L, V]) || {L, V} <- emqttd_vm:loads()]; @@ -387,12 +387,18 @@ vm(["io"]) -> ?PRINT("io/~-21s: ~w~n", [Key, get_value(Key, IoInfo)]) end, [max_fds, active_fds]); +vm(["ports"]) -> + foreach(fun({Name, Key}) -> + ?PRINT("ports/~-16s: ~w~n", [Name, erlang:system_info(Key)]) + end, [{count, port_count}, {limit, port_limit}]); + vm(_) -> - ?USAGE([{"vm all", "Show info of erlang vm"}, - {"vm load", "Show load of erlang vm"}, - {"vm memory", "Show memory of erlang vm"}, - {"vm process", "Show process of erlang vm"}, - {"vm io", "Show IO of erlang vm"}]). + ?USAGE([{"vm all", "Show info of Erlang VM"}, + {"vm load", "Show load of Erlang VM"}, + {"vm memory", "Show memory of Erlang VM"}, + {"vm process", "Show process of Erlang VM"}, + {"vm io", "Show IO of Erlang VM"}, + {"vm ports", "Show Ports of Erlang VM"}]). %%-------------------------------------------------------------------- %% @doc mnesia Command diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 3687eaa7f..f3169339a 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -79,11 +79,12 @@ init([OriginConn, MqttEnv]) -> exit({shutdown, Reason}) end, ConnName = esockd_net:format(PeerName), + Self = self(), SendFun = fun(Data) -> try Connection:async_send(Data) of true -> ok catch - error:Error -> exit({shutdown, Error}) + error:Error -> Self ! {shutdown, Error} end end, PktOpts = proplists:get_value(packet, MqttEnv), @@ -138,6 +139,10 @@ handle_cast(Msg, State) -> handle_info(timeout, State) -> shutdown(idle_timeout, State); +%% fix issue #535 +handle_info({shutdown, Error}, State) -> + shutdown(Error, State); + %% Asynchronous SUBACK handle_info({suback, PacketId, GrantedQos}, State) -> with_proto_state(fun(ProtoState) -> diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index de421cfff..5ccb6bc40 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -106,7 +106,7 @@ publish(Topic, Msg) -> %% @doc Dispatch Message to Subscribers -spec(dispatch(binary(), mqtt_message()) -> ok). -dispatch(Queue = <<"$queue/", _T>>, Msg) -> +dispatch(Queue = <<"$queue/", _Q/binary>>, Msg) -> case subscribers(Queue) of [] -> dropped(Queue); diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index badc975eb..fc65e0477 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -20,6 +20,8 @@ -include("emqttd.hrl"). +-include_lib("eunit/include/eunit.hrl"). + all() -> [{group, pubsub}, {group, router}, @@ -38,7 +40,8 @@ groups() -> create_subscription, subscribe_unsubscribe, publish, pubsub, - 'pubsub#', 'pubsub+']}, + 'pubsub#', 'pubsub+', + pubsub_queue]}, {router, [sequence], [router_add_del, router_print, @@ -71,7 +74,8 @@ groups() -> cli_subscriptions, cli_bridges, cli_plugins, - cli_listeners]}]. + cli_listeners, + cli_vm]}]. init_per_suite(Config) -> application:start(lager), @@ -99,7 +103,7 @@ create_subscription(_) -> [#mqtt_subscription{subid = <<"clientId">>, topic = <<"topic/sub">>, qos = 2}] = emqttd_backend:lookup_subscriptions(<<"clientId">>), ok = emqttd_backend:del_subscriptions(<<"clientId">>), - [] = emqttd_backend:lookup_subscriptions(<<"clientId">>). + ?assertEqual([], emqttd_backend:lookup_subscriptions(<<"clientId">>)). subscribe_unsubscribe(_) -> ok = emqttd:subscribe(<<"topic/subunsub">>), @@ -114,7 +118,7 @@ publish(_) -> ok = emqttd:subscribe(<<"test/+">>), timer:sleep(10), emqttd:publish(Msg), - true = receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end. + ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end). pubsub(_) -> Self = self(), @@ -124,7 +128,7 @@ pubsub(_) -> [{Self, <<"a/b/c">>}] = ets:lookup(subscribed, Self), [{<<"a/b/c">>, Self}] = ets:lookup(subscriber, <<"a/b/c">>), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), - true = receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end, + ?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), spawn(fun() -> emqttd:subscribe(<<"a/b/c">>), emqttd:subscribe(<<"c/d/e">>), @@ -138,16 +142,42 @@ pubsub(_) -> emqttd:subscribe(<<"a/#">>), timer:sleep(10), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), - true = receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end, + ?assert(receive {dispatch, <<"a/#">>, _} -> true after 2 -> false end), emqttd:unsubscribe(<<"a/#">>). 'pubsub+'(_) -> emqttd:subscribe(<<"a/+/+">>), timer:sleep(10), emqttd:publish(emqttd_message:make(ct, <<"a/b/c">>, <<"hello">>)), - true = receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end, + ?assert(receive {dispatch, <<"a/+/+">>, _} -> true after 1 -> false end), emqttd:unsubscribe(<<"a/+/+">>). +pubsub_queue(_) -> + Self = self(), Q = <<"$queue/abc">>, + SubFun = fun() -> + emqttd:subscribe(Q), + {ok, Msgs} = loop_recv(Q, 10), + Self ! {recv, self(), Msgs} + end, + Sub1 = spawn(SubFun), Sub2 = spawn(SubFun), + timer:sleep(5), + emqttd:publish(emqttd_message:make(ct, Q, <<"1", Q/binary>>)), + emqttd:publish(emqttd_message:make(ct, Q, <<"2", Q/binary>>)), + emqttd:publish(emqttd_message:make(ct, Q, <<"3", Q/binary>>)), + ?assert(receive {recv, Sub1, Msgs1} -> length(Msgs1) < 3 end), + ?assert(receive {recv, Sub2, Msgs2} -> length(Msgs2) < 3 end). + +loop_recv(Topic, Timeout) -> + loop_recv(Topic, Timeout, []). + +loop_recv(Topic, Timeout, Acc) -> + receive + {dispatch, Topic, Msg} -> + loop_recv(Topic, Timeout, [Msg|Acc]) + after + Timeout -> {ok, Acc} + end. + %%-------------------------------------------------------------------- %% Router Test %%-------------------------------------------------------------------- @@ -293,7 +323,7 @@ dispatch_retained_messages(_) -> payload = <<"payload">>}, emqttd_retainer:retain(Msg), emqttd_retainer:dispatch(<<"a/b/+">>, self()), - true = receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end, + ?assert(receive {dispatch, <<"a/b/+">>, Msg} -> true after 10 -> false end), emqttd_retainer:retain(#mqtt_message{retain = true, topic = <<"a/b/c">>, payload = <<>>}), [] = emqttd_backend:read_messages(<<"a/b/c">>). @@ -390,3 +420,7 @@ cli_bridges(_) -> cli_listeners(_) -> emqttd_cli:listeners([]). +cli_vm(_) -> + emqttd_cli:vm(), + emqttd_cli:vm(["ports"]). +