Compare commits

..

11 Commits
v3 ... v3.1

Author SHA1 Message Date
zhouhao 0dda72995b build(maven): 升级依赖版本 2023-08-04 17:30:34 +08:00
zhouhao 486179b41f 增加tcp报文说明 2023-07-07 16:27:55 +08:00
zhouhao b234e8ebed Merge remote-tracking branch 'origin/v3.1' into v3.1
# Conflicts:
#	package/jetlinks-official-protocol-3.1.0-SNAPSHOT.jar
2023-04-04 09:46:08 +08:00
zhouhao dc8dce81f0 3.1.0 2023-04-04 09:45:44 +08:00
zhouhao 9ee6f210b4 3.1.0 2023-04-04 09:45:08 +08:00
zhouhao e034489976 feat(数采网关): 增加数采网关相关topic处理 2023-04-04 09:44:34 +08:00
zhouhao c16f63ebab 3.1.0-SNAPSHOT 2023-04-04 09:44:34 +08:00
zhouhao f1a5a0d8ac 3.1.0 2023-04-04 09:42:57 +08:00
老周 692e6c76cc
Merge pull request #11 from jetlinks/v3
fix(TCP协议): 优化字符串中文乱码的问题
2023-03-09 10:23:05 +08:00
zhouhao e93f678cf8 feat(数采网关): 增加数采网关相关topic处理 2023-02-24 13:40:37 +08:00
zhouhao e5babc4647 3.1.0-SNAPSHOT 2023-02-24 11:23:52 +08:00
6 changed files with 124 additions and 172 deletions

View File

