Compare commits

...

41 Commits
master ... v3

Author SHA1 Message Date
PengyuDeng ff7e796a09
doc: 完善TCP文档内容 (#16)
* doc: 完善TCP文档内容

* doc: 增加设备上报示例

* doc: 增加请求设备上线示例

* doc: 文档增加byteBuf构建示例

* doc: 使用databuffer构建报文
2024-01-24 19:22:33 +08:00
PengyuDeng 7b9a4cf128
doc: 修复TCP协议解析注释错误 (#15) 2024-01-05 11:27:33 +08:00
老周 0bf5111e31
Update pom.xml 2023-08-03 16:40:12 +08:00
老周 ec45b7c16d
Merge pull request #14 from 445990772/v3
fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示
2023-08-02 09:17:44 +08:00
李洁 fe8ef8a1c4 fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示 2023-08-01 18:04:01 +08:00
445990772 d006a81847
Merge pull request #1 from 445990772/fix-bug
fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示
2023-08-01 17:50:40 +08:00
李洁 a01497f0db fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示 2023-08-01 17:45:19 +08:00
ht-lijie 9f09f83d95 build(官方协议): 更新官方协议jar包 2023-04-04 17:30:41 +08:00
zhouhao e0dc88251e 3.0.0 2023-04-04 09:43:48 +08:00
老周 317dd7df8d
Merge pull request #10 from kyouji/v3
fix(TCP协议): 优化字符串中文乱码的问题
2023-03-09 10:22:29 +08:00
zhangji 216946bc45 fix(TCP协议): 优化字符串中文乱码的问题 2023-03-09 10:12:47 +08:00
zhouhao 28b35d86da 3.0.0 2023-02-24 11:20:48 +08:00
老周 f952f7ba45
Merge pull request #9 from kyouji/v3
优化文档链接
2023-02-13 15:58:38 +08:00
zhangji a1d408a899 优化文档链接 2023-02-13 13:52:05 +08:00
zhouhao 5d4c2c655e 优化 2023-02-03 19:06:13 +08:00
zhouhao e506623e42 update package 2023-01-10 17:26:39 +08:00
zhouhao 5ec684c578 websocket支持 2023-01-10 17:16:39 +08:00
zhouhao 57b0cb0a73 优化 2022-12-26 14:00:10 +08:00
老周 b92c9b5564
Merge pull request #8 from bestfeng1020/event-support
添加3.0协议包事件支持
2022-12-13 18:45:16 +08:00
liujq 598c3f5f5a 添加3.0协议包事件支持 2022-12-13 17:59:37 +08:00
zhouhao a5bf08ae7d update package 2022-12-06 16:08:12 +08:00
zhouhao b478566fc1 使用新的http逻辑 2022-12-06 11:18:41 +08:00
zhouhao a4876e49ce Merge remote-tracking branch 'origin/v3' into v3 2022-11-30 17:39:27 +08:00
zhouhao b2702bd1f7 new package 2022-11-30 17:38:47 +08:00
老周 50654cc65d
Update README.md 2022-11-30 17:35:50 +08:00
zhouhao 5058ddc709 优化说明 2022-11-30 16:55:38 +08:00
zhouhao c254a19ea9 优化tcp udp实现和说明 2022-11-30 16:52:30 +08:00
zhouhao ff37d0733c 时间戳大于0才设置 2022-11-24 21:27:18 +08:00
zhouhao 9a81923167 优化tcp消息 2022-11-24 18:21:52 +08:00
zhouhao 6d46169cc5 优化http处理 2022-11-02 17:45:16 +08:00
zhouhao 1227a8e6ce 重构设备数据存储 2022-08-24 17:59:10 +08:00
zhouhao 3cbbaa6643 优化 2022-08-11 18:07:23 +08:00
zhouhao cf21c84518 优化msgid 2022-08-10 09:22:11 +08:00
zhouhao b0c94bed6b 增加基本的tcp实现 2022-08-08 18:34:15 +08:00
zhouhao c93b5d11a6 增加更新固件消息回复 2022-07-19 14:29:03 +08:00
zhouhao 5e04b035ed 优化 2022-06-23 18:35:13 +08:00
zhouhao 07b7bdf16d 增加http 解析 2022-06-21 17:02:17 +08:00
zhouhao 8e0faae4db 优化 2022-06-20 17:33:07 +08:00
zhouhao 8de80583f6 升级依赖 2022-06-20 17:32:17 +08:00
zhouhao 8b2e692868 修复topic 路由错误 2022-03-27 11:58:19 +08:00
zhouhao d344dba0f1 v3 2022-03-11 09:29:34 +08:00
35 changed files with 2387 additions and 76 deletions

1
.gitignore vendored
View File

@ -25,3 +25,4 @@ hs_err_pid*
/upload /upload
/ui/upload/ /ui/upload/
!/package/** !/package/**
dependency-reduced-pom.xml

View File

@ -1,7 +1,61 @@
## JetLinks 官方设备接入协议 ## JetLinks 官方设备接入协议
类名: `org.jetlinks.protocol.official.JetLinksProtocolSupportProvider` JetLinks官方实现的设备接入协议,可用于参考实现自定义协议开发.
[查看说明](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html) 注意: 本协议仅用于参考自定义协议开发,在实际使用中请根据不同的场景进行调整.如认证方式,加密等.
MQTT 用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html) ### MQTT
[查看TOPIC说明](http://doc.jetlinks.cn/dev-guide/jetlinks-protocol-support.html)
用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/mqtt-auth-generator.html)
### HTTP
HTTP接入时需要使用`Bearer`
认证,URL和[MQTT的接入Topic]((http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html))一致.
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
### TCP
报文格式说明:
第0-4字节对应的32位整型值为接下来报文的长度,
后续为报文数据,
具体报文格式见: [二进制格式说明](binary-protocol.md)
创建连接后第一个数据包需要发送[认证包](binary-protocol.md#0x01-online-首次连接),
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
### UDP
报文格式说明:
第`0`字节表示认证类型,目前固定为0x00.
第`1-n`字节为`密钥信息`,编码使用`STRING`见: [数据类型定义](binary-protocol.md#数据类型)
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
后续为报文数据,具体报文格式见: [二进制格式说明](binary-protocol.md)
UDP无需发送认证包,但是需要每个报文中都包含密钥信息.
除了ACK以外,其他平台下发的指令也都会包含认证密钥信息,用于设备侧校验请求.
### 测试
可以使用[模拟器](http://github.com/jetlinks/device-simulator)进行模拟测试

167
binary-protocol.md Normal file
View File

@ -0,0 +1,167 @@
## 头信息
| 字节数 | 类型 | 字段 | 备注 |
| :----: | ------- | :--------------------: | ------------------- |
| n | 自定 | 消息长度 | 平台配置项 |
| 1 | INT8 | 消息类型 | 见消息类型定义 |
| 8 | INT64 | UTC时间戳 | |
| 2 | INT16 | 消息序号 | |
| 2 | INT16 | 设备ID长度 | |
| n | STRING | 设备ID | 根据设备ID长度得出 |
| n | MESSAGE | 消息类型对应的编码规则 | |
| 2 | INT16 | secureKeyLength | TCP认真配置密钥长度 |
| n | STRING | secureKey | 密钥 |
## 数据类型
所有数据类型均采用`大端`编码
| Byte | Type | 编码规则 | 备注 |
| :--: | :-----: | ------------------------------------------------------------ | ------------------------------------------------------------ |
| 0x00 | NULL | | |
| 0x01 | BOOLEAN | 1字节 0x00为false 其他为true | |
| 0x02 | INT8 | 1字节 (byte) | |
| 0x03 | INT16 | 2字节整型 (short) | |
| 0x04 | INT32 | 4字节整型 (int) | |
| 0x05 | INT64 | 8字节整型 (long) | |
| 0x06 | UINT8 | 1字节无符号整型 | |
| 0x07 | UINT16 | 2字节无符号整型 | |
| 0x08 | UINT32 | 4字节无符号整型 | |
| 0x09 | FLOAT | 4字节 IEEE 754浮点数 | |
| 0x0a | DOUBLE | 8字节 IEEE 754浮点数 | |
| 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 | 2+N2个字节UnsignedShort表示N的长度 |
| 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 | 2+N2个字节UnsignedShort表示N的长度 |
| 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 | 2+N2个字节UnsignedShort表示ARRAY的长度N=很多*(1+X)1个字节UnsignedShort表示X的数据类型X表示长度见上几行。 |
| 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value | 2+N2个字节UnsignedShort表示OBJECT的长度,N是STRING+数据类型的组合了(见上几行)。 |
## 消息类型定义
| Byte | Type | 说明 | Message示例 |
| :--: | :----------------- | ------------ | ------------------------------------------------------------ |
| 0x00 | keepalive | 心跳 | |
| 0x01 | online | 首次连接 | [STRING:密钥信息 ] |
| 0x02 | ack | 应答 | [应答码 ]<br />应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持. |
| 0x03 | reportProperty | 上报属性 | [属性数据:OBJECT类型 ] |
| 0x04 | readProperty | 读取属性 | [属性列表:ARRAY类型 ] |
| 0x05 | readPropertyReply | 读取属性回复 | 读取成功:[`0x01`,属性数据:OBJECT类型 ]<br />读取失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
| 0x06 | writeProperty | 修改属性 | [属性列表:OBJECT类型 ] |
| 0x07 | writePropertyReply | 修改属性回复 | 修改成功:[`0x01`,属性数据:OBJECT类型 ]<br />修改失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
| 0x08 | function | 功能调用 | [功能ID:STRING类型,功能参数:OBJECT类型 ] |
| 0x09 | functionReply | 功能调用回复 | 调用成功:[`0x01`,属性数据:OBJECT类型 ]<br />调用失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
### 备注
`动态读取`表示类型不确定,根据对应的`数据类型`来定义类型.
如: 无错误信息
[ 0x00,`0x00`,`0x00` ]
`INT8(0x02)`类型错误码:`0x04`
[0x00,`0x02,0x04`,0x00 ]
## 示例
### 设备上线
```
000000270100000186c51a890f0001001331363531383533343133303332383934343634000561646d696e
```
```java
String hexForOnline =
"00000027" +//消息长度
"01" + //请求上线
"00000186c51a890f" +//时间戳
"0001" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0005" +//secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一使用byteBuf构建上线消息
String deviceId = "1651853413032894464";
String secureKey = "admin";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x01)
.writeLong(System.currentTimeMillis())
.writeShort(1)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf onlineBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二使用jetlinks-core构建消息
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("1");
message.setTimestamp(1678344096015L);
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
```
### 设备数据上报
```
0000006C0300000186C567FA7900020013313635313835333431333033323839343436340001000474656d700B000433362e35000561646d696e
```
```java
String hexForReport =
"0000006C" +//消息长度
"03" +//上报消息
"00000186C567FA79" + //时间戳
"0002" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0001" + //OBJECT对象数量
"0004" +//key的长度
"74656d70" + //值
"0B" + //value的类型
"0004" +//Value的长度
"33362e35" + //值
"0005" + //secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一
String deviceId = "1651853413032894464";
String secureKey = "admin";
String key = "temp";
String value = "36.5";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x03)
.writeLong(System.currentTimeMillis())
.writeShort(2)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(1)
.writeShort(key.getBytes().length)
.writeBytes(key.getBytes())
.writeByte(0x0B)
.writeShort(value.getBytes().length)
.writeBytes(value.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf reportBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("2");
message.setProperties(Collections.singletonMap("temp", 32.88));
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
```

Binary file not shown.

78
pom.xml
View File

@ -6,17 +6,17 @@
<groupId>org.jetlinks</groupId> <groupId>org.jetlinks</groupId>
<artifactId>jetlinks-official-protocol</artifactId> <artifactId>jetlinks-official-protocol</artifactId>
<version>2.0-SNAPSHOT</version> <version>3.0.0</version>
<name>JetLinks</name> <name>JetLinks</name>
<url>http://jetlinks.org</url> <url>https://jetlinks.org</url>
<inceptionYear>2019</inceptionYear> <inceptionYear>2019</inceptionYear>
<description>JetLinks 物联网平台</description> <description>JetLinks 物联网平台</description>
<licenses> <licenses>
<license> <license>
<name>The Apache License, Version 2.0</name> <name>The Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license> </license>
</licenses> </licenses>
@ -47,7 +47,7 @@
<spring.boot.version>2.2.8.RELEASE</spring.boot.version> <spring.boot.version>2.2.8.RELEASE</spring.boot.version>
<hsweb.framework.version>4.0.3</hsweb.framework.version> <hsweb.framework.version>4.0.3</hsweb.framework.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version> <hsweb.expands.version>3.0.2</hsweb.expands.version>
<reactor.version>Dysprosium-RELEASE</reactor.version> <reactor.version>2020.0.6</reactor.version>
</properties> </properties>
<profiles> <profiles>
@ -122,6 +122,52 @@
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
</profile> </profile>
<profile>
<id>all-in-one</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置主类 -->
<mainClass>org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec</mainClass>
</transformer>
</transformers>
<!-- <artifactSet>-->
<!-- <includes>-->
<!-- &lt;!&ndash; 添加需要打包在一起的第三方依赖信息 &ndash;&gt;-->
<!-- &lt;!&ndash; <include>com.domain:*</include> &ndash;&gt;-->
<!-- </includes>-->
<!-- </artifactSet>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles> </profiles>
<build> <build>
@ -158,20 +204,20 @@
<dependency> <dependency>
<groupId>org.jetlinks</groupId> <groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId> <artifactId>jetlinks-supports</artifactId>
<version>1.1.7</version> <version>1.2.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.eclipse.californium</groupId> <groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId> <artifactId>californium-core</artifactId>
<version>2.2.3</version> <version>3.5.0</version>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.18.12</version> <version>1.18.24</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
@ -183,14 +229,14 @@
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<version>4.13</version> <version>4.13.2</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId> <artifactId>reactor-test</artifactId>
<version>3.3.12.RELEASE</version> <version>3.4.18</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
@ -198,9 +244,15 @@
<dependency> <dependency>
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
<version>1.2.3</version> <version>1.2.11</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.3.1</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<dependencyManagement> <dependencyManagement>
@ -219,12 +271,12 @@
<repository> <repository>
<id>releases</id> <id>releases</id>
<name>Nexus Release Repository</name> <name>Nexus Release Repository</name>
<url>http://nexus.hsweb.me/content/repositories/releases/</url> <url>https://nexus.jetlinks.cn/content/repositories/releases/</url>
</repository> </repository>
<snapshotRepository> <snapshotRepository>
<id>snapshots</id> <id>snapshots</id>
<name>Nexus Snapshot Repository</name> <name>Nexus Snapshot Repository</name>
<url>http://nexus.hsweb.me/content/repositories/snapshots/</url> <url>https://nexus.jetlinks.cn/content/repositories/snapshots/</url>
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
@ -233,7 +285,7 @@
<repository> <repository>
<id>hsweb-nexus</id> <id>hsweb-nexus</id>
<name>Nexus Release Repository</name> <name>Nexus Release Repository</name>
<url>https://nexus.hsweb.me/content/groups/public/</url> <url>https://nexus.jetlinks.cn/content/groups/public/</url>
<snapshots> <snapshots>
<enabled>true</enabled> <enabled>true</enabled>
<updatePolicy>always</updatePolicy> <updatePolicy>always</updatePolicy>

View File

@ -12,6 +12,8 @@ import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext; import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -23,6 +25,15 @@ import java.util.function.Consumer;
@Slf4j @Slf4j
public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override @Override
public Transport getSupportTransport() { public Transport getSupportTransport() {
return DefaultTransport.CoAP_DTLS; return DefaultTransport.CoAP_DTLS;

View File

@ -10,6 +10,10 @@ import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext; import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.cipher.Ciphers; import org.jetlinks.protocol.official.cipher.Ciphers;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -20,6 +24,13 @@ import java.util.function.Consumer;
@Slf4j @Slf4j
public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
public static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
@Override @Override
public Transport getSupportTransport() { public Transport getSupportTransport() {

View File

@ -3,15 +3,26 @@ package org.jetlinks.protocol.official;
import org.jetlinks.core.defaults.CompositeProtocolSupport; import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.route.WebsocketRoute;
import org.jetlinks.core.spi.ProtocolSupportProvider; import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext; import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec;
import org.jetlinks.protocol.official.udp.UDPDeviceMessageCodec;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider { public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider {
private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata( private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
@ -24,47 +35,92 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
.add("secureId", "secureId", "密钥ID", new StringType()) .add("secureId", "secureId", "密钥ID", new StringType())
.add("secureKey", "secureKey", "密钥KEY", new PasswordType()); .add("secureKey", "secureKey", "密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override @Override
public Mono<CompositeProtocolSupport> create(ServiceContext context) { public Mono<CompositeProtocolSupport> create(ServiceContext context) {
return Mono.defer(() -> { return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport(); CompositeProtocolSupport support = new CompositeProtocolSupport();
support.setId("jetlinks.v2.0"); support.setId("jetlinks.v3.0");
support.setName("JetLinks V2.0"); support.setName("JetLinks V3.0");
support.setDescription("JetLinks Protocol Version 2.0"); support.setDescription("JetLinks Protocol Version 3.0");
support.addRoutes(DefaultTransport.MQTT, Arrays
.stream(TopicMessageCodec.values())
.map(TopicMessageCodec::getRoute)
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.MQTT,
"document-mqtt.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addRoutes(DefaultTransport.HTTP, Stream
.of(TopicMessageCodec.reportProperty,
TopicMessageCodec.event,
TopicMessageCodec.online,
TopicMessageCodec.offline)
.map(TopicMessageCodec::getRoute)
.filter(route -> route != null && route.isUpstream())
.map(route -> HttpRoute
.builder()
.address(route.getTopic())
.group(route.getGroup())
.contentType(MediaType.APPLICATION_JSON)
.method(HttpMethod.POST)
.description(route.getDescription())
.example(route.getExample())
.build())
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.HTTP,
"document-http.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator()); support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig); support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
support.addConfigMetadata(DefaultTransport.MQTT_TLS, mqttConfig);
support.addConfigMetadata(DefaultTransport.CoAP, coapConfig);
support.addConfigMetadata(DefaultTransport.CoAP_DTLS, coapDTLSConfig);
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT));
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
//TCP
support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig);
support.addMessageCodecSupport(new TcpDeviceMessageCodec());
support.setDocument(DefaultTransport.TCP,
"document-tcp.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
//UDP
support.addConfigMetadata(DefaultTransport.UDP, UDPDeviceMessageCodec.udpConfig);
support.addMessageCodecSupport(new UDPDeviceMessageCodec());
//MQTT
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec());
//HTTP
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec());
//Websocket
JetLinksHttpDeviceMessageCodec codec = new JetLinksHttpDeviceMessageCodec(DefaultTransport.WebSocket);
support.addMessageCodecSupport(codec);
support.addAuthenticator(DefaultTransport.WebSocket, codec);
support.addRoutes(
DefaultTransport.WebSocket,
Collections.singleton(
WebsocketRoute
.builder()
.path("/{productId:产品ID}/{productId:设备ID}/socket")
.description("通过Websocket接入平台")
.build()
));
//CoAP
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec()); support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
return Mono.just(support); return Mono.just(support);

View File

@ -11,6 +11,7 @@ import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*; import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage; import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply; import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils; import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -19,12 +20,69 @@ import reactor.core.publisher.Mono;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Function;
public enum TopicMessageCodec { public enum TopicMessageCodec {
//上报属性数据 //上报属性数据
reportProperty("/*/properties/report", ReportPropertyMessage.class), reportProperty("/*/properties/report",
ReportPropertyMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("属性上报")
.description("上报物模型属性数据")
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
//读取属性
readProperty("/*/properties/read",
ReadPropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("读取属性")
.description("平台下发读取物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")),
//读取属性回复
readPropertyReply("/*/properties/read/reply",
ReadPropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("读取属性")
.description("对平台下发的读取属性指令进行响应")
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性
writeProperty("/*/properties/write",
WritePropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("修改属性")
.description("平台下发修改物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性回复
writePropertyReply("/*/properties/write/reply",
WritePropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("修改属性")
.description("对平台下发的修改属性指令进行响应")
.example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//事件上报 //事件上报
event("/*/event/*", EventMessage.class) { event("/*/event/*",
EventMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("事件上报")
.description("上报物模型事件数据")
.example("{\"data\":{\"key\":\"value\"}}")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{eventId:事件ID}";
}
@Override @Override
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String event = topic[topic.length - 1]; String event = topic[topic.length - 1];
@ -42,20 +100,42 @@ public enum TopicMessageCodec {
topics[topics.length - 1] = String.valueOf(event.getEvent()); topics[topics.length - 1] = String.valueOf(event.getEvent());
} }
}, },
//读取属性
readProperty("/*/properties/read", ReadPropertyMessage.class),
//读取属性回复
readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class),
//修改属性
writeProperty("/*/properties/write", WritePropertyMessage.class),
//修改属性回复
writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class),
//调用功能 //调用功能
functionInvoke("/*/function/invoke", FunctionInvokeMessage.class), functionInvoke("/*/function/invoke",
FunctionInvokeMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("调用功能")
.description("平台下发功能调用指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\"," +
"\"functionId\":\"功能标识\"," +
"\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")),
//调用功能回复 //调用功能回复
functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class), functionInvokeReply("/*/function/invoke/reply",
FunctionInvokeMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("调用功能")
.description("设备响应平台下发的功能调用指令")
.example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," +
"\"output\":\"输出结果,格式与物模型中定义的类型一致\"")),
//子设备消息 //子设备消息
child("/*/child/*/**", ChildDeviceMessage.class) { child("/*/child/*/**",
ChildDeviceMessage.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关上报或者平台下发子设备消息")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override @Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length); String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
@ -91,7 +171,19 @@ public enum TopicMessageCodec {
} }
}, //子设备消息回复 }, //子设备消息回复
childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) { childReply("/*/child-reply/*/**",
ChildDeviceMessageReply.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关回复平台下发给子设备的指令结果")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override @Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length); String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
@ -128,13 +220,21 @@ public enum TopicMessageCodec {
} }
}, },
//更新标签 //更新标签
updateTag("/*/tags", UpdateTagMessage.class), updateTag("/*/tags",
UpdateTagMessage.class,
route -> route.upstream(true)
.downstream(false)
.group("更新标签")
.description("更新标签数据")
.example("{\"tags\":{\"key\",\"value\"}}")),
//注册 //注册
register("/*/register", DeviceRegisterMessage.class), register("/*/register", DeviceRegisterMessage.class),
//注销 //注销
unregister("/*/unregister", DeviceUnRegisterMessage.class), unregister("/*/unregister", DeviceUnRegisterMessage.class),
//更新固件消息 //更新固件消息
upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class), upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
//更新固件消息回复
upgradeFirmwareReply("/*/firmware/upgrade/reply", UpgradeFirmwareMessageReply.class),
//更新固件升级进度消息 //更新固件升级进度消息
upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class), upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class),
//拉取固件 //拉取固件
@ -164,9 +264,15 @@ public enum TopicMessageCodec {
//断开连接回复 //断开连接回复
disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class), disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
//上线 //上线
connect("/*/online", DeviceOnlineMessage.class), online("/*/online", DeviceOnlineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备上线")),
//离线 //离线
offline("/*/offline", DeviceOfflineMessage.class), offline("/*/offline", DeviceOfflineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备离线")),
//日志 //日志
log("/*/log", DeviceLogMessage.class), log("/*/log", DeviceLogMessage.class),
//状态检查 //状态检查
@ -174,14 +280,49 @@ public enum TopicMessageCodec {
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class), stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
; ;
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) { TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type,
Function<MqttRoute.Builder, MqttRoute.Builder> routeCustom) {
this.pattern = topic.split("/"); this.pattern = topic.split("/");
this.type = type; this.type = type;
this.route = routeCustom.apply(toRoute()).build();
}
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type) {
this.pattern = topic.split("/");
this.type = type;
this.route = null;
} }
private final String[] pattern; private final String[] pattern;
private final MqttRoute route;
private final Class<? extends DeviceMessage> type; private final Class<? extends DeviceMessage> type;
protected void transMqttTopic(String[] topic) {
}
@SneakyThrows
private MqttRoute.Builder toRoute() {
String[] topics = new String[pattern.length];
System.arraycopy(pattern, 0, topics, 0, pattern.length);
topics[0] = "{productId:产品ID}";
topics[1] = "{deviceId:设备ID}";
transMqttTopic(topics);
StringJoiner joiner = new StringJoiner("/", "/", "");
for (String topic : topics) {
joiner.add(topic);
}
return MqttRoute
.builder(joiner.toString())
.qos(1);
}
public MqttRoute getRoute() {
return route;
}
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) { public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) {
return Mono return Mono
.justOrEmpty(fromTopic(topics)) .justOrEmpty(fromTopic(topics))

View File

@ -0,0 +1,7 @@
package org.jetlinks.protocol.official.binary;
public enum AckCode {
ok,
noAuth,
unsupportedMessage
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.HeaderKey;
public class BinaryAcknowledgeDeviceMessage implements BinaryMessage<AcknowledgeDeviceMessage> {
public static final HeaderKey<String> codeHeader = HeaderKey.of("code", AckCode.ok.name());
private AcknowledgeDeviceMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.ack;
}
@Override
public void read(ByteBuf buf) {
message = new AcknowledgeDeviceMessage();
AckCode code = AckCode.values()[buf.readUnsignedByte()];
message.addHeader(codeHeader, code.name());
}
@Override
public void write(ByteBuf buf) {
AckCode code = AckCode.valueOf(this.message.getHeaderOrDefault(codeHeader));
buf.writeByte(code.ordinal());
}
@Override
public void setMessage(AcknowledgeDeviceMessage message) {
this.message = message;
}
@Override
public AcknowledgeDeviceMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,45 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
/**
*
*/
public class BinaryDeviceOnlineMessage implements BinaryMessage<DeviceOnlineMessage> {
public static final HeaderKey<String> loginToken = HeaderKey.of("token", null);
private DeviceOnlineMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.online;
}
@Override
public void read(ByteBuf buf) {
message = new DeviceOnlineMessage();
message.addHeader(loginToken, (String) DataType.STRING.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING
.write(
buf, message.getHeader(loginToken).orElse("")
);
}
@Override
public void setMessage(DeviceOnlineMessage message) {
this.message = message;
}
@Override
public DeviceOnlineMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,46 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.event.EventMessage;
/**
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryEventMessage implements BinaryMessage<EventMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.event;
}
private EventMessage message;
@Override
public void read(ByteBuf buf) {
message = new EventMessage();
message.setEvent((String) DataType.STRING.read(buf));
message.setData(DataType.OBJECT.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getEvent());
DataType.OBJECT.write(buf, message.getData());
}
@Override
public void setMessage(EventMessage message) {
this.message = message;
}
@Override
public EventMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,50 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import java.util.Map;
import java.util.stream.Collectors;
public class BinaryFunctionInvokeMessage implements BinaryMessage<FunctionInvokeMessage> {
private FunctionInvokeMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.function;
}
@Override
public void read(ByteBuf buf) {
message = new FunctionInvokeMessage();
message.setFunctionId((String) DataType.STRING.read(buf));
@SuppressWarnings("all")
Map<String, Object> params = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setInputs(
params
.entrySet()
.stream()
.map(e -> new FunctionParameter(e.getKey(), e.getValue()))
.collect(Collectors.toList())
);
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getFunctionId());
DataType.OBJECT.write(buf,message.inputsToMap());
}
@Override
public void setMessage(FunctionInvokeMessage message) {
this.message = message;
}
@Override
public FunctionInvokeMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryFunctionInvokeMessageReply extends BinaryReplyMessage<FunctionInvokeMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.functionReply;
}
@Override
protected FunctionInvokeMessageReply newMessage() {
return new FunctionInvokeMessageReply();
}
@Override
protected void doReadSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
msg.setFunctionId((String) DataType.readFrom(buf));
msg.setOutput(DataType.readFrom(buf));
}
@Override
protected void doWriteSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
DataType.writeTo(getMessage().getFunctionId(), buf);
DataType.writeTo(msg.getOutput(), buf);
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessage;
import reactor.core.publisher.Flux;
public interface BinaryMessage<T extends DeviceMessage> {
BinaryMessageType getType();
void read(ByteBuf buf);
void write(ByteBuf buf);
void setMessage(T message);
T getMessage();
}

View File

@ -0,0 +1,223 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public enum BinaryMessageType {
//0x00
keepalive(null, null),
//0x01
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
//0x02
ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new),
//0x03
reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new),
//0x04
readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new),
//0x05
readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new),
writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new),
writePropertyReply(WritePropertyMessageReply.class, BinaryWritePropertyMessageReply::new),
function(FunctionInvokeMessage.class, BinaryFunctionInvokeMessage::new),
functionReply(FunctionInvokeMessageReply.class, BinaryFunctionInvokeMessageReply::new),
event(EventMessage.class, BinaryEventMessage::new);
private final Class<? extends DeviceMessage> forDevice;
private final Supplier<BinaryMessage<DeviceMessage>> forTcp;
private static final BinaryMessageType[] VALUES = values();
public static final HeaderKey<Integer> HEADER_MSG_SEQ = HeaderKey.of("_seq", 0, Integer.class);
@SuppressWarnings("all")
BinaryMessageType(Class<? extends DeviceMessage> forDevice,
Supplier<? extends BinaryMessage<?>> forTcp) {
this.forDevice = forDevice;
this.forTcp = (Supplier) forTcp;
}
private static final Map<String, MsgIdHolder> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.<String, MsgIdHolder>build()
.asMap();
private static class MsgIdHolder {
private int msgId = 0;
private final Map<Integer, String> cached = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.<Integer, String>build()
.asMap();
public int next(String id) {
if (id == null) {
return -1;
}
do {
if (msgId++ < 0) {
msgId = 0;
}
} while (cached.putIfAbsent(msgId, id) != null);
return msgId;
}
public String getAndRemove(int id) {
if (id < 0) {
return null;
}
return cached.remove(id);
}
}
@SneakyThrows
private static MsgIdHolder takeHolder(String deviceId) {
return cache.computeIfAbsent(deviceId, (ignore) -> new MsgIdHolder());
}
public static ByteBuf write(DeviceMessage message, ByteBuf data) {
int msgId = message.getHeaderOrElse(HEADER_MSG_SEQ, () -> takeHolder(message.getDeviceId()).next(message.getMessageId()));
return write(message, msgId, data);
}
public static ByteBuf write(BinaryMessageType type, ByteBuf data) {
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 0-4字节 时间戳
data.writeLong(System.currentTimeMillis());
return data;
}
public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) {
BinaryMessageType type = lookup(message);
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 第1-8字节 时间戳
data.writeLong(message.getTimestamp());
// 9-11字节 消息序号
data.writeShort(msgId);
// 12... 字节 设备ID
DataType.STRING.write(data, message.getDeviceId());
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
tcp.setMessage(message);
//写出数据到ByteBuf
tcp.write(data);
return data;
}
public static DeviceMessage read(ByteBuf data) {
return read(data, null);
}
public static <T> T read(ByteBuf data,
String deviceIdMaybe,
BiFunction<DeviceMessage, Integer, T> handler) {
//第0个字节是消息类型
BinaryMessageType type = VALUES[data.readByte()];
if (type.forTcp == null) {
return null;
}
// 1-8字节 时间戳
long timestamp = data.readLong();
// 9-11字节 消息序号
int msgId = data.readUnsignedShort();
// 12... 字节 设备ID
String deviceId = (String) DataType.STRING.read(data);
if (deviceId == null) {
deviceId = deviceIdMaybe;
}
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
//从ByteBuf读取
tcp.read(data);
DeviceMessage message = tcp.getMessage();
message.thingId(DeviceThingType.device, deviceId);
if (timestamp > 0) {
message.timestamp(timestamp);
}
message.addHeader(HEADER_MSG_SEQ, msgId);
return handler.apply(message, msgId);
}
public static DeviceMessage read(ByteBuf data, String deviceIdMaybe) {
return read(data, deviceIdMaybe, (message, msgId) -> {
String messageId = null;
if (message.getDeviceId() != null) {
//获取实际平台下发的消息ID
MsgIdHolder holder = cache.get(message.getDeviceId());
if (holder != null) {
messageId = holder.getAndRemove(msgId);
}
}
if (messageId == null && msgId > 0) {
messageId = String.valueOf(msgId);
}
message.messageId(messageId);
return message;
});
}
public static BinaryMessageType lookup(DeviceMessage message) {
for (BinaryMessageType value : VALUES) {
if (value.forDevice != null && value.forDevice.isInstance(message)) {
return value;
}
}
throw new UnsupportedOperationException("unsupported device message " + message.getMessageType());
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (BinaryMessageType value : BinaryMessageType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}

View File

@ -0,0 +1,50 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.List;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReadPropertyMessage implements BinaryMessage<ReadPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readProperty;
}
private ReadPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReadPropertyMessage();
@SuppressWarnings("all")
List<String> list = (List<String>) DataType.ARRAY.read(buf);
message.setProperties(list);
}
@Override
public void write(ByteBuf buf) {
DataType.ARRAY.write(buf, message.getProperties());
}
@Override
public void setMessage(ReadPropertyMessage message) {
this.message = message;
}
@Override
public ReadPropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,41 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryReadPropertyMessageReply extends BinaryReplyMessage<ReadPropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected ReadPropertyMessageReply newMessage() {
return new ReadPropertyMessageReply();
}
@Override
protected void doWriteSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
@Override
protected void doReadSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
}

View File

@ -0,0 +1,52 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessageReply;
import java.util.Map;
public abstract class BinaryReplyMessage<T extends DeviceMessageReply> implements BinaryMessage<T> {
private T message;
protected abstract T newMessage();
@Override
public final void read(ByteBuf buf) {
message = newMessage();
boolean success = buf.readBoolean();
if (success) {
doReadSuccess(message, buf);
} else {
message.success(false);
message.code(String.valueOf(DataType.readFrom(buf)));
message.message(String.valueOf(DataType.readFrom(buf)));
}
}
protected abstract void doReadSuccess(T msg, ByteBuf buf);
protected abstract void doWriteSuccess(T msg, ByteBuf buf);
@Override
public final void write(ByteBuf buf) {
buf.writeBoolean(message.isSuccess());
if (message.isSuccess()) {
doWriteSuccess(message, buf);
} else {
DataType.writeTo(message.getCode(), buf);
DataType.writeTo(message.getMessage(), buf);
}
}
@Override
public void setMessage(T message) {
this.message = message;
}
@Override
public T getMessage() {
return message;
}
}

View File

@ -0,0 +1,48 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReportPropertyMessage implements BinaryMessage<ReportPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.reportProperty;
}
private ReportPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReportPropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(ReportPropertyMessage message) {
this.message = message;
}
@Override
public ReportPropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,49 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryWritePropertyMessage implements BinaryMessage<WritePropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.writeProperty;
}
private WritePropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new WritePropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(WritePropertyMessage message) {
this.message = message;
}
@Override
public WritePropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryWritePropertyMessageReply extends BinaryReplyMessage<WritePropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected WritePropertyMessageReply newMessage() {
return new WritePropertyMessageReply();
}
@Override
protected void doReadSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
@Override
protected void doWriteSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
}

View File

@ -0,0 +1,286 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetlinks.protocol.official.ObjectMappers;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public enum DataType {
//0x00
NULL {
@Override
public Object read(ByteBuf buf) {
return null;
}
@Override
public void write(ByteBuf buf, Object value) {
}
},
//0x01
BOOLEAN {
@Override
public Object read(ByteBuf buf) {
return buf.readBoolean();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeBoolean((Boolean) value);
}
},
//0x02
INT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x03
INT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x04
INT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x05
INT64 {
@Override
public Object read(ByteBuf buf) {
return buf.readLong();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeLong((Long) value);
}
},
//0x06
UINT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x07
UINT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x08
UINT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x09
FLOAT {
@Override
public Object read(ByteBuf buf) {
return buf.readFloat();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeFloat((Float) value);
}
},
//0x0A
DOUBLE {
@Override
public Object read(ByteBuf buf) {
return buf.readDouble();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeDouble((Double) value);
}
},
//0x0B
STRING {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = ((String) value).getBytes();
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0C
BINARY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return bytes;
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = (byte[]) value;
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0D
ARRAY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
List<Object> array = new ArrayList<>(len);
for (int i = 0; i < len; i++) {
array.add(readFrom(buf));
}
return array;
}
@Override
public void write(ByteBuf buf, Object value) {
Collection<Object> array = (Collection<Object>) value;
buf.writeShort(array.size());
for (Object o : array) {
writeTo(o, buf);
}
}
},
//0x0E
OBJECT {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
Map<String, Object> data = Maps.newLinkedHashMapWithExpectedSize(len);
for (int i = 0; i < len; i++) {
data.put((String) STRING.read(buf), readFrom(buf));
}
return data;
}
@Override
public void write(ByteBuf buf, Object value) {
Map<String, Object> data = value instanceof Map ? ((Map) value) : ObjectMappers.JSON_MAPPER.convertValue(value, Map.class);
buf.writeShort(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
STRING.write(buf, entry.getKey());
writeTo(entry.getValue(), buf);
}
}
};
private final static DataType[] VALUES = values();
public abstract Object read(ByteBuf buf);
public abstract void write(ByteBuf buf, Object value);
public static Object readFrom(ByteBuf buf) {
return VALUES[buf.readUnsignedByte()].read(buf);
}
public static void writeTo(Object data, ByteBuf buf) {
DataType type = loopUpType(data);
buf.writeByte(type.ordinal());
type.write(buf, data);
}
private static DataType loopUpType(Object data) {
if (data == null) {
return NULL;
} else if (data instanceof Boolean) {
return BOOLEAN;
} else if (data instanceof Byte) {
return INT8;
} else if (data instanceof Short) {
return INT16;
} else if (data instanceof Integer) {
return INT32;
} else if (data instanceof Long) {
return INT64;
} else if (data instanceof Float) {
return FLOAT;
} else if (data instanceof Double) {
return DOUBLE;
} else if (data instanceof String) {
return STRING;
} else if (data instanceof byte[]) {
return BINARY;
} else if (data instanceof Collection) {
return ARRAY;
} else if (data instanceof Map) {
return OBJECT;
} else {
throw new IllegalArgumentException("Unsupported data type: " + data.getClass());
}
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (DataType value : DataType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}

View File

@ -0,0 +1,234 @@
package org.jetlinks.protocol.official.http;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.protocol.official.ObjectMappers;
import org.jetlinks.protocol.official.TopicMessageCodec;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Objects;
/**
* Http 的消息编解码器
*
* @author zhouhao
* @since 3.0.0
*/
@Slf4j
public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec, Authenticator {
public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata(
"HTTP认证配置"
, "使用HTTP Bearer Token进行认证")
.add("bearer_token", "Token", "Token", new PasswordType());
private final Transport transport;
public JetLinksHttpDeviceMessageCodec(Transport transport) {
this.transport = transport;
}
public JetLinksHttpDeviceMessageCodec() {
this(DefaultTransport.HTTP);
}
@Override
public Transport getSupportTransport() {
return transport;
}
@Nonnull
public Mono<EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
JSONObject json = context.getMessage().toJson();
//通过websocket下发
return Mono.just(DefaultWebSocketMessage.of(
WebSocketMessage.Type.TEXT,
Unpooled.wrappedBuffer(json.toJSONString().getBytes())));
}
private static SimpleHttpResponseMessage unauthorized(String msg) {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\"" + msg + "\"}")
.status(401)
.build();
}
private static SimpleHttpResponseMessage badRequest() {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"bad_request\"}")
.status(400)
.build();
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof HttpExchangeMessage) {
return decodeHttp(context);
}
if (context.getMessage() instanceof WebSocketSessionMessage) {
return decodeWebsocket(context);
}
return Flux.empty();
}
private Flux<DeviceMessage> decodeWebsocket(MessageDecodeContext context) {
WebSocketSessionMessage msg = ((WebSocketSessionMessage) context.getMessage());
return Mono
.justOrEmpty(MessageType.convertMessage(msg.payloadAsJson()))
.cast(DeviceMessage.class)
.flux();
}
private Flux<DeviceMessage> decodeHttp(MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
//校验请求头中的Authorization header,格式:
// Authorization: Bearer <token>
Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null);
if (header == null || header.getValue() == null || header.getValue().length == 0) {
return message
.response(unauthorized("Authorization header is required"))
.thenMany(Mono.empty());
}
String[] token = header.getValue()[0].split(" ");
if (token.length == 1) {
return message
.response(unauthorized("Illegal token format"))
.thenMany(Mono.empty());
}
String basicToken = token[1];
String[] paths = TopicMessageCodec.removeProductPath(message.getPath());
if (paths.length < 1) {
return message
.response(badRequest())
.thenMany(Mono.empty());
}
String deviceId = paths[1];
return context
.getDevice(deviceId)
.flatMap(device -> device.getConfig("bearer_token"))
//校验token
.filter(value -> Objects.equals(value.asString(), basicToken))
//设备或者配置不对
.switchIfEmpty(Mono.defer(() -> message
.response(unauthorized("Device no register or token not match"))
.then(Mono.empty())))
//解码
.flatMapMany(ignore -> doDecode(message, paths))
.switchOnFirst((s, flux) -> {
Mono<Void> handler;
//有结果则认为成功
if (s.hasValue()) {
handler = message.ok("{\"success\":true}");
} else {
return message
.response(badRequest())
.then(Mono.empty());
}
return handler.thenMany(flux);
})
.onErrorResume(err -> message
.error(500, getErrorMessage(err))
.then(Mono.error(err)))
//跟踪信息
.as(FluxTracer
.create(DeviceTracer.SpanName.decode(deviceId),
builder -> builder.setAttribute(DeviceTracer.SpanKey.message, message.print())));
}
private Flux<DeviceMessage> doDecode(HttpExchangeMessage message, String[] paths) {
return message
.payload()
.flatMapMany(buf -> {
byte[] body = ByteBufUtil.getBytes(buf);
return TopicMessageCodec.decode(ObjectMappers.JSON_MAPPER, paths, body);
});
}
public String getErrorMessage(Throwable err) {
if (err instanceof JsonParseException) {
return "{\"success\":false,\"code\":\"request_body_format_error\"}";
}
return "{\"success\":false,\"code\":\"server_error\"}";
}
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator device) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String token = req
.getSocketSession()
.getQueryParameters()
.get("token");
if (StringUtils.isEmpty(token)) {
return Mono.just(AuthenticationResponse.error(401, "认证参数错误"));
}
return device
.getConfig("bearer_token")
//校验token
.filter(value -> Objects.equals(value.asString(), token))
.map(ignore -> AuthenticationResponse.success(device.getDeviceId()))
//未配置或者配置不对
.switchIfEmpty(Mono.fromSupplier(() -> AuthenticationResponse.error(401, "token错误")));
}
static AuthenticationResponse deviceNotFound = AuthenticationResponse.error(404, "设备不存在");
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String[] paths = TopicMessageCodec.removeProductPath(req.getSocketSession().getPath());
if (paths.length < 1) {
return Mono.just(AuthenticationResponse.error(400, "URL格式错误"));
}
return registry
.getDevice(paths[1])
.flatMap(device -> authenticate(request, device))
.defaultIfEmpty(deviceNotFound);
}
}

