design doc

This commit is contained in:
Feng 2016-04-02 13:58:44 +08:00
parent f6d45d81ff
commit fd15099f74
1 changed files with 149 additions and 561 deletions

View File

@ -5,65 +5,6 @@
Design Guide Design Guide
============ ============
.. _design_intro:
----------------
About emqttd 1.0
----------------
Finally, we decide to tag emqttd 1.0 after two years development. An experimental project that try to introcude MQTT protocol to SCADA system.
emqttd消息服务器1.0版本经过两年时间开发开发方式有点像摇滚乐专辑的制作最初从0.1版本一些即兴创作的部分开始,但最终在各层架构设计上做出了正确的选择,整体上体现了某种程度的正交(Orthogonality)和一致性(Consistency)。emqttd 1.0可能是目前开源领域唯一一个几乎不需要用户做太多努力就可以支持到100万连接的MQTT服务器。
C1000K Problem
--------------
多核服务器和现代操作系统内核层面可以很轻松支持100万TCP连接核心问题是应用层面如何处理业务瓶颈。
emqttd消息服务器在业务和应用层面解决了承载100万连接的各类瓶颈问题。连接测试的操作系统内核、TCP协议栈、Erlang虚拟机参数: http://docs.emqtt.cn/zh_CN/latest/tune.html
Fully Asynchronous
------------------
emqttd消息服务器是基于Erlang/OTP平台的全异步的架构异步TCP连接处理、异步主题(Topic)订阅、异步消息发布。只有在资源负载限制部分采用同步设计比如TCP连接创建和Mnesia数据库事务执行。
一条MQTT消息从发布者(Publisher)到订阅者(Subscriber)在emqttd消息服务器内部异步流过一系列Erlang进程Mailbox::
---------- ----------- ----------
Publisher --Msg-->| Client | --Msg--> | Session | --Msg--> | Client | --Msg--> Subscriber
---------- ----------- ----------
Message Persistence?
--------------------
emqttd1.0版本不支持服务器内部消息持久化这是一个架构设计选择。首先emqttd解决的核心问题是连接与路由其次我们认为内置持久化是个错误设计。
传统内置消息持久化的MQ服务器比如广泛使用的JMS服务器ActiveMQ几乎每个大版本都在重新设计持久化部分。内置消息持久化在设计上有两个问题:
1. 如何平衡内存与磁盘使用?消息路由基于内存,消息存储是基于磁盘。
2. 多服务器分布集群架构下如何放置Queue如何复制Queue的消息
Kafka在上述问题上做出了正确的设计一个完全基于磁盘分布式commit log的消息服务器。
emqttd2.0版本计划通过外部存储例如Redis、Kafka、Cassandra、PostgreSQL实现多种方式的消息持久化。
设计上分离消息路由与消息存储职责后,数据复制容灾备份甚至应用集成,可以在数据层面灵活实现。
NetSplit Problem
----------------
The emqttd broker cluster requires reliable network to avoid NetSplit.
emqttd1.0消息服务器集群基于Mnesia数据库设计。NetSplit发生时节点间状态是Erlang节点间可以连通互相询问自己是否宕机对方回答你已经宕机:(
NetSplit故障发生时emqttd消息服务器的log/emqttd_error.log日志会打印critical级别日志::
Mnesia inconsistent_database event: running_partitioned_network, emqttd@host
emqttd集群部署在同一IDC网络下NetSplit发生的几率很低一旦发生又很难自动处理。所以emqttd1.0版本设计选择是集群不自动化处理NetSplit需要人工重启部分节点。
.. _design_architecture: .. _design_architecture:
------------ ------------
@ -73,123 +14,109 @@ Architecture
Concept Model Concept Model
------------- -------------
emqttd消息服务器概念上更像一台网络路由器(Router)或交换机(Switch),而不是传统的企业级消息服务器(MQ)。相比网络路由器按IP地址或MPLS标签路由报文emqttd按主题树(Topic Trie)发布订阅模式在集群节点间路由MQTT消息: The emqttd broker 1.0 is more like a network Switch or Router, not traditional enterprise message queue. Compared to a network router that routes packets based on IP or MPLS label, the emqttd broker routes MQTT messages based on a topic trie.
.. image:: _static/images/concept.png .. image:: _static/images/concept.png
Design Principle Design Philosophy
---------------- -----------------
1. emqttd消息服务器核心解决的问题处理海量的并发MQTT连接与路由消息。 1. Focus on handling millions of MQTT connections and route MQTT messages between clustered nodes.
2. 充分利用Erlang/OTP平台软实时、低延时、高并发、分布容错的优势。 2. Embrace Erlang/OTP, The Soft-Realtime, Low-Latency, Concurrent and Fault-Tolerant platform.
3. 连接(Connection)、会话(Session)、路由(Router)、集群(Cluster)分层。 3. Connection, Session, PubSub, Router and Distributed Layers.
4. 消息路由平面(Flow Plane)与控制管理平面(Control Plane)分离。 4. Seperate the Message Flow Plane and Control/Management Plane.
5. 支持后端数据库或NoSQL实现数据持久化、容灾备份与应用集成。 5. Stream out the MQTT messages to various backends.
System Layers System Layers
------------- -------------
1. Network Layer: 负责TCP连接处理、MQTT协议编解码。 .. code::
2. Session Layer: 处理MQTT协议发布订阅消息交互流程。 -------------- ----------- ---------- ----------
Client --> | Connection | --> | Session | --> | PubSub | --> | Router |
-------------- ----------- ---------- ----------
3. PubSub Layer: 节点内路由派发MQTT消息。 1. Connection Layer:
4. Route Layer: 分布节点间路由MQTT消息。 Handle TCP and WebSocket connections, encode/decode MQTT packets.
5. Authentication and ACL: 连接层支持可扩展的认证与访问控制模块。 2. Session Layer:
6. Hooks and Plugins: 系统每层提供可扩展的钩子,支持插件方式扩展服务器。 Process MQTT PUBLISH/SUBSCRIBE Packets recevied from client, and deliver MQTT messages to client.
------------- 3. PubSub Layer:
Network Layer
-------------
连接层处理服务端Socket连接与MQTT协议编解码 Route and dispatch MQTT messages to subscribers in a node
1. 基于 `eSockd`_ 框架的异步TCP服务端 4. Route(Distributed) Layer:
2. TCP Acceptor池与异步TCP Accept
3. TCP/SSL, WebSocket/SSL连接支持 Route MQTT messages between nodes in a cluster
4. 最大并发连接数限制
5. 基于IP地址(CIDR)访问控制 ----------------
6. 基于Leaky Bucket的流控 Connection Layer
7. MQTT协议编解码 ----------------
8. MQTT协议心跳检测
9. MQTT协议报文处理 This layer is built on the `eSockd`_ library which is a general Non-blocking TCP/SSL Socket Server:
* General Non-blocking TCP/SSL Socket Server
* Acceptor Pool and Asynchronous TCP Accept * Acceptor Pool and Asynchronous TCP Accept
* Parameterized Connection Module * Parameterized Connection Module
* Max connections management * Max connections management
* Allow/Deny by peer address or CIDR * Allow/Deny by peer address or CIDR
* Keepalive Support * Keepalive Support
* Rate Limit by Leaky Bucket * Rate Limit based on The Leaky Bucket Algorithm
The_Leaky_Bucket_Algorithm * Fully Asynchronous TCP RECV/SEND
This layer is also responsible for encoding/decoding MQTT frames:
1. Parse MQTT frames received from client
2. Serialize MQTT frames sent to client
3. MQTT Connection Keepalive
Main modules of this layer:
+------------------+--------------------------+
| Module | Description |
+==================+==========================+
| emqttd_client | TCP Client |
+------------------+--------------------------+
| emqttd_ws_client | WebSocket Client |
+------------------+--------------------------+
| emqttd_protocol | MQTT Protocol Handler |
+------------------+--------------------------+
| emqttd_parser | MQTT Frame Parser |
+------------------+--------------------------+
| emqttd_serializer| MQTT Frame Serializer |
+------------------+--------------------------+
------------- -------------
Session Layer Session Layer
------------- -------------
会话层处理MQTT协议发布订阅(Publish/Subscribe)业务交互流程: The session layer processes MQTT PUBLISH/SUBSCRIBE packets received from client, and deliver PUBLISH packets to client.
1. 缓存MQTT客户端的全部订阅(Subscription)并终结订阅QoS A MQTT session will store the subscriptions and inflight messages in memory:
2. 处理Qos0/1/2消息接收与下发消息超时重传与离线消息保存 1. The Clients subscriptions.
3. 飞行窗口(Inflight Window),下发消息吞吐控制与顺序保证 2. Inflight qos1/2 messages sent to the client but unacked, QoS 2 messages which
have been sent to the Client, but have not been completely acknowledged.
4. 保存服务器发送到客户端的已发送未确认的Qos1/2消息 3. Inflight qos2 messages received from client and waiting for pubrel. QoS 2
messages which have been received from the Client, but have not been
completely acknowledged.
5. 缓存客户端发送到服务端未接收到PUBREL的QoS2消息 4. All qos1, qos2 messages published to when client has been disconnected.
6. 客户端离线时保存持久会话的离线Qos1/2消息
%% @doc Session for persistent MQTT client.
%%
%% Session State in the broker consists of:
%%
%% 1. The Clients subscriptions.
%%
%% 2. inflight qos1/2 messages sent to the client but unacked, QoS 1 and QoS 2
%% messages which have been sent to the Client, but have not been completely
%% acknowledged.
%%
%% 3. inflight qos2 messages received from client and waiting for pubrel. QoS 2
%% messages which have been received from the Client, but have not been
%% completely acknowledged.
%%
%% 4. all qos1, qos2 messages published to when client is disconnected.
%% QoS 1 and QoS 2 messages pending transmission to the Client.
%%
%% 5. Optionally, QoS 0 messages pending transmission to the Client.
%%
%% State of Message: newcome, inflight, pending
Main module of this layer is emqttd_session.
MQueue and Inflight Window MQueue and Inflight Window
--------------------------- --------------------------
%% Concept of Message Queue and Inflight Window: Concept of Message Queue and Inflight Window::
%%
%% |<----------------- Max Len ----------------->|
%% -----------------------------------------------
%% IN -> | Messages Queue | Inflight Window | -> Out
%% -----------------------------------------------
%% |<--- Win Size --->|
%%
%%
%% 1. Inflight Window to store the messages delivered and awaiting for puback.
%%
%% 2. Enqueue messages when the inflight window is full.
%%
%% 3. If the queue is full, dropped qos0 messages if store_qos0 is true,
%% otherwise dropped the oldest one.
会话层通过一个内存消息队列和飞行窗口处理下发消息::
|<----------------- Max Len ----------------->| |<----------------- Max Len ----------------->|
----------------------------------------------- -----------------------------------------------
@ -197,30 +124,34 @@ MQueue and Inflight Window
----------------------------------------------- -----------------------------------------------
|<--- Win Size --->| |<--- Win Size --->|
飞行窗口(Inflight Window)保存当前正在发送未确认的Qos1/2消息。窗口值越大吞吐越高窗口值越小消息顺序越严格。 1. Inflight Window to store the messages delivered and awaiting for puback.
当客户端离线或者飞行窗口(Inflight Window)满时消息缓存到队列。如果消息队列满先丢弃Qos0消息或最早进入队列的消息。 2. Enqueue messages when the inflight window is full.
3. If the queue is full, dropped qos0 messages if store_qos0 is true,
otherwise dropped the oldest one.
The larger the inflight window size, the higher the throughput. The smaller the window size, the more strict the message order.
PacketId and MessageId PacketId and MessageId
---------------------- ----------------------
The 16bits PacketId is defined by MQTT Protocol Specification, used by client/server to PUBLISH/PUBACK packets. A GUID(128bits global unique Id) will be generated by the borker and assigend to a MQTT message.
MQTT协议定义了一个16bits的报文ID(PacketId)用于客户端到服务器的报文收发与确认。MQTT发布报文(PUBLISH)进入消息服务器后转换为一个消息对象并分配128bits消息ID(MessageId)。 Format of the global unique messsage id::
The 128bits global unique messsage id:: +------------------------+----------------+------------+
--------------------------------------------------------
| Timestamp | NodeID + PID | Sequence | | Timestamp | NodeID + PID | Sequence |
+------------------------+----------------+------------+
|<------- 64bits ------->|<--- 48bits --->|<- 16bits ->| |<------- 64bits ------->|<--- 48bits --->|<- 16bits ->|
-------------------------------------------------------- +------------------------+----------------+------------+
1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp 1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp
2. NodeId: encode node() to 2 bytes integer 2. NodeId: encode node() to 2 bytes integer
3. Pid: encode pid to 4 bytes integer 3. Pid: encode pid to 4 bytes integer
4. Sequence: 2 bytes sequence in one process 4. Sequence: 2 bytes sequence in one process
The PacketId and MessageId in a End-to-End Message PubSub Sequence::
端到端消息发布订阅(Pub/Sub)过程中发布报文ID与报文QoS终结在会话层由唯一ID标识的MQTT消息对象在节点间路由::
PktId <-- Session --> MsgId <-- Router --> MsgId <-- Session --> PktId PktId <-- Session --> MsgId <-- Router --> MsgId <-- Session --> PktId
@ -228,17 +159,17 @@ The 128bits global unique messsage id::
PubSub Layer PubSub Layer
------------ ------------
The PubSub layer would maintain a subscription table and publish MQTT messages to subscribers. The PubSub layer maintains a subscription table and responsible to dispatch MQTT messages to subscribers.
.. image:: _static/images/dispatch.png .. image:: _static/images/dispatch.png
MQTT messages will be dispatched to the subscriber's session, and finally be delivered to the client. MQTT messages will be dispatched to the subscriber's session, which finally delivers the message to client.
----------- -------------
Route Layer Routing Layer
----------- -------------
The route layer would maintain and replicate the global Topic Trie and Routing Table. The topic tire is composed of wildcard topics created by subscribers, and the Routing Table map a topic to nodes in the cluster. The routing(distributed) layer maintains and replicates the global Topic Trie and Routing Table. The topic tire is composed of wildcard topics created by subscribers, and the Routing Table map a topic to nodes in the cluster.
For example, if node1 subscribed 't/+/x' and 't/+/y', node2 subscribed 't/#' and node3 subscribed 't/a', there will be a topic trie and route table:: For example, if node1 subscribed 't/+/x' and 't/+/y', node2 subscribed 't/#' and node3 subscribed 't/a', there will be a topic trie and route table::
@ -255,36 +186,21 @@ For example, if node1 subscribed 't/+/x' and 't/+/y', node2 subscribed 't/#' and
| t/a -> node3 | | t/a -> node3 |
------------------------- -------------------------
The route layer would route MQTT messages between nodes in a cluster by topic trie match and routing table lookup. The routing layer would route MQTT messages between clustered nodes by topic trie match and routing table lookup, and follow the ruels below:
.. image:: _static/images/route.png 1. A message only gets forwarded to other cluster nodes if a cluster node is interested in it. This reduces the network traffic tremendously, because it prevents nodes from forwarding unnecessary messages.
## Cluster Design
1. One 'disc_copies' node and many 'ram_copies' nodes.
2. Topic trie tree will be copied to every clusterd node.
3. Subscribers to topic will be stored in each node and will not be copied.
## Cluster Strategy
TODO:...
1. A message only gets forwarded to other cluster nodes if a cluster node is interested in it. this reduces the network traffic tremendously, because it prevents nodes from forwarding unnecessary messages.
2. As soon as a client on a node subscribes to a topic it becomes known within the cluster. If one of the clients somewhere in the cluster is publishing to this topic, the message will be delivered to its subscriber no matter to which cluster node it is connected. 2. As soon as a client on a node subscribes to a topic it becomes known within the cluster. If one of the clients somewhere in the cluster is publishing to this topic, the message will be delivered to its subscriber no matter to which cluster node it is connected.
.. _design_auth_acl: .. _design_auth_acl:
---------------------- ----------------------
Authentication and ACL Authentication and ACL
---------------------- ----------------------
emqttd消息服务器支持可扩展的认证与访问控制由emqttd_access_control、emqttd_auth_mod和emqttd_acl_mod模块实现。 The emqttd broker supports an extensible authentication/ACL mechanism, which is implemented by emqttd_access_control, emqttd_auth_mod and emqttd_acl_mod modules.
emqttd_access_control模块提供了注册认证扩展接口:: emqttd_access_control module provides two APIs that help register/unregister auth or ACL module::
register_mod(auth | acl, atom(), list()) -> ok | {error, any()}. register_mod(auth | acl, atom(), list()) -> ok | {error, any()}.
@ -293,7 +209,7 @@ emqttd_access_control模块提供了注册认证扩展接口::
Authentication Bahavihour Authentication Bahavihour
------------------------- -------------------------
emqttd_auth_mod定义认证扩展模块Behavihour:: The emqttd_auth_mod defines an Erlang behavihour for authentication module::
-module(emqttd_auth_mod). -module(emqttd_auth_mod).
@ -319,24 +235,24 @@ emqttd_auth_mod定义认证扩展模块Behavihour::
-endif. -endif.
emqttd消息服务器自身实现的认证模块包括: The authentication modules implemented by default:
+-----------------------+--------------------------------+ +-----------------------+--------------------------------+
| 模块 | 认证方式 | | Module | Authentication |
+-----------------------+--------------------------------+ +-----------------------+--------------------------------+
| emqttd_auth_username | 用户名密码认证 | | emqttd_auth_username | Username and Password |
+-----------------------+--------------------------------+ +-----------------------+--------------------------------+
| emqttd_auth_clientid | ClientID认证 | | emqttd_auth_clientid | ClientID |
+-----------------------+--------------------------------+ +-----------------------+--------------------------------+
| emqttd_auth_ldap | LDAP认证 | | emqttd_auth_ldap | LDAP |
+-----------------------+--------------------------------+ +-----------------------+--------------------------------+
| emqttd_auth_anonymous | 匿名认证 | | emqttd_auth_anonymous | Anonymous |
+-----------------------+--------------------------------+ +-----------------------+--------------------------------+
Authorization(ACL) Authorization(ACL)
------------------ ------------------
emqttd_acl_mod模块定义访问控制Behavihour:: The emqttd_acl_mod defines an Erlang behavihour for ACL module::
-module(emqttd_acl_mod). -module(emqttd_acl_mod).
@ -366,7 +282,7 @@ emqttd_acl_mod模块定义访问控制Behavihour::
-endif. -endif.
emqttd_acl_internal模块实现缺省的基于etc/acl.config文件的访问控制:: emqttd_acl_internal implements the default ACL based on etc/acl.config file::
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% %%%
@ -400,32 +316,31 @@ emqttd_acl_internal模块实现缺省的基于etc/acl.config文件的访问控
Hooks Design Hooks Design
------------ ------------
What's Hook The emqttd broker implements a simple but powerful hooks mechanism to help users develop plugin. The broker would run the hooks when a client is connected/disconnected, a topic is subscribed/unsubscribed or a MQTT message is published/delivered/acked:
-----------
emqttd消息服务器在客户端上下线、主题订阅、消息收发位置设计了扩展钩子(Hook): Hooks defined by the emqttd 1.0 broker:
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| 钩子 | 说明 | | Hook | Description |
+========================+==================================+ +========================+======================================================+
| client.connected | 客户端上线 | | client.connected | Run when client connected to the broker successfully |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| client.subscribe | 客户端订阅主题前 | | client.subscribe | Run before client subscribes topics |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| client.subscribe.after | 客户端订阅主题后 | | client.subscribe.after | Run After client subscribed topics |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| client.unsubscribe | 客户端取消订阅主题 | | client.unsubscribe | Run when client unsubscribes topics |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| message.publish | MQTT消息发布 | | message.publish | Run when a MQTT message is published |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| message.delivered | MQTT消息送达 | | message.delivered | Run when a MQTT message is delivered |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| message.acked | MQTT消息回执 | | message.acked | Run when a MQTT message is acked |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
| client.disconnected | 客户端连接断开 | | client.disconnected | Run when client disconnnected from broker |
+------------------------+----------------------------------+ +------------------------+------------------------------------------------------+
钩子(Hook)采用职责链设计模式(`Chain-of-responsibility_pattern`_),扩展模块或插件向钩子注册回调函数,系统在客户端上下线、主题订阅或消息发布确认时,触发钩子顺序执行回调函数:: The emqttd broker uses the `Chain-of-responsibility_pattern`_ to implement hook mechanism. The callback functions registered to hook will be executed one bye one::
-------- ok | {ok, NewAcc} -------- ok | {ok, NewAcc} -------- -------- ok | {ok, NewAcc} -------- ok | {ok, NewAcc} --------
(Args, Acc) --> | Fun1 | -------------------> | Fun2 | -------------------> | Fun3 | --> {ok, Acc} | {stop, Acc} (Args, Acc) --> | Fun1 | -------------------> | Fun2 | -------------------> | Fun3 | --> {ok, Acc} | {stop, Acc}
@ -433,24 +348,26 @@ emqttd消息服务器在客户端上下线、主题订阅、消息收发位置
| | | | | |
stop | {stop, NewAcc} stop | {stop, NewAcc} stop | {stop, NewAcc} stop | {stop, NewAcc} stop | {stop, NewAcc} stop | {stop, NewAcc}
不同钩子的回调函数输入参数不同,用户可参考插件模版的 `emqttd_plugin_template`_ 模块,每个回调函数应该返回: The callback function for hook should return:
+-----------------+------------------------+ +-----------------+------------------------+
| 返回 | 说明 | | Return | Description |
+=================+========================+ +=================+========================+
| ok | 继续执行 | | ok | Continue |
+-----------------+------------------------+ +-----------------+------------------------+
| {ok, NewAcc} | 返回累积参数继续执行 | | {ok, NewAcc} | Return Acc and Continue|
+-----------------+------------------------+ +-----------------+------------------------+
| stop | 停止执行 | | stop | Break |
+-----------------+------------------------+ +-----------------+------------------------+
| {stop, NewAcc} | 返回累积参数停止执行 | | {stop, NewAcc} | Return Acc and Break |
+-----------------+------------------------+ +-----------------+------------------------+
The input arguments for a callback function is different depending on the type of hook. Clone the `emqttd_plugin_template`_ to check how to use hooks.
Hook Implementation Hook Implementation
------------------- -------------------
emqttd模块封装了Hook接口: The hook APIs defined in emqttd module:
.. code:: erlang .. code:: erlang
@ -466,7 +383,7 @@ emqttd模块封装了Hook接口:
run_hooks(Hook :: atom(), Args :: list(any()), Acc :: any()) -> {ok | stop, any()}. run_hooks(Hook :: atom(), Args :: list(any()), Acc :: any()) -> {ok | stop, any()}.
emqttd_hook模块实现Hook机制: And implemented in emqttd_hook module:
.. code:: erlang .. code:: erlang
@ -485,10 +402,10 @@ emqttd_hook模块实现Hook机制:
lookup(HookPoint :: atom()) -> [#callback{}]. lookup(HookPoint :: atom()) -> [#callback{}].
Use Hooks Hook Usage
--------- ----------
`emqttd_plugin_template`_ 提供了全部钩子的使用示例,例如端到端的消息处理回调: `emqttd_plugin_template`_ provides the examples for hook usage:
.. code:: erlang .. code:: erlang
@ -526,9 +443,12 @@ Use Hooks
Plugin Design Plugin Design
------------- -------------
插件是一个普通的Erlang应用(Application)放置在emqttd/plugins目录可以被动态加载。插件主要通过钩子(Hook)机制扩展服务器功能,或通过注册扩展模块方式集成认证访问控制。 Plugin is a normal erlang application that could be started/stopped dynamically by a running emqttd broker.
emqttd_plugins模块实现插件机制提供加载卸载插件API:: emqttd_plugins Module
---------------------
The plugin mechanism is implemented by emqttd_plugins module::
-module(emqttd_plugins). -module(emqttd_plugins).
@ -540,353 +460,21 @@ emqttd_plugins模块实现插件机制提供加载卸载插件API::
%% @doc UnLoad a Plugin %% @doc UnLoad a Plugin
unload(PluginName :: atom()) -> ok | {error, any()}. unload(PluginName :: atom()) -> ok | {error, any()}.
用户可通过'./bin/emqttd_ctl'命令行加载卸载插件:: Load a Plugin
-------------
Use './bin/emqttd_ctl' CLI to load/unload a plugin::
./bin/emqttd_ctl plugins load emqttd_plugin_redis ./bin/emqttd_ctl plugins load emqttd_plugin_redis
./bin/emqttd_ctl plugins unload emqttd_plugin_redis ./bin/emqttd_ctl plugins unload emqttd_plugin_redis
开发者请参考模版插件: http://github.com/emqtt/emqttd_plugin_template Plugin Template
.. _design_erlang:
----------
Erlang/OTP
----------
1. Pool, Pool, Pool... Use the awesome GProc libary: https://github.com/uwiger/gproc
2. 异步,异步,异步消息...连接层到路由层异步消息,同步请求用于负载保护
3. 避免进程Mailbox累积消息负载高的进程可以使用gen_server2
4. 消息流经的Socket连接、会话进程必须Hibernate主动回收binary句柄
5. 多使用Binary数据避免进程间内存复制
6. 使用ETS, ETS, ETS...Message Passing Vs ETS
7. 避免ETS表非键值字段select, match
8. 避免大量数据ETS读写, 每次ETS读写会复制内存可使用lookup_element, update_counter
9. 适当开启ETS表{write_concurrency, true}
10. 保护Mnesia数据库事务尽量减少事务数量避免事务过载(overload)
11. 避免Mnesia数据表索引和非键值字段match, select
---------------
Pubsub Sequence
--------------- ---------------
## PubSub Sequence http://github.com/emqtt/emqttd_plugin_template
### Clean Session = 1
```
title PubSub Sequence(Clean Session = 1)
ClientA-->PubSub: Publish Message
PubSub-->ClientB: Dispatch Message
```
![PubSub_CleanSess_1](http://emqtt.io/static/img/design/PubSub_CleanSess_1.png)
### Clean Session = 0
```
title PubSub Sequence(Clean Session = 0)
ClientA-->SessionA: Publish Message
SessionA-->PubSub: Publish Message
PubSub-->SessionB: Dispatch Message
SessionB-->ClientB: Dispatch Message
```
![PubSub_CleanSess_0](http://emqtt.io/static/img/design/PubSub_CleanSess_0.png)
## Qos
PubQos | SubQos | In Message | Out Message
-------|--------|------------|-------------
0 | 0 | 0 | 0
0 | 1 | 0 | 0
0 | 2 | 0 | 0
1 | 0 | 1 | 0
1 | 1 | 1 | 1
1 | 2 | 1 | 1
2 | 0 | 2 | 0
2 | 1 | 2 | 1
2 | 2 | 2 | 2
## Topic Functions Benchmark
Mac Air(11):
Function | Time(microseconds)
-------------|--------------------
match | 6.25086
triples | 13.86881
words | 3.41177
binary:split | 3.03776
iMac:
Function | Time(microseconds)
-------------|--------------------
match | 3.2348
triples | 6.93524
words | 1.89616
binary:split | 1.65243
--------------
Cluster Design
--------------
....
## Cluster Architecture
![Cluster Design](http://emqtt.io/static/img/Cluster.png)
## Cluster Command
```sh
./bin/emqttd_ctl cluster DiscNode
```
## Mnesia Example
```
(emqttd3@127.0.0.1)3> mnesia:info().
---> Processes holding locks <---
---> Processes waiting for locks <---
---> Participant transactions <---
---> Coordinator transactions <---
---> Uncertain transactions <---
---> Active tables <---
mqtt_retained : with 6 records occupying 221 words of mem
topic_subscriber: with 0 records occupying 305 words of mem
topic_trie_node: with 129 records occupying 3195 words of mem
topic_trie : with 128 records occupying 3986 words of mem
topic : with 93 records occupying 1797 words of mem
schema : with 6 records occupying 1081 words of mem
===> System info in version "4.12.4", debug level = none <===
opt_disc. Directory "/Users/erylee/Projects/emqttd/rel/emqttd3/data/mnesia" is NOT used.
use fallback at restart = false
running db nodes = ['emqttd2@127.0.0.1','emqttd@127.0.0.1','emqttd3@127.0.0.1']
stopped db nodes = []
master node tables = []
remote = []
ram_copies = [mqtt_retained,schema,topic,topic_subscriber,topic_trie,
topic_trie_node]
disc_copies = []
disc_only_copies = []
[{'emqttd2@127.0.0.1',ram_copies},
{'emqttd3@127.0.0.1',ram_copies},
{'emqttd@127.0.0.1',disc_copies}] = [schema]
[{'emqttd2@127.0.0.1',ram_copies},
{'emqttd3@127.0.0.1',ram_copies},
{'emqttd@127.0.0.1',ram_copies}] = [topic,topic_trie,topic_trie_node,
mqtt_retained]
[{'emqttd3@127.0.0.1',ram_copies}] = [topic_subscriber]
44 transactions committed, 5 aborted, 0 restarted, 0 logged to disc
0 held locks, 0 in queue; 0 local transactions, 0 remote
0 transactions waits for other nodes: []
```
## Cluster vs Bridge
Cluster will copy topic trie tree between nodes, Bridge will not.
-------------
Hooks Design
-------------
## Overview
emqttd supported a simple hooks mechanism in 0.8.0 release to extend the broker. The designed is improved in 0.9.0 release.
## API
emqttd_broker Hook API:
```
-export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]).
```
### Hook
```
-spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
hook(Hook, Name, MFA) ->
...
```
### Unhook
```
-spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
unhook(Hook, Name) ->
...
```
### Foreach Hooks
```
-spec foreach_hooks(Hook :: atom(), Args :: list()) -> any().
foreach_hooks(Hook, Args) ->
...
```
### Foldl Hooks
```
-spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any().
foldl_hooks(Hook, Args, Acc0) ->
...
```
## Hooks
Name | Type | Description
--------------- | ----------| --------------
client.connected | foreach | Run when client connected successfully
client.subscribe | foldl | Run before client subscribe topics
client.subscribe.after | foreach | Run After client subscribe topics
client.unsubscribe | foldl | Run when client unsubscribe topics
message.publish | foldl | Run when message is published
message.acked | foreach | Run when message is acked
client.disconnected | foreach | Run when client is disconnnected
## End-to-End Message Pub/Ack
Could use 'message.publish', 'message.acked' hooks to implement end-to-end message pub/ack:
```
PktId <-- --> MsgId <-- --> MsgId <-- --> PktId
|<--- Qos --->|<---PubSub--->|<-- Qos -->|
```
## Limit
The design is experimental.
-------------
Plugin Design
-------------
## Overview
**Notice that 0.11.0 release use rebar to manage plugin's deps.**
A plugin is just an erlang application that extends emqttd broker.
The plugin application should be put in "emqttd/plugins/" folder to build.
## Plugin Project
You could create a standalone plugin project outside emqttd, and then add it to "emqttd/plugins/" folder by "git submodule".
Git submodule to compile emqttd_dashboard plugin with the broker, For example:
```
git submodule add https://github.com/emqtt/emqttd_dashboard.git plugins/emqttd_dashboard
make && make dist
```
## plugin.config
**Each plugin should have a 'etc/plugin.config' file**
For example, project structure of emqttd_dashboard plugin:
```
LICENSE
README.md
ebin
etc
priv
rebar.config
src
```
etc/plugin.config for emqttd_dashboard plugin:
```
[
{emqttd_dashboard, [
{listener,
{emqttd_dashboard, 18083, [
{acceptors, 4},
{max_clients, 512}]}}
]}
].
```
## rebar.config
**Plugin should use 'rebar.config' to manage depencies**
emqttd_plugin_pgsql plugin's rebar.config, for example:
```
%% -*- erlang -*-
{deps, [
{epgsql, ".*",{git, "https://github.com/epgsql/epgsql.git", {branch, "master"}}}
]}.
```
## Build emqttd with plugins
Put all the plugins you required in 'plugins/' folder of emqttd project, and then:
```
make && make dist
```
## Load Plugin
'./bin/emqttd_ctl' to load/unload plugin, when emqttd broker started.
```
./bin/emqttd_ctl plugins load emqttd_plugin_demo
./bin/emqttd_ctl plugins unload emqttd_plugin_demo
```
## List Plugins
```
./bin/emqttd_ctl plugins list
```
## API
```
%% Load all active plugins after broker started
emqttd_plugins:load()
%% Load new plugin
emqttd_plugins:load(Name)
%% Unload all active plugins before broker stopped
emqttd_plugins:unload()
%% Unload a plugin
emqttd_plugins:unload(Name)
```
.. _eSockd: https://github.com/emqtt/esockd .. _eSockd: https://github.com/emqtt/esockd
.. _Chain-of-responsibility_pattern: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern .. _Chain-of-responsibility_pattern: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
.. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template/blob/master/src/emqttd_plugin_template.erl .. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template/blob/master/src/emqttd_plugin_template.erl