@ -1,167 +1,89 @@
## 头信息 ## 头信息
| 字节数 | 类型 | 字段 | 备注 | | 字节序 | 类型 | 字段 |
| :----: | ------- | :--------------------: | ------------------- | |:----:|---------|:-----------:|
| n | 自定 | 消息长度 | 平台配置项 | | 0 | INT8 | 消息类型 |
| 1 | INT8 | 消息类型 | 见消息类型定义 | | 1-8 | INT64 | UTC时间戳 |
| 8 | INT64 | UTC时间戳 | | | 9-11 | INT16 | 消息序号 |
| 2 | INT16 | 消息序号 | | | 12-n | STRING | 设备ID |
| 2 | INT16 | 设备ID长度 | | | ... | MESSAGE | 消息类型对应的编码规则 |
| n | STRING | 设备ID | 根据设备ID长度得出 |
| n | MESSAGE | 消息类型对应的编码规则 | |
| 2 | INT16 | secureKeyLength | TCP认真配置密钥长度 |
| n | STRING | secureKey | 密钥 |
## 数据类型 ## 数据类型
所有数据类型均采用`大端`编码 所有数据类型均采用`大端`编码
| Byte | Type | 编码规则 | 备注 | | Byte | Type | 编码规则 |
| :--: | :-----: | ------------------------------------------------------------ | ------------------------------------------------------------ | |:----:|:-------:|---------------------------------------------|
| 0x00 | NULL | | | | 0x00 | NULL | 0x01 |
| 0x01 | BOOLEAN | 1字节 0x00为false 其他为true | | | 0x01 | BOOLEAN | 1字节 0x00为false 其他为true |
| 0x02 | INT8 | 1字节 (byte) | | | 0x02 | INT8 | 1字节 (byte) |
| 0x03 | INT16 | 2字节整型 (short) | | | 0x03 | INT16 | 2字节整型 (short) |
| 0x04 | INT32 | 4字节整型 (int) | | | 0x04 | INT32 | 4字节整型 (int) |
| 0x05 | INT64 | 8字节整型 (long) | | | 0x05 | INT64 | 8字节整型 (long) |
| 0x06 | UINT8 | 1字节无符号整型 | | | 0x06 | UINT8 | 1字节无符号整型 |
| 0x07 | UINT16 | 2字节无符号整型 | | | 0x07 | UINT16 | 2字节无符号整型 |
| 0x08 | UINT32 | 4字节无符号整型 | | | 0x08 | UINT32 | 4字节无符号整型 |
| 0x09 | FLOAT | 4字节 IEEE 754浮点数 | | | 0x09 | FLOAT | 4字节 IEEE 754浮点数 |
| 0x0a | DOUBLE | 8字节 IEEE 754浮点数 | | | 0x0a | DOUBLE | 8字节 IEEE 754浮点数 |
| 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 | 2+N2个字节UnsignedShort表示N的长度 | | 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 |
| 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 | 2+N2个字节UnsignedShort表示N的长度 | | 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 |
| 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 | 2+N2个字节UnsignedShort表示ARRAY的长度N=很多*(1+X)1个字节UnsignedShort表示X的数据类型X表示长度见上几行。 | | 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 |
| 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value | 2+N2个字节UnsignedShort表示OBJECT的长度,N是STRING+数据类型的组合了(见上几行)。 | | 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value |
## 消息类型定义 ## 消息类型定义
| Byte | Type | 说明 | Message示例 | | Byte | Type | 说明 |
| :--: | :----------------- | ------------ | ------------------------------------------------------------ | |:----:|:-------------------|--------|
| 0x00 | keepalive | 心跳 | | | 0x00 | keepalive | 心跳 |
| 0x01 | online | 首次连接 | [STRING:密钥信息 ] | | 0x01 | online | 首次连接 |
| 0x02 | ack | 应答 | [应答码 ]<br />应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持. | | 0x02 | ack | 应答 |
| 0x03 | reportProperty | 上报属性 | [属性数据:OBJECT类型 ] | | 0x03 | reportProperty | 上报属性 |
| 0x04 | readProperty | 读取属性 | [属性列表:ARRAY类型 ] | | 0x04 | readProperty | 读取属性 |
| 0x05 | readPropertyReply | 读取属性回复 | 读取成功:[`0x01`,属性数据:OBJECT类型 ]<br />读取失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] | | 0x05 | readPropertyReply | 读取属性回复 |
| 0x06 | writeProperty | 修改属性 | [属性列表:OBJECT类型 ] | | 0x06 | writeProperty | 修改属性 |
| 0x07 | writePropertyReply | 修改属性回复 | 修改成功:[`0x01`,属性数据:OBJECT类型 ]<br />修改失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] | | 0x07 | writePropertyReply | 修改属性回复 |
| 0x08 | function | 功能调用 | [功能ID:STRING类型,功能参数:OBJECT类型 ] | | 0x08 | function | 功能调用 |
| 0x09 | functionReply | 功能调用回复 | 调用成功:[`0x01`,属性数据:OBJECT类型 ]<br />调用失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] | | 0x09 | functionReply | 功能调用回复 |
### 备注 ### 0x00 keepalive 心跳
[ 0x00 ]
### 0x01 online 首次连接
[ 0x01,STRING:密钥信息 ]
### 0x02 ack 应答
[ 0x02,应答码 ]
应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持.
### 0x03 reportProperty 上报属性
[ 0x03,属性数据:OBJECT类型 ]
### 0x04 readProperty 读取属性
[ 0x04,属性列表:ARRAY类型 ]
### 0x05 readPropertyReply 读取属性回复
读取成功:
[ 0x05,`0x01`,属性数据:OBJECT类型 ]
读取失败:
[ 0x05,`0x00`,错误码:动态类型,错误消息:动态类型 ]
`动态读取`表示类型不确定,根据对应的`数据类型`来定义类型. `动态读取`表示类型不确定,根据对应的`数据类型`来定义类型.
如: 无错误信息 如: 无错误信息
[ 0x00,`0x00`,`0x00` ] [ 0x05,0x00,`0x00`,`0x00` ]
`INT8(0x02)`类型错误码:`0x04` `INT8(0x02)`类型错误码:`0x04`
[0x00,`0x02,0x04`,0x00 ] [ 0x05,0x00,`0x02,0x04`,0x00 ]
## 示例 TODO 更多消息类型
### 设备上线
```
000000270100000186c51a890f0001001331363531383533343133303332383934343634000561646d696e
```
```java
String hexForOnline =
"00000027" +//消息长度
"01" + //请求上线
"00000186c51a890f" +//时间戳
"0001" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0005" +//secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一使用byteBuf构建上线消息
String deviceId = "1651853413032894464";
String secureKey = "admin";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x01)
.writeLong(System.currentTimeMillis())
.writeShort(1)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf onlineBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二使用jetlinks-core构建消息
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("1");
message.setTimestamp(1678344096015L);
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
```
### 设备数据上报
```
0000006C0300000186C567FA7900020013313635313835333431333033323839343436340001000474656d700B000433362e35000561646d696e
```
```java
String hexForReport =
"0000006C" +//消息长度
"03" +//上报消息
"00000186C567FA79" + //时间戳
"0002" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0001" + //OBJECT对象数量
"0004" +//key的长度
"74656d70" + //值
"0B" + //value的类型
"0004" +//Value的长度
"33362e35" + //值
"0005" + //secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一
String deviceId = "1651853413032894464";
String secureKey = "admin";
String key = "temp";
String value = "36.5";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x03)
.writeLong(System.currentTimeMillis())
.writeShort(2)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(1)
.writeShort(key.getBytes().length)
.writeBytes(key.getBytes())
.writeByte(0x0B)
.writeShort(value.getBytes().length)
.writeBytes(value.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf reportBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("2");
message.setProperties(Collections.singletonMap("temp", 32.88));
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
```