View File

@ -0,0 +1,127 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.SneakyThrows;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class TcpDevice {
@SneakyThrows
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
int start = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int count = args.length > 1 ? Integer.parseInt(args[1]) : 8000;
String[] hosts = args.length > 2 ? args[2].split(",") : new String[]{"0.0.0.0"};
Flux.range(start, count)
.flatMap(i -> Mono
.create(sink -> {
NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true);
conf.setLocalAddress(hosts[i % hosts.length]);
vertx
.createNetClient(conf)
.connect(8802, "localhost")
.onFailure(err -> {
System.out.println(err.getMessage());
sink.success();
})
.onSuccess(socket -> {
RecordParser parser = RecordParser.newFixed(4);
AtomicReference<Buffer> buffer = new AtomicReference<>();
parser.handler(buf -> {
buffer.accumulateAndGet(buf, (a, b) -> {
if (a == null) {
parser.fixedSizeMode(buf.getInt(0));
return b;
}
parser.fixedSizeMode(4);
sink.success("tcp-off-" + i + ":" + socket.localAddress());
BinaryMessageType
.read(b.getByteBuf(),
null,
(downstream, seq) -> {
handleDownStream(downstream, seq, socket);
return null;
});
return null;
});
});
socket
.closeHandler((s) -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed");
sink.success();
})
.exceptionHandler(er -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage());
sink.success();
})
.handler(parser);
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test");
message.setDeviceId("tcp-off-" + i);
socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))));
});
}),
1024
)
.count()
.subscribe(System.out::println);
System.in.read();
}
protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) {
if (!(downstream instanceof AcknowledgeDeviceMessage)) {
// System.out.println(downstream);
}
DeviceMessage reply = null;
if (downstream instanceof ReadPropertyMessage) {
reply = ((ReadPropertyMessage) downstream)
.newReply()
.success(Collections.singletonMap(
"temp0",
ThreadLocalRandom
.current()
.nextFloat() * 100
));
} else if (downstream instanceof WritePropertyMessage) {
reply = ((WritePropertyMessage) downstream)
.newReply()
.success(((WritePropertyMessage) downstream).getProperties());
}
if (reply != null) {
socket.write(
Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(
reply
, seq, Unpooled.buffer())))
);
}
}
}

