emqx/apps/emqx_exhook/docs/design.md

4.1 KiB
Raw Permalink Blame History

设计

动机

在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力:

  1. emqx-extension-hook 提供了使用 Java, Python 向 Broker 挂载钩子的功能
  2. emqx-exproto 提供了使用 JavaPython 编写用户自定义协议接入插件的功能

但在后续的支持中发现许多难以处理的问题:

  1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。
  2. erlport 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。
  3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。
  4. erlport 会占用 stdin stdout

因此,我们计划重构这部分的实现,其中主要的内容是:

  1. 使用 gRPC 替换 erlport
  2. emqx-extension-hook 重命名为 emqx-exhook

旧版本的设计参考:emqx-extension-hook design in v4.2.0

设计

架构如下:

  EMQ X                                    
+========================+                 +========+==========+
|    ExHook              |                 |        |          |
|   +----------------+   |      gRPC       | gRPC   |  User's  |
|   |   gRPC Client  | ------------------> | Server |  Codes   |
|   +----------------+   |    (HTTP/2)     |        |          |
|                        |                 |        |          |
+========================+                 +========+==========+

emqx-exhook 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。

和 emqx 原生的钩子一致emqx-exhook 也支持链式的方式计算和返回:

gRPC 服务示例

用户需要实现的方法,和数据类型的定义在 priv/protos/exhook.proto 文件中。例如,其支持的接口有:

syntax = "proto3";

package emqx.exhook.v1;

service HookProvider {

  rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};

  rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};

  rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};

  rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};

  rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};

  rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};

  rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};

  rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};

  rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};

  rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};

  rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};

  rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};

  rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};

  rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};

  rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};

  rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};

  rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};

  rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};

  rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};

  rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};

  rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
}

配置文件示例

## 配置 gRPC 服务地址 (HTTP)
##
## s1 为服务器的名称
exhook.server.s1.url = http://127.0.0.1:9001

## 配置 gRPC 服务地址 (HTTPS)
##
## s2 为服务器名称
exhook.server.s2.url = https://127.0.0.1:9002
exhook.server.s2.cacertfile = ca.pem
exhook.server.s2.certfile = cert.pem
exhook.server.s2.keyfile = key.pem