10
pom.xml
View File

@ -6,7 +6,7 @@
<groupId>org.jetlinks</groupId> <groupId>org.jetlinks</groupId>
<artifactId>jetlinks-official-protocol</artifactId> <artifactId>jetlinks-official-protocol</artifactId>
<version>3.0.0</version> <version>3.1.0-SNAPSHOT</version>
<name>JetLinks</name> <name>JetLinks</name>
<url>https://jetlinks.org</url> <url>https://jetlinks.org</url>
@ -44,8 +44,7 @@
<project.build.locales>zh_CN</project.build.locales> <project.build.locales>zh_CN</project.build.locales>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk> <project.build.jdk>${java.version}</project.build.jdk>
<spring.boot.version>2.2.8.RELEASE</spring.boot.version> <hsweb.framework.version>4.0.16</hsweb.framework.version>
<hsweb.framework.version>4.0.3</hsweb.framework.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version> <hsweb.expands.version>3.0.2</hsweb.expands.version>
<reactor.version>2020.0.6</reactor.version> <reactor.version>2020.0.6</reactor.version>
</properties> </properties>
@ -204,7 +203,7 @@
<dependency> <dependency>
<groupId>org.jetlinks</groupId> <groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId> <artifactId>jetlinks-supports</artifactId>
<version>1.2.0</version> <version>1.2.1</version>
</dependency> </dependency>
<dependency> <dependency>
@ -286,6 +285,9 @@
<id>hsweb-nexus</id> <id>hsweb-nexus</id>
<name>Nexus Release Repository</name> <name>Nexus Release Repository</name>
<url>https://nexus.jetlinks.cn/content/groups/public/</url> <url>https://nexus.jetlinks.cn/content/groups/public/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots> <snapshots>
<enabled>true</enabled> <enabled>true</enabled>
<updatePolicy>always</updatePolicy> <updatePolicy>always</updatePolicy>

View File

