4.1 KiB
4.1 KiB
设计
动机
在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力:
emqx-extension-hook
提供了使用 Java, Python 向 Broker 挂载钩子的功能emqx-exproto
提供了使用 Java,Python 编写用户自定义协议接入插件的功能
但在后续的支持中发现许多难以处理的问题:
- 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。
erlport
使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。- 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。
erlport
会占用stdin
stdout
。
因此,我们计划重构这部分的实现,其中主要的内容是:
- 使用
gRPC
替换erlport
。 - 将
emqx-extension-hook
重命名为emqx-exhook
旧版本的设计参考:emqx-extension-hook design in v4.2.0
设计
架构如下:
EMQ X
+========================+ +========+==========+
| ExHook | | | |
| +----------------+ | gRPC | gRPC | User's |
| | gRPC Client | ------------------> | Server | Codes |
| +----------------+ | (HTTP/2) | | |
| | | | |
+========================+ +========+==========+
emqx-exhook
通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。
和 emqx 原生的钩子一致,emqx-exhook 也支持链式的方式计算和返回:

gRPC 服务示例
用户需要实现的方法,和数据类型的定义在 priv/protos/exhook.proto
文件中。例如,其支持的接口有:
syntax = "proto3";
package emqx.exhook.v1;
service HookProvider {
rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
}
配置文件示例
## 配置 gRPC 服务地址 (HTTP)
##
## s1 为服务器的名称
exhook.server.s1.url = http://127.0.0.1:9001
## 配置 gRPC 服务地址 (HTTPS)
##
## s2 为服务器名称
exhook.server.s2.url = https://127.0.0.1:9002
exhook.server.s2.cacertfile = ca.pem
exhook.server.s2.certfile = cert.pem
exhook.server.s2.keyfile = key.pem