View File

@ -0,0 +1,123 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.AckCode;
import org.jetlinks.protocol.official.binary.BinaryAcknowledgeDeviceMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class TcpDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata tcpConfig = new DefaultConfigMetadata(
"TCP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.TCP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//read index
payload.readInt();
//处理tcp连接后的首次消息
if (context.getDevice() == null) {
return handleLogin(payload, context);
}
return Mono.justOrEmpty(BinaryMessageType.read(payload, context.getDevice().getDeviceId()));
}
private Mono<DeviceMessage> handleLogin(ByteBuf payload, MessageDecodeContext context) {
DeviceMessage message = BinaryMessageType.read(payload);
if (message instanceof DeviceOnlineMessage) {
String token = message
.getHeader(BinaryDeviceOnlineMessage.loginToken)
.orElse(null);
String deviceId = message.getDeviceId();
return context
.getDevice(deviceId)
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
} else {
return ack(message, AckCode.noAuth, context);
}
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return Unpooled.wrappedBuffer(
Unpooled.buffer().writeInt(payload.writerIndex()),
payload);
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
if(source==null){
return Mono.empty();
}
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(EncodedMessage.simple(
wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))
))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return Mono.just(EncodedMessage.simple(
wrapByteByf(
BinaryMessageType.write(deviceMessage, Unpooled.buffer())
)
));
}
}