@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.message.*; import org.jetlinks.core.message.*;
import org.jetlinks.core.message.collector.*;
import org.jetlinks.core.message.event.EventMessage; import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.*; import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionInvokeMessage;
@ -278,6 +279,35 @@ public enum TopicMessageCodec {
//状态检查 //状态检查
stateCheck("/*/state-check", DeviceStateCheckMessage.class), stateCheck("/*/state-check", DeviceStateCheckMessage.class),
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class), stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
//数采相关
collector("/*/collector/report", ReportCollectorDataMessage.class
, builder -> builder
.upstream(true)
.group("数采网关")
.description("上报数采点位数据")),
collectorRead("/*/collector/read",
ReadCollectorDataMessage.class,
builder -> builder
.downstream(true)
.group("数采网关")
.description("平台读取点位数据")),
collectorReadReply("/*/collector/read/reply",
ReadCollectorDataMessageReply.class,
builder -> builder
.upstream(true)
.group("数采网关")
.description("平台读取点位数据结果回复")),
collectorWrite("/*/collector/write", WriteCollectorDataMessage.class,
builder -> builder
.downstream(true)
.group("数采网关")
.description("平台修改点位数据")),
collectorWriteReply("/*/collector/write/reply", WriteCollectorDataMessageReply.class,
builder -> builder
.upstream(true)
.group("数采网关")
.description("平台修改点位数据结果回复")),
; ;
TopicMessageCodec(String topic, TopicMessageCodec(String topic,

View File

@ -152,11 +152,11 @@ public enum BinaryMessageType {
if (type.forTcp == null) { if (type.forTcp == null) {
return null; return null;
} }
// 1-8字节 时间戳 // 1-4字节 时间戳
long timestamp = data.readLong(); long timestamp = data.readLong();
// 9-11字节 消息序号 // 5-6字节 消息序号
int msgId = data.readUnsignedShort(); int msgId = data.readUnsignedShort();
// 12... 字节 设备ID // 7... 字节 设备ID
String deviceId = (String) DataType.STRING.read(data); String deviceId = (String) DataType.STRING.read(data);
if (deviceId == null) { if (deviceId == null) {
deviceId = deviceIdMaybe; deviceId = deviceIdMaybe;

View File

@ -10,6 +10,7 @@ import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*; import org.jetlinks.core.message.property.*;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import reactor.test.StepVerifier;
import java.util.Collections; import java.util.Collections;
@ -26,14 +27,11 @@ public class BinaryMessageTypeTest {
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer()); ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
System.out.println(ByteBufUtil.prettyHexDump(byteBuf)); System.out.println(ByteBufUtil.prettyHexDump(byteBuf));
ByteBuf buf = Unpooled
System.out.println(ByteBufUtil.prettyHexDump(Unpooled
.buffer() .buffer()
.writeInt(byteBuf.readableBytes()) .writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf); .writeBytes(byteBuf)));
System.out.println(ByteBufUtil.prettyHexDump(buf));
//登录报文
System.out.println(ByteBufUtil.hexDump(buf));
} }
@Test @Test
@ -99,20 +97,20 @@ public class BinaryMessageTypeTest {
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer()); ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
// System.out.println(ByteBufUtil.prettyHexDump(data)); System.out.println("TCP报文: "+ByteBufUtil.hexDump(Unpooled
ByteBuf buf = Unpooled.buffer() .buffer()
.writeInt(data.readableBytes()) .writeInt(data.readableBytes())
.writeBytes(data); .writeBytes(data.duplicate())));
System.out.println(ByteBufUtil.prettyHexDump(buf));
System.out.println(ByteBufUtil.hexDump(buf)); System.out.println(ByteBufUtil.prettyHexDump(data));
//将长度字节读取后直接解析报文正文
buf.readInt(); DeviceMessage read = BinaryMessageType.read(data);
DeviceMessage read = BinaryMessageType.read(buf);
if (null != read.getHeaders()) { if (null != read.getHeaders()) {
read.getHeaders().forEach(message::addHeader); read.getHeaders().forEach(message::addHeader);
} }
System.out.println(read); System.out.println(read);
//tcp时 发送的完整报文.
Assert.assertEquals(read.toString(), message.toString()); Assert.assertEquals(read.toString(), message.toString());
} }