View File

@ -0,0 +1,114 @@
package org.jetlinks.protocol.official.udp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class UDPDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata udpConfig = new DefaultConfigMetadata(
"UDP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.UDP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//todo 认证类型, 0 token,1 sign
byte authType = payload.readByte();
//前面是token
String token = (String) DataType.STRING.read(payload);
//接下来是消息
DeviceMessage message = BinaryMessageType.read(payload);
return context
.getDevice(message.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return payload;
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(doEncode(message, ""))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return context
.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.map(config -> doEncode(deviceMessage, config.asString())));
}
private EncodedMessage doEncode(DeviceMessage message, String token) {
ByteBuf buf = Unpooled.buffer();
//todo 认证类型, 0 token,1 sign
buf.writeByte(0);
//token
DataType.STRING.write(buf, token);
//指令
return EncodedMessage.simple(wrapByteByf(BinaryMessageType.write(message, buf)));
}
}

View File

@ -0,0 +1,29 @@
#### 使用HTTP推送设备数据
上报属性例子:
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
上报事件例子:
```http request
POST /{productId}/{deviceId}/event/{eventId}
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"data":{
"address": ""
}
}
```

View File

@ -0,0 +1,12 @@
### 认证说明
CONNECT报文:
```text
clientId: 设备ID
username: secureId+"|"+timestamp
password: md5(secureId+"|"+timestamp+"|"+secureKey)
```
说明: secureId以及secureKey在创建设备产品或设备实例时进行配置.
timestamp为当前时间戳(毫秒),与服务器时间不能相差5分钟.
md5为32位,不区分大小写.

View File

View File

@ -309,17 +309,26 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test @Test
public void testPropertiesReport() { public void testPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() Message message = codec.decode(createMessageContext(
SimpleMqttMessage
.builder()
.topic("/product1/device1/properties/report") .topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}" .payload(Unpooled.wrappedBuffer(("{\"messageId\":\"test\"," +
.getBytes())) "\"properties\":{\"sn\":\"test\"}," +
.build())).blockFirst(); "\"propertySourceTimes\":{\"sn\":10086}" +
"}").getBytes()))
.build()))
.blockFirst();
Assert.assertTrue(message instanceof ReportPropertyMessage); Assert.assertTrue(message instanceof ReportPropertyMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) message); ReportPropertyMessage reply = ((ReportPropertyMessage) message);
Assert.assertNotNull(reply.getPropertySourceTimes());
Assert.assertEquals(reply.getPropertySourceTimes().get("sn"), Long.valueOf(10086L));
Assert.assertEquals(reply.getDeviceId(), "device1"); Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test"); Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test")); Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
System.out.println(reply); System.out.println(reply);
} }
@ -390,11 +399,15 @@ public class JetLinksMqttDeviceMessageCodecTest {
Assert.assertNotNull(currentReply); Assert.assertNotNull(currentReply);
Assert.assertEquals(((MqttMessage) currentReply).getTopic(),"/product1/device1/time-sync/reply"); Assert.assertEquals(((MqttMessage) currentReply).getTopic(), "/product1/device1/time-sync/reply");
Assert.assertTrue(currentReply.payloadAsString().contains("timestamp")); Assert.assertTrue(currentReply.payloadAsString().contains("timestamp"));
} }
public void testTopic() {
}
public MessageEncodeContext createMessageContext(Message message) { public MessageEncodeContext createMessageContext(Message message) {
System.out.println(message.toString()); System.out.println(message.toString());
return new MessageEncodeContext() { return new MessageEncodeContext() {

View File

@ -1,10 +1,12 @@
package org.jetlinks.protocol.official; package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetlinks.core.codec.defaults.TopicPayloadCodec;
import org.jetlinks.core.message.ChildDeviceMessage; import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.event.EventMessage; import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage; import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.route.Route;
import org.junit.Test; import org.junit.Test;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
@ -38,6 +40,15 @@ public class TopicMessageCodecTest {
} }
@Test
public void testRoute() {
for (TopicMessageCodec value : TopicMessageCodec.values()) {
Route route = value.getRoute();
if (null != route)
System.out.println(route.getAddress());
}
}
@Test @Test
public void doTest() { public void doTest() {
testChild(ObjectMappers.JSON_MAPPER); testChild(ObjectMappers.JSON_MAPPER);
@ -58,6 +69,6 @@ public class TopicMessageCodecTest {
DeviceMessage msg = TopicMessageCodec DeviceMessage msg = TopicMessageCodec
.decode(ObjectMappers.JSON_MAPPER, payload.getTopic(), payload.getPayload()) .decode(ObjectMappers.JSON_MAPPER, payload.getTopic(), payload.getPayload())
.blockLast(); .blockLast();
assertEquals(msg.toJson(),eventMessage.toJson()); assertEquals(msg.toJson(), eventMessage.toJson());
} }
} }

View File

@ -0,0 +1,119 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class BinaryMessageTypeTest {
@Test
public void testOnline() {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1000");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
System.out.println(ByteBufUtil.prettyHexDump(byteBuf));
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
System.out.println(ByteBufUtil.prettyHexDump(buf));
//登录报文
System.out.println(ByteBufUtil.hexDump(buf));
}
@Test
public void testReport() {
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
}
@Test
public void testRead() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonList("temp"));
doTest(message);
ReadPropertyMessageReply reply = new ReadPropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testWrite() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
WritePropertyMessageReply reply = new WritePropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testFunction() {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setFunctionId("123");
message.setDeviceId("test");
message.setMessageId("test123");
message.addInput("test", 1);
doTest(message);
FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setOutput(123);
doTest(reply);
}
public void doTest(DeviceMessage message) {
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
// System.out.println(ByteBufUtil.prettyHexDump(data));
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
System.out.println(ByteBufUtil.prettyHexDump(buf));
System.out.println(ByteBufUtil.hexDump(buf));
//将长度字节读取后直接解析报文正文
buf.readInt();
DeviceMessage read = BinaryMessageType.read(buf);
if (null != read.getHeaders()) {
read.getHeaders().forEach(message::addHeader);
}
System.out.println(read);
Assert.assertEquals(read.toString(), message.toString());
}
}