Compare commits

...

66 Commits
1.0 ... v3

Author SHA1 Message Date
PengyuDeng ff7e796a09
doc: 完善TCP文档内容 (#16)
* doc: 完善TCP文档内容

* doc: 增加设备上报示例

* doc: 增加请求设备上线示例

* doc: 文档增加byteBuf构建示例

* doc: 使用databuffer构建报文
2024-01-24 19:22:33 +08:00
PengyuDeng 7b9a4cf128
doc: 修复TCP协议解析注释错误 (#15) 2024-01-05 11:27:33 +08:00
老周 0bf5111e31
Update pom.xml 2023-08-03 16:40:12 +08:00
老周 ec45b7c16d
Merge pull request #14 from 445990772/v3
fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示
2023-08-02 09:17:44 +08:00
李洁 fe8ef8a1c4 fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示 2023-08-01 18:04:01 +08:00
445990772 d006a81847
Merge pull request #1 from 445990772/fix-bug
fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示
2023-08-01 17:50:40 +08:00
李洁 a01497f0db fix(单元测试): 单元测试类生成报文无法上线,优化输出报文显示 2023-08-01 17:45:19 +08:00
ht-lijie 9f09f83d95 build(官方协议): 更新官方协议jar包 2023-04-04 17:30:41 +08:00
zhouhao e0dc88251e 3.0.0 2023-04-04 09:43:48 +08:00
老周 317dd7df8d
Merge pull request #10 from kyouji/v3
fix(TCP协议): 优化字符串中文乱码的问题
2023-03-09 10:22:29 +08:00
zhangji 216946bc45 fix(TCP协议): 优化字符串中文乱码的问题 2023-03-09 10:12:47 +08:00
zhouhao 28b35d86da 3.0.0 2023-02-24 11:20:48 +08:00
老周 f952f7ba45
Merge pull request #9 from kyouji/v3
优化文档链接
2023-02-13 15:58:38 +08:00
zhangji a1d408a899 优化文档链接 2023-02-13 13:52:05 +08:00
zhouhao 5d4c2c655e 优化 2023-02-03 19:06:13 +08:00
zhouhao e506623e42 update package 2023-01-10 17:26:39 +08:00
zhouhao 5ec684c578 websocket支持 2023-01-10 17:16:39 +08:00
zhouhao 57b0cb0a73 优化 2022-12-26 14:00:10 +08:00
老周 b92c9b5564
Merge pull request #8 from bestfeng1020/event-support
添加3.0协议包事件支持
2022-12-13 18:45:16 +08:00
liujq 598c3f5f5a 添加3.0协议包事件支持 2022-12-13 17:59:37 +08:00
zhouhao a5bf08ae7d update package 2022-12-06 16:08:12 +08:00
zhouhao b478566fc1 使用新的http逻辑 2022-12-06 11:18:41 +08:00
zhouhao a4876e49ce Merge remote-tracking branch 'origin/v3' into v3 2022-11-30 17:39:27 +08:00
zhouhao b2702bd1f7 new package 2022-11-30 17:38:47 +08:00
老周 50654cc65d
Update README.md 2022-11-30 17:35:50 +08:00
zhouhao 5058ddc709 优化说明 2022-11-30 16:55:38 +08:00
zhouhao c254a19ea9 优化tcp udp实现和说明 2022-11-30 16:52:30 +08:00
zhouhao ff37d0733c 时间戳大于0才设置 2022-11-24 21:27:18 +08:00
zhouhao 9a81923167 优化tcp消息 2022-11-24 18:21:52 +08:00
zhouhao 6d46169cc5 优化http处理 2022-11-02 17:45:16 +08:00
zhouhao 1227a8e6ce 重构设备数据存储 2022-08-24 17:59:10 +08:00
zhouhao 3cbbaa6643 优化 2022-08-11 18:07:23 +08:00
zhouhao cf21c84518 优化msgid 2022-08-10 09:22:11 +08:00
zhouhao b0c94bed6b 增加基本的tcp实现 2022-08-08 18:34:15 +08:00
zhouhao c93b5d11a6 增加更新固件消息回复 2022-07-19 14:29:03 +08:00
zhouhao 5e04b035ed 优化 2022-06-23 18:35:13 +08:00
zhouhao 07b7bdf16d 增加http 解析 2022-06-21 17:02:17 +08:00
zhouhao 8e0faae4db 优化 2022-06-20 17:33:07 +08:00
zhouhao 8de80583f6 升级依赖 2022-06-20 17:32:17 +08:00
zhouhao 8b2e692868 修复topic 路由错误 2022-03-27 11:58:19 +08:00
zhouhao d344dba0f1 v3 2022-03-11 09:29:34 +08:00
zhou-hao f670bf0bb5 jetlinks-supports 1.1.7 2021-10-14 11:27:54 +08:00
zhou-hao 21a0908d0a 增加时间同步 2021-07-30 18:46:09 +08:00
zhou-hao f38b76fc24 增加时间同步 2021-07-30 18:45:23 +08:00
zhou-hao 263ab0ebb3 优化说明 2021-07-29 11:30:05 +08:00
zhou-hao 60f3c3ccca 增加MQTT用户名密码生成工具 2021-07-29 11:17:24 +08:00
zhou-hao bc576fb8a6 event标识以topic中的为准 2021-07-28 15:06:08 +08:00
zhou-hao 5e326544f7 event标识以topic中的为准 2021-07-28 15:05:19 +08:00
zhou-hao 02dc934820 add ReadFirmwareMessage 2021-07-20 15:38:11 +08:00
zhou-hao 6e4ff8e32f NoArgsConstructor 2021-07-19 11:39:04 +08:00
老周 3c0e3aa046
Merge pull request #1 from jetlinks/v2
V2
2021-07-06 15:41:16 +08:00
zhou-hao c87c98aee3 拉取固件更新回复 2021-05-21 15:05:31 +08:00
zhou-hao fd8358ad0a 拉取固件更新回复 2021-05-21 15:04:48 +08:00
zhou-hao 4d194d2cc2 增加child-reply 2021-04-25 17:13:55 +08:00
zhou-hao 73dd691413 new package 2021-04-01 16:06:36 +08:00
zhou-hao 6aca5b336b add source plugin 2021-04-01 16:04:55 +08:00
zhou-hao 67c5e2c884 优化产品ID处理 2021-04-01 13:41:02 +08:00
zhou-hao 66a09c951c 优化设备消息 2021-03-17 17:06:15 +08:00
zhou-hao c160f00fea 增加上下线消息 2021-03-05 18:37:01 +08:00
zhou-hao 08d90485be upgrade 2021-01-26 16:42:22 +08:00
zhou-hao 36516ae5b1 不序列化null字段 2021-01-26 16:41:16 +08:00
zhou-hao 9ff537a4ea 修复encode错误 2021-01-26 16:40:57 +08:00
zhou-hao 53b349efee 2.0 2021-01-21 17:01:14 +08:00
zhou-hao 5ba0238f8b deploy 2021-01-21 17:00:52 +08:00
zhou-hao c642d17f4e 2.0-SNAPSHOT 2021-01-21 16:43:37 +08:00
zhou-hao 59c981bf95 优化协议 2021-01-21 11:21:28 +08:00
47 changed files with 3498 additions and 629 deletions

3
.gitignore vendored
View File

@ -24,4 +24,5 @@ hs_err_pid*
/static/
/upload
/ui/upload/
!/package/**
!/package/**
dependency-reduced-pom.xml

View File

@ -1,5 +1,61 @@
## JetLinks 官方设备接入协议
类名: `org.jetlinks.protocol.official.JetLinksProtocolSupportProvider`
JetLinks官方实现的设备接入协议,可用于参考实现自定义协议开发.
[查看说明](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html)
注意: 本协议仅用于参考自定义协议开发,在实际使用中请根据不同的场景进行调整.如认证方式,加密等.
### MQTT
[查看TOPIC说明](http://doc.jetlinks.cn/dev-guide/jetlinks-protocol-support.html)
用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/mqtt-auth-generator.html)
### HTTP
HTTP接入时需要使用`Bearer`
认证,URL和[MQTT的接入Topic]((http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html))一致.
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
### TCP
报文格式说明:
第0-4字节对应的32位整型值为接下来报文的长度,
后续为报文数据,
具体报文格式见: [二进制格式说明](binary-protocol.md)
创建连接后第一个数据包需要发送[认证包](binary-protocol.md#0x01-online-首次连接),
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
### UDP
报文格式说明:
第`0`字节表示认证类型,目前固定为0x00.
第`1-n`字节为`密钥信息`,编码使用`STRING`见: [数据类型定义](binary-protocol.md#数据类型)
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
后续为报文数据,具体报文格式见: [二进制格式说明](binary-protocol.md)
UDP无需发送认证包,但是需要每个报文中都包含密钥信息.
除了ACK以外,其他平台下发的指令也都会包含认证密钥信息,用于设备侧校验请求.
### 测试
可以使用[模拟器](http://github.com/jetlinks/device-simulator)进行模拟测试

167
binary-protocol.md Normal file
View File

@ -0,0 +1,167 @@
## 头信息
| 字节数 | 类型 | 字段 | 备注 |
| :----: | ------- | :--------------------: | ------------------- |
| n | 自定 | 消息长度 | 平台配置项 |
| 1 | INT8 | 消息类型 | 见消息类型定义 |
| 8 | INT64 | UTC时间戳 | |
| 2 | INT16 | 消息序号 | |
| 2 | INT16 | 设备ID长度 | |
| n | STRING | 设备ID | 根据设备ID长度得出 |
| n | MESSAGE | 消息类型对应的编码规则 | |
| 2 | INT16 | secureKeyLength | TCP认真配置密钥长度 |
| n | STRING | secureKey | 密钥 |
## 数据类型
所有数据类型均采用`大端`编码
| Byte | Type | 编码规则 | 备注 |
| :--: | :-----: | ------------------------------------------------------------ | ------------------------------------------------------------ |
| 0x00 | NULL | | |
| 0x01 | BOOLEAN | 1字节 0x00为false 其他为true | |
| 0x02 | INT8 | 1字节 (byte) | |
| 0x03 | INT16 | 2字节整型 (short) | |
| 0x04 | INT32 | 4字节整型 (int) | |
| 0x05 | INT64 | 8字节整型 (long) | |
| 0x06 | UINT8 | 1字节无符号整型 | |
| 0x07 | UINT16 | 2字节无符号整型 | |
| 0x08 | UINT32 | 4字节无符号整型 | |
| 0x09 | FLOAT | 4字节 IEEE 754浮点数 | |
| 0x0a | DOUBLE | 8字节 IEEE 754浮点数 | |
| 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 | 2+N2个字节UnsignedShort表示N的长度 |
| 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 | 2+N2个字节UnsignedShort表示N的长度 |
| 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 | 2+N2个字节UnsignedShort表示ARRAY的长度N=很多*(1+X)1个字节UnsignedShort表示X的数据类型X表示长度见上几行。 |
| 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value | 2+N2个字节UnsignedShort表示OBJECT的长度,N是STRING+数据类型的组合了(见上几行)。 |
## 消息类型定义
| Byte | Type | 说明 | Message示例 |
| :--: | :----------------- | ------------ | ------------------------------------------------------------ |
| 0x00 | keepalive | 心跳 | |
| 0x01 | online | 首次连接 | [STRING:密钥信息 ] |
| 0x02 | ack | 应答 | [应答码 ]<br />应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持. |
| 0x03 | reportProperty | 上报属性 | [属性数据:OBJECT类型 ] |
| 0x04 | readProperty | 读取属性 | [属性列表:ARRAY类型 ] |
| 0x05 | readPropertyReply | 读取属性回复 | 读取成功:[`0x01`,属性数据:OBJECT类型 ]<br />读取失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
| 0x06 | writeProperty | 修改属性 | [属性列表:OBJECT类型 ] |
| 0x07 | writePropertyReply | 修改属性回复 | 修改成功:[`0x01`,属性数据:OBJECT类型 ]<br />修改失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
| 0x08 | function | 功能调用 | [功能ID:STRING类型,功能参数:OBJECT类型 ] |
| 0x09 | functionReply | 功能调用回复 | 调用成功:[`0x01`,属性数据:OBJECT类型 ]<br />调用失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
### 备注
`动态读取`表示类型不确定,根据对应的`数据类型`来定义类型.
如: 无错误信息
[ 0x00,`0x00`,`0x00` ]
`INT8(0x02)`类型错误码:`0x04`
[0x00,`0x02,0x04`,0x00 ]
## 示例
### 设备上线
```
000000270100000186c51a890f0001001331363531383533343133303332383934343634000561646d696e
```
```java
String hexForOnline =
"00000027" +//消息长度
"01" + //请求上线
"00000186c51a890f" +//时间戳
"0001" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0005" +//secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一使用byteBuf构建上线消息
String deviceId = "1651853413032894464";
String secureKey = "admin";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x01)
.writeLong(System.currentTimeMillis())
.writeShort(1)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf onlineBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二使用jetlinks-core构建消息
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("1");
message.setTimestamp(1678344096015L);
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
```
### 设备数据上报
```
0000006C0300000186C567FA7900020013313635313835333431333033323839343436340001000474656d700B000433362e35000561646d696e
```
```java
String hexForReport =
"0000006C" +//消息长度
"03" +//上报消息
"00000186C567FA79" + //时间戳
"0002" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0001" + //OBJECT对象数量
"0004" +//key的长度
"74656d70" + //值
"0B" + //value的类型
"0004" +//Value的长度
"33362e35" + //值
"0005" + //secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一
String deviceId = "1651853413032894464";
String secureKey = "admin";
String key = "temp";
String value = "36.5";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x03)
.writeLong(System.currentTimeMillis())
.writeShort(2)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(1)
.writeShort(key.getBytes().length)
.writeBytes(key.getBytes())
.writeByte(0x0B)
.writeShort(value.getBytes().length)
.writeBytes(value.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf reportBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("2");
message.setProperties(Collections.singletonMap("temp", 32.88));
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
```

Binary file not shown.

208
pom.xml
View File

@ -6,17 +6,17 @@
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-official-protocol</artifactId>
<version>1.0-SNAPSHOT</version>
<version>3.0.0</version>
<name>JetLinks</name>
<url>http://jetlinks.org</url>
<url>https://jetlinks.org</url>
<inceptionYear>2019</inceptionYear>
<description>JetLinks 物联网平台</description>
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
@ -47,9 +47,129 @@
<spring.boot.version>2.2.8.RELEASE</spring.boot.version>
<hsweb.framework.version>4.0.3</hsweb.framework.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<reactor.version>Dysprosium-RELEASE</reactor.version>
<reactor.version>2020.0.6</reactor.version>
</properties>
<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.3</version>
<extensions>true</extensions>
<configuration>
<serverId>sonatype-releases</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
<stagingProgressTimeoutMinutes>120</stagingProgressTimeoutMinutes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
<useReleaseProfile>false</useReleaseProfile>
<releaseProfiles>release</releaseProfiles>
<goals>deploy</goals>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>sonatype-releases</id>
<name>sonatype repository</name>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
</repository>
<snapshotRepository>
<id>sonatype-snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
</profile>
<profile>
<id>all-in-one</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置主类 -->
<mainClass>org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec</mainClass>
</transformer>
</transformers>
<!-- <artifactSet>-->
<!-- <includes>-->
<!-- &lt;!&ndash; 添加需要打包在一起的第三方依赖信息 &ndash;&gt;-->
<!-- &lt;!&ndash; <include>com.domain:*</include> &ndash;&gt;-->
<!-- </includes>-->
<!-- </artifactSet>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
@ -61,6 +181,21 @@
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
@ -69,37 +204,88 @@
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId>
<version>1.1.4-SNAPSHOT</version>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId>
<version>2.2.3</version>
<version>3.5.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.3.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
<version>2.11.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<distributionManagement>
<repository>
<id>releases</id>
<name>Nexus Release Repository</name>
<url>https://nexus.jetlinks.cn/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>https://nexus.jetlinks.cn/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<repositories>
<repository>
<id>hsweb-nexus</id>
<name>Nexus Release Repository</name>
<url>http://nexus.hsweb.me/content/groups/public/</url>
<url>https://nexus.jetlinks.cn/content/groups/public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
@ -108,9 +294,9 @@
<repository>
<id>aliyun-nexus</id>
<name>aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
</project>
</project>

View File

@ -0,0 +1,81 @@
package org.jetlinks.protocol.official;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@Slf4j
public abstract class AbstractCoapDeviceMessageCodec implements DeviceMessageCodec {
protected abstract Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response);
protected String getPath(CoapMessage message){
String path = message.getPath();
if (!path.startsWith("/")) {
path = "/" + path;
}
return path;
}
protected String getDeviceId(CoapMessage message){
String deviceId = message.getStringOption(2100).orElse(null);
String[] paths = TopicMessageCodec.removeProductPath(getPath(message));
if (StringUtils.isEmpty(deviceId) && paths.length > 1) {
deviceId = paths[1];
}
return deviceId;
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
AtomicBoolean alreadyReply = new AtomicBoolean();
Consumer<Object> responseHandler = (resp) -> {
if (alreadyReply.compareAndSet(false, true)) {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
if (resp instanceof byte[]) {
exchangeMessage.getExchange().respond(CoAP.ResponseCode.CONTENT, ((byte[]) resp));
}
}
};
return this
.decode(exchangeMessage, context, responseHandler)
.doOnComplete(() -> responseHandler.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST);
})
.switchIfEmpty(Mono.fromRunnable(() -> responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST)));
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Flux.empty();
}
@Nonnull
@Override
public Publisher<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -0,0 +1,73 @@
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.protocol.official.functional.TimeSyncRequest;
import org.jetlinks.protocol.official.functional.TimeSyncResponse;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.function.Function;
/**
* 功能性的topic,不和平台交互
*/
public enum FunctionalTopicHandlers {
//同步时间
timeSync("/*/*/time-sync") {
@SneakyThrows
@SuppressWarnings("all")
Mono<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
TopicPayload topicPayload = new TopicPayload();
topicPayload.setTopic(String.join("/", topic) + "/reply");
TimeSyncRequest msg = mapper.readValue(payload, TimeSyncRequest.class);
TimeSyncResponse response = TimeSyncResponse.of(msg.getMessageId(), System.currentTimeMillis());
topicPayload.setPayload(mapper.writeValueAsBytes(response));
//直接回复给设备
return sender
.apply(topicPayload)
.then(Mono.empty());
}
};
FunctionalTopicHandlers(String topic) {
this.pattern = topic.split("/");
}
private final String[] pattern;
abstract Publisher<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender);
public static Publisher<DeviceMessage> handle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
return Mono
.justOrEmpty(fromTopic(topic))
.flatMapMany(handler -> handler.doHandle(device, topic, payload, mapper, sender));
}
static Optional<FunctionalTopicHandlers> fromTopic(String[] topic) {
for (FunctionalTopicHandlers value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
}

View File

@ -1,22 +1,38 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
@ -24,55 +40,79 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCode
}
public Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
return Mono.defer(() -> {
String path = message.getPath();
public Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
if (context.getDevice() == null) {
return Flux.empty();
}
return Flux.defer(() -> {
String path = getPath(message);
String deviceId = getDeviceId(message);
String sign = message.getStringOption(2110).orElse(null);
String token = message.getStringOption(2111).orElse(null);
String payload = message.getPayload().toString(StandardCharsets.UTF_8);
if ("/auth".equals(path)) {
byte[] payload = message.payloadAsBytes();
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
if (StringUtils.isEmpty(deviceId)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
// TODO: 2021/7/30 移到 FunctionalTopicHandlers
if (path.endsWith("/request-token")) {
//认证
return context.getDevice()
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, context.getDevice().getDeviceId(), payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return context.getDevice()
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
})
return context
.getDevice(deviceId)
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMap(device -> device
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, deviceId, payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return device
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
}))
.then(Mono.empty());
}
if (StringUtils.isEmpty(token)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return context.getDevice()
.getConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
}))
.flatMap(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return Mono
.just(decode(path, JSON.parseObject(payload)).getMessage())
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST)));
})
.doOnSuccess(msg -> {
response.accept(CoAP.ResponseCode.CREATED);
})
return context
.getDevice(deviceId)
.flatMapMany(device -> device
.getSelfConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMapMany(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}))
.doOnComplete(() -> response.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
response.accept(CoAP.ResponseCode.BAD_REQUEST);
@ -81,41 +121,18 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCode
}
@Nonnull
@Override
public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
return decode(exchangeMessage, context, resp -> {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
});
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Mono.empty();
}
protected boolean verifySign(String secureKey, String deviceId, String payload, String sign) {
protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) {
//验证签名
if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(payload.concat(secureKey)).equalsIgnoreCase(sign)) {
byte[] secureKeyBytes = secureKey.getBytes();
byte[] signPayload = Arrays.copyOf(payload, payload.length + secureKeyBytes.length);
System.arraycopy(secureKeyBytes, 0, signPayload, 0, secureKeyBytes.length);
if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(signPayload).equalsIgnoreCase(sign)) {
log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload);
return false;
}
return true;
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -1,85 +1,75 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.jetlinks.core.Value;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
public static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP;
}
protected JSONObject decode(String text) {
return JSON.parseObject(text);
}
protected Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context) {
String path = message.getPath();
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
String path = getPath(message);
String deviceId = getDeviceId(message);
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
return context
.getDevice()
.getConfigs("encAlg", "secureKey")
.flatMap(configs -> {
Ciphers ciphers = configs.getValue("encAlg").map(Value::asString).flatMap(Ciphers::of).orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
ByteBuf byteBuf = message.getPayload();
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
byteBuf.resetReaderIndex();
String payload = new String(ciphers.decrypt(req, secureKey));
//解码
return Mono.just(decode(path, decode(payload)).getMessage());
});
.getDevice(deviceId)
.flatMapMany(device -> device
.getConfigs("encAlg", "secureKey")
.flatMapMany(configs -> {
Ciphers ciphers = configs
.getValue("encAlg")
.map(Value::asString)
.flatMap(Ciphers::of)
.orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
byte[] payload = ciphers.decrypt(message.payloadAsBytes(), secureKey);
//解码
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}));
}
protected Mono<? extends Message> decode(CoapExchangeMessage message, MessageDecodeContext context) {
CoapExchange exchange = message.getExchange();
return decode((CoapMessage) message, context)
.doOnSuccess(msg -> {
exchange.respond(CoAP.ResponseCode.CREATED);
exchange.accept();
})
.switchIfEmpty(Mono.fromRunnable(() -> {
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
}))
.doOnError(error -> {
log.error("decode coap message error", error);
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
});
}
@Nonnull
@Override
public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
return Mono.defer(() -> {
log.debug("handle coap message:\n{}", context.getMessage());
if (context.getMessage() instanceof CoapExchangeMessage) {
return decode(((CoapExchangeMessage) context.getMessage()), context);
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context);
}
return Mono.empty();
});
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -2,16 +2,20 @@ package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.server.session.DeviceSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* <pre>
@ -49,13 +53,15 @@ import java.nio.charset.StandardCharsets;
* @since 1.0.0
*/
@Slf4j
public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
private Transport transport;
private final Transport transport;
private final ObjectMapper mapper;
public JetLinksMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
this.mapper = ObjectMappers.JSON_MAPPER;
}
public JetLinksMqttDeviceMessageCodec() {
@ -71,21 +77,32 @@ public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec im
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.defer(() -> {
Message message = context.getMessage();
if (message instanceof DisconnectDeviceMessage) {
return ((ToDeviceMessageContext) context)
.disconnect()
.then(Mono.empty());
}
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
EncodedTopic convertResult = encode(deviceMessage.getDeviceId(), deviceMessage);
TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
if (convertResult == null) {
return Mono.empty();
}
return context.getDevice()
.getConfig(DeviceConfigKey.productId)
return Mono
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
)
.defaultIfEmpty("null")
.map(productId -> SimpleMqttMessage.builder()
.map(productId -> SimpleMqttMessage
.builder()
.clientId(deviceMessage.getDeviceId())
.topic("/" .concat(productId).concat(convertResult.topic))
.topic("/".concat(productId).concat(convertResult.getTopic()))
.payloadType(MessagePayloadType.JSON)
.payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(convertResult.payload)))
.payload(Unpooled.wrappedBuffer(convertResult.getPayload()))
.build());
} else {
return Mono.empty();
@ -95,18 +112,45 @@ public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec im
@Nonnull
@Override
public Mono<Message> decode(@Nonnull MessageDecodeContext context) {
return Mono.fromSupplier(() -> {
MqttMessage message = (MqttMessage) context.getMessage();
String topic = message.getTopic();
String jsonData = message.getPayload().toString(StandardCharsets.UTF_8);
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
MqttMessage message = (MqttMessage) context.getMessage();
byte[] payload = message.payloadAsBytes();
return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(context.getDevice(),
message.getTopic().split("/"),
payload,
mapper,
reply -> doReply(context, reply)))
;
}
private Mono<Void> doReply(MessageCodecContext context, TopicPayload reply) {
if (context instanceof FromDeviceMessageContext) {
return ((FromDeviceMessageContext) context)
.getSession()
.send(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
} else if (context instanceof ToDeviceMessageContext) {
return ((ToDeviceMessageContext) context)
.sendToDevice(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
}
return Mono.empty();
JSONObject object = JSON.parseObject(jsonData, JSONObject.class);
if (object == null) {
throw new UnsupportedOperationException("cannot parse payload:{}" + jsonData);
}
return decode(topic, object).getMessage();
});
}
}

View File

@ -3,68 +3,124 @@ package org.jetlinks.protocol.official;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.route.WebsocketRoute;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec;
import org.jetlinks.protocol.official.udp.UDPDeviceMessageCodec;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider {
private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
"MQTT认证配置"
, "MQTT认证时需要的配置,mqtt用户名,密码算法:\n" +
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
.add("secureId", "secureId", "密钥ID", new StringType())
.add("secureKey", "secureKey", "密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override
public Mono<CompositeProtocolSupport> create(ServiceContext context) {
return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport();
support.setId("jetlinks.v1.0");
support.setName("JetLinks V1.0");
support.setDescription("JetLinks Protocol Version 1.0");
support.setId("jetlinks.v3.0");
support.setName("JetLinks V3.0");
support.setDescription("JetLinks Protocol Version 3.0");
support.addRoutes(DefaultTransport.MQTT, Arrays
.stream(TopicMessageCodec.values())
.map(TopicMessageCodec::getRoute)
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.MQTT,
"document-mqtt.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addRoutes(DefaultTransport.HTTP, Stream
.of(TopicMessageCodec.reportProperty,
TopicMessageCodec.event,
TopicMessageCodec.online,
TopicMessageCodec.offline)
.map(TopicMessageCodec::getRoute)
.filter(route -> route != null && route.isUpstream())
.map(route -> HttpRoute
.builder()
.address(route.getTopic())
.group(route.getGroup())
.contentType(MediaType.APPLICATION_JSON)
.method(HttpMethod.POST)
.description(route.getDescription())
.example(route.getExample())
.build())
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.HTTP,
"document-http.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
support.addConfigMetadata(DefaultTransport.MQTT_TLS, mqttConfig);
support.addConfigMetadata(DefaultTransport.CoAP, coapConfig);
support.addConfigMetadata(DefaultTransport.CoAP_DTLS, coapDTLSConfig);
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT));
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
//TCP
support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig);
support.addMessageCodecSupport(new TcpDeviceMessageCodec());
support.setDocument(DefaultTransport.TCP,
"document-tcp.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
//UDP
support.addConfigMetadata(DefaultTransport.UDP, UDPDeviceMessageCodec.udpConfig);
support.addMessageCodecSupport(new UDPDeviceMessageCodec());
//MQTT
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec());
//HTTP
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec());
//Websocket
JetLinksHttpDeviceMessageCodec codec = new JetLinksHttpDeviceMessageCodec(DefaultTransport.WebSocket);
support.addMessageCodecSupport(codec);
support.addAuthenticator(DefaultTransport.WebSocket, codec);
support.addRoutes(
DefaultTransport.WebSocket,
Collections.singleton(
WebsocketRoute
.builder()
.path("/{productId:产品ID}/{productId:设备ID}/socket")
.description("通过Websocket接入平台")
.build()
));
//CoAP
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
return Mono.just(support);

View File

@ -1,290 +0,0 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.supports.utils.MqttTopicUtils;
import org.springframework.util.Assert;
import java.util.Map;
import java.util.Optional;
class JetlinksTopicMessageCodec {
@Getter
protected class DecodeResult {
private Map<String, String> args;
private boolean child;
private boolean event;
private boolean readPropertyReply;
private boolean writePropertyReply;
private boolean functionInvokeReply;
private boolean reportProperties;
private boolean derivedMetadata;
private boolean register;
private boolean unregister;
private boolean requestFirmware;
private boolean reportFirmware;
private boolean upgradeFirmwareProgress;
private boolean readFirmwareReply;
public DecodeResult(String topic) {
this.topic = topic;
args = TopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic);
if (topic.contains("child")) {
child = true;
args.putAll(TopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic));
}
if (topic.contains("event")) {
event = true;
args.putAll(TopicUtils.getPathVariables("/**/event/{eventId}", topic));
}
derivedMetadata = topic.endsWith("metadata/derived");
if (event) {
} else if (reportProperties = topic.endsWith("properties/report")) {
} else if (unregister = topic.endsWith("unregister")) {
} else if (register = topic.endsWith("register")) {
} else if (readPropertyReply = topic.endsWith("properties/read/reply")) {
} else if (writePropertyReply = topic.endsWith("properties/write/reply")) {
} else if (functionInvokeReply = topic.endsWith("function/invoke/reply")) {
} else if (upgradeFirmwareProgress = topic.endsWith("firmware/upgrade/progress")) {
} else if (requestFirmware = topic.endsWith("firmware/pull")) {
} else if (reportFirmware = topic.endsWith("firmware/report")) {
} else if (readFirmwareReply = topic.endsWith("firmware/read/reply")) {
} else if (derivedMetadata = topic.endsWith("metadata/derived")) {
}
}
private final String topic;
public String getDeviceId() {
return args.get("deviceId");
}
public String getChildDeviceId() {
return args.get("childDeviceId");
}
protected Message message;
}
protected EncodedTopic encode(String deviceId, Message message) {
Assert.hasText(deviceId, "deviceId can not be null");
Assert.notNull(message, "message can not be null");
if (message instanceof ReadPropertyMessage) {
String topic = "/" .concat(deviceId).concat("/properties/read");
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("properties", ((ReadPropertyMessage) message).getProperties());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof WritePropertyMessage) {
String topic = "/" .concat(deviceId).concat("/properties/write");
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("properties", ((WritePropertyMessage) message).getProperties());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof FunctionInvokeMessage) {
String topic = "/" .concat(deviceId).concat("/function/invoke");
FunctionInvokeMessage invokeMessage = ((FunctionInvokeMessage) message);
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("function", invokeMessage.getFunctionId());
mqttData.put("inputs", invokeMessage.getInputs());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof UpgradeFirmwareMessage) {
String topic = "/" .concat(deviceId).concat("/firmware/upgrade");
UpgradeFirmwareMessage firmwareMessage = ((UpgradeFirmwareMessage) message);
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("url", firmwareMessage.getUrl());
mqttData.put("sign", firmwareMessage.getSign());
mqttData.put("version", firmwareMessage.getVersion());
mqttData.put("signMethod", firmwareMessage.getSignMethod());
mqttData.put("parameters", firmwareMessage.getParameters());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof ReadFirmwareMessage) {
String topic = "/" .concat(deviceId).concat("/firmware/read");
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof RequestFirmwareMessageReply) {
String topic = "/" .concat(deviceId).concat("/firmware/pull/reply");
RequestFirmwareMessageReply firmwareMessage = ((RequestFirmwareMessageReply) message);
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("url", firmwareMessage.getUrl());
mqttData.put("sign", firmwareMessage.getSign());
mqttData.put("version", firmwareMessage.getVersion());
mqttData.put("signMethod", firmwareMessage.getSignMethod());
mqttData.put("parameters", firmwareMessage.getParameters());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof ChildDeviceMessage) {
ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message);
EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage());
String topic = "/" .concat(deviceId).concat("/child").concat(result.topic);
result.payload.put("deviceId", childDeviceMessage.getChildDeviceId());
return new EncodedTopic(topic, result.payload);
}
return null;
}
protected DecodeResult decode(String topic, JSONObject object) {
DecodeResult result = new DecodeResult(topic);
Message message = null;
if (result.isEvent()) {
message = decodeEvent(result, object);
} else if (result.isReportProperties()) {
message = decodeReportPropertyReply(result, object);
} else if (result.isReadPropertyReply()) {
message = decodeReadPropertyReply(result, object);
} else if (result.isWritePropertyReply()) {
message = decodeWritePropertyReply(result, object);
} else if (result.isFunctionInvokeReply()) {
message = decodeInvokeReply(result, object);
} else if (result.isRegister()) {
message = decodeRegister(result, object);
} else if (result.isUnregister()) {
message = decodeUnregister(result, object);
} else if (result.isDerivedMetadata()) {
message = decodeDerivedMetadata(result, object);
} else if (result.isReadFirmwareReply()) {
message = object.toJavaObject(ReadFirmwareMessageReply.class);
} else if (result.isRequestFirmware()) {
message = object.toJavaObject(RequestFirmwareMessage.class);
} else if (result.isReportFirmware()) {
message = object.toJavaObject(ReportFirmwareMessage.class);
} else if (result.isUpgradeFirmwareProgress()) {
message = object.toJavaObject(UpgradeFirmwareProgressMessage.class);
}else if (topic.endsWith("connected")) {
message = object.toJavaObject(DeviceOnlineMessage.class);
} else if (topic.endsWith("disconnect")) {
message = object.toJavaObject(DeviceOfflineMessage.class);
}
if (result.isChild()) {
if (message == null) {
throw new UnsupportedOperationException("unsupported topic:" + topic);
}
applyCommons(message, result, object);
ChildDeviceMessage children = new ChildDeviceMessage();
children.setChildDeviceId(result.getChildDeviceId());
children.setDeviceId(result.getDeviceId());
children.setChildDeviceMessage(message);
children.setTimestamp(Optional.ofNullable(object.getLong("timestamp")).orElse(System.currentTimeMillis()));
Optional.ofNullable(object.getString("messageId")).ifPresent(children::setMessageId);
result.message = children;
} else {
if (message == null) {
throw new UnsupportedOperationException("unsupported topic:" + topic);
}
applyCommons(message, result, object);
result.message = message;
}
return result;
}
private Message decodeEvent(DecodeResult result, JSONObject event) {
EventMessage message = event.toJavaObject(EventMessage.class);
message.setData(event.get("data"));
message.setEvent(result.args.get("eventId"));
return message;
}
private Message decodeReadPropertyReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(ReadPropertyMessageReply.class);
}
private Message decodeReportPropertyReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(ReportPropertyMessage.class);
}
private Message decodeWritePropertyReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(WritePropertyMessageReply.class);
}
private Message decodeInvokeReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(FunctionInvokeMessageReply.class);
}
private Message decodeRegister(DecodeResult result, JSONObject data) {
return data.toJavaObject(DeviceRegisterMessage.class);
}
private Message decodeUnregister(DecodeResult result, JSONObject data) {
return data.toJavaObject(DeviceUnRegisterMessage.class);
}
private Message decodeDerivedMetadata(DecodeResult result, JSONObject data) {
return data.toJavaObject(DerivedMetadataMessage.class);
}
private void applyCommons(Message message, DecodeResult result, JSONObject data) {
if (message instanceof CommonDeviceMessageReply) {
CommonDeviceMessageReply reply = ((CommonDeviceMessageReply) message);
reply.setSuccess(Optional.ofNullable(data.getBoolean("success")).orElse(true));
reply.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
if (result.isChild()) {
reply.setDeviceId(result.getChildDeviceId());
} else {
reply.setDeviceId(result.getDeviceId());
}
}
if (message instanceof CommonDeviceMessage) {
CommonDeviceMessage msg = ((CommonDeviceMessage) message);
msg.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
if (result.isChild()) {
msg.setDeviceId(result.getChildDeviceId());
} else {
msg.setDeviceId(result.getDeviceId());
}
}
}
@Getter
@Setter
@AllArgsConstructor
protected class EncodedTopic {
String topic;
JSONObject payload;
}
@Getter
@Setter
protected class Decoded {
Message message;
}
}

View File

@ -0,0 +1,28 @@
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
public class ObjectMappers {
public static final ObjectMapper JSON_MAPPER;
public static final ObjectMapper CBOR_MAPPER;
static {
JSON_MAPPER = Jackson2ObjectMapperBuilder
.json()
.build()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
;
CBOR_MAPPER = Jackson2ObjectMapperBuilder
.cbor()
.build()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
}

View File

@ -0,0 +1,403 @@
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Function;
public enum TopicMessageCodec {
//上报属性数据
reportProperty("/*/properties/report",
ReportPropertyMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("属性上报")
.description("上报物模型属性数据")
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
//读取属性
readProperty("/*/properties/read",
ReadPropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("读取属性")
.description("平台下发读取物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")),
//读取属性回复
readPropertyReply("/*/properties/read/reply",
ReadPropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("读取属性")
.description("对平台下发的读取属性指令进行响应")
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性
writeProperty("/*/properties/write",
WritePropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("修改属性")
.description("平台下发修改物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性回复
writePropertyReply("/*/properties/write/reply",
WritePropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("修改属性")
.description("对平台下发的修改属性指令进行响应")
.example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//事件上报
event("/*/event/*",
EventMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("事件上报")
.description("上报物模型事件数据")
.example("{\"data\":{\"key\":\"value\"}}")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{eventId:事件ID}";
}
@Override
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String event = topic[topic.length - 1];
return Mono.from(super.doDecode(mapper, topic, payload))
.cast(EventMessage.class)
.doOnNext(e -> e.setEvent(event))
.cast(DeviceMessage.class);
}
@Override
void refactorTopic(String[] topics, DeviceMessage message) {
super.refactorTopic(topics, message);
EventMessage event = ((EventMessage) message);
topics[topics.length - 1] = String.valueOf(event.getEvent());
}
},
//调用功能
functionInvoke("/*/function/invoke",
FunctionInvokeMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("调用功能")
.description("平台下发功能调用指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\"," +
"\"functionId\":\"功能标识\"," +
"\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")),
//调用功能回复
functionInvokeReply("/*/function/invoke/reply",
FunctionInvokeMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("调用功能")
.description("设备响应平台下发的功能调用指令")
.example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," +
"\"output\":\"输出结果,格式与物模型中定义的类型一致\"")),
//子设备消息
child("/*/child/*/**",
ChildDeviceMessage.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关上报或者平台下发子设备消息")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
_topic[0] = "";// topic以/开头所有第一位是空白
return TopicMessageCodec
.decode(mapper, _topic, payload)
.map(childMsg -> {
ChildDeviceMessage msg = new ChildDeviceMessage();
msg.setDeviceId(topic[1]);
msg.setChildDeviceMessage(childMsg);
msg.setTimestamp(childMsg.getTimestamp());
msg.setMessageId(childMsg.getMessageId());
return msg;
});
}
@Override
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
ChildDeviceMessage deviceMessage = ((ChildDeviceMessage) message);
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
String[] childTopic = payload.getTopic().split("/");
String[] topic = new String[topics.length + childTopic.length - 3];
//合并topic
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
refactorTopic(topic, message);
payload.setTopic(String.join("/", topic));
return payload;
}
}, //子设备消息回复
childReply("/*/child-reply/*/**",
ChildDeviceMessageReply.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关回复平台下发给子设备的指令结果")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
_topic[0] = "";// topic以/开头所有第一位是空白
return TopicMessageCodec
.decode(mapper, _topic, payload)
.map(childMsg -> {
ChildDeviceMessageReply msg = new ChildDeviceMessageReply();
msg.setDeviceId(topic[1]);
msg.setChildDeviceMessage(childMsg);
msg.setTimestamp(childMsg.getTimestamp());
msg.setMessageId(childMsg.getMessageId());
return msg;
});
}
@Override
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message);
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
String[] childTopic = payload.getTopic().split("/");
String[] topic = new String[topics.length + childTopic.length - 3];
//合并topic
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
refactorTopic(topic, message);
payload.setTopic(String.join("/", topic));
return payload;
}
},
//更新标签
updateTag("/*/tags",
UpdateTagMessage.class,
route -> route.upstream(true)
.downstream(false)
.group("更新标签")
.description("更新标签数据")
.example("{\"tags\":{\"key\",\"value\"}}")),
//注册
register("/*/register", DeviceRegisterMessage.class),
//注销
unregister("/*/unregister", DeviceUnRegisterMessage.class),
//更新固件消息
upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
//更新固件消息回复
upgradeFirmwareReply("/*/firmware/upgrade/reply", UpgradeFirmwareMessageReply.class),
//更新固件升级进度消息
upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class),
//拉取固件
requestFirmware("/*/firmware/pull", RequestFirmwareMessage.class),
//拉取固件更新回复
requestFirmwareReply("/*/firmware/pull/reply", RequestFirmwareMessageReply.class),
//上报固件版本
reportFirmware("/*/firmware/report", ReportFirmwareMessage.class),
//读取固件回复
readFirmware("/*/firmware/read", ReadFirmwareMessage.class),
//读取固件回复
readFirmwareReply("/*/firmware/read/reply", ReadFirmwareMessageReply.class),
//派生物模型上报
derivedMetadata("/*/metadata/derived", DerivedMetadataMessage.class),
//透传设备消息
direct("/*/direct", DirectDeviceMessage.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
DirectDeviceMessage message = new DirectDeviceMessage();
message.setDeviceId(topic[1]);
message.setPayload(payload);
return Mono.just(message);
}
},
//断开连接消息
disconnect("/*/disconnect", DisconnectDeviceMessage.class),
//断开连接回复
disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
//上线
online("/*/online", DeviceOnlineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备上线")),
//离线
offline("/*/offline", DeviceOfflineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备离线")),
//日志
log("/*/log", DeviceLogMessage.class),
//状态检查
stateCheck("/*/state-check", DeviceStateCheckMessage.class),
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
;
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type,
Function<MqttRoute.Builder, MqttRoute.Builder> routeCustom) {
this.pattern = topic.split("/");
this.type = type;
this.route = routeCustom.apply(toRoute()).build();
}
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type) {
this.pattern = topic.split("/");
this.type = type;
this.route = null;
}
private final String[] pattern;
private final MqttRoute route;
private final Class<? extends DeviceMessage> type;
protected void transMqttTopic(String[] topic) {
}
@SneakyThrows
private MqttRoute.Builder toRoute() {
String[] topics = new String[pattern.length];
System.arraycopy(pattern, 0, topics, 0, pattern.length);
topics[0] = "{productId:产品ID}";
topics[1] = "{deviceId:设备ID}";
transMqttTopic(topics);
StringJoiner joiner = new StringJoiner("/", "/", "");
for (String topic : topics) {
joiner.add(topic);
}
return MqttRoute
.builder(joiner.toString())
.qos(1);
}
public MqttRoute getRoute() {
return route;
}
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) {
return Mono
.justOrEmpty(fromTopic(topics))
.flatMapMany(topicMessageCodec -> topicMessageCodec.doDecode(mapper, topics, payload));
}
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String topic, byte[] payload) {
return decode(mapper, topic.split("/"), payload);
}
public static TopicPayload encode(ObjectMapper mapper, DeviceMessage message) {
return fromMessage(message)
.orElseThrow(() -> new UnsupportedOperationException("unsupported message:" + message.getMessageType()))
.doEncode(mapper, message);
}
static Optional<TopicMessageCodec> fromTopic(String[] topic) {
for (TopicMessageCodec value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
static Optional<TopicMessageCodec> fromMessage(DeviceMessage message) {
for (TopicMessageCodec value : values()) {
if (value.type == message.getClass()) {
return Optional.of(value);
}
}
return Optional.empty();
}
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
return Mono
.fromCallable(() -> {
DeviceMessage message = mapper.readValue(payload, type);
FastBeanCopier.copy(Collections.singletonMap("deviceId", topic[1]), message);
return message;
});
}
@SneakyThrows
TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
refactorTopic(topics, message);
return TopicPayload.of(String.join("/", topics), mapper.writeValueAsBytes(message));
}
@SneakyThrows
TopicPayload doEncode(ObjectMapper mapper, DeviceMessage message) {
String[] topics = Arrays.copyOf(pattern, pattern.length);
return doEncode(mapper, topics, message);
}
void refactorTopic(String[] topics, DeviceMessage message) {
topics[1] = message.getDeviceId();
}
/**
* 移除topic中的产品信息,topic第一个层为产品ID在解码时,不需要此信息,所以需要移除之.
*
* @param topic topic
* @return 移除后的topic
*/
public static String[] removeProductPath(String topic) {
if (!topic.startsWith("/")) {
topic = "/" + topic;
}
String[] topicArr = topic.split("/");
String[] topics = Arrays.copyOfRange(topicArr, 1, topicArr.length);
topics[0] = "";
return topics;
}
}

View File

@ -0,0 +1,17 @@
package org.jetlinks.protocol.official;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TopicPayload {
private String topic;
private byte[] payload;
}

View File

@ -0,0 +1,7 @@
package org.jetlinks.protocol.official.binary;
public enum AckCode {
ok,
noAuth,
unsupportedMessage
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.HeaderKey;
public class BinaryAcknowledgeDeviceMessage implements BinaryMessage<AcknowledgeDeviceMessage> {
public static final HeaderKey<String> codeHeader = HeaderKey.of("code", AckCode.ok.name());
private AcknowledgeDeviceMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.ack;
}
@Override
public void read(ByteBuf buf) {
message = new AcknowledgeDeviceMessage();
AckCode code = AckCode.values()[buf.readUnsignedByte()];
message.addHeader(codeHeader, code.name());
}
@Override
public void write(ByteBuf buf) {
AckCode code = AckCode.valueOf(this.message.getHeaderOrDefault(codeHeader));
buf.writeByte(code.ordinal());
}
@Override
public void setMessage(AcknowledgeDeviceMessage message) {
this.message = message;
}
@Override
public AcknowledgeDeviceMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,45 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
/**
*
*/
public class BinaryDeviceOnlineMessage implements BinaryMessage<DeviceOnlineMessage> {
public static final HeaderKey<String> loginToken = HeaderKey.of("token", null);
private DeviceOnlineMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.online;
}
@Override
public void read(ByteBuf buf) {
message = new DeviceOnlineMessage();
message.addHeader(loginToken, (String) DataType.STRING.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING
.write(
buf, message.getHeader(loginToken).orElse("")
);
}
@Override
public void setMessage(DeviceOnlineMessage message) {
this.message = message;
}
@Override
public DeviceOnlineMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,46 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.event.EventMessage;
/**
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryEventMessage implements BinaryMessage<EventMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.event;
}
private EventMessage message;
@Override
public void read(ByteBuf buf) {
message = new EventMessage();
message.setEvent((String) DataType.STRING.read(buf));
message.setData(DataType.OBJECT.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getEvent());
DataType.OBJECT.write(buf, message.getData());
}
@Override
public void setMessage(EventMessage message) {
this.message = message;
}
@Override
public EventMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,50 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import java.util.Map;
import java.util.stream.Collectors;
public class BinaryFunctionInvokeMessage implements BinaryMessage<FunctionInvokeMessage> {
private FunctionInvokeMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.function;
}
@Override
public void read(ByteBuf buf) {
message = new FunctionInvokeMessage();
message.setFunctionId((String) DataType.STRING.read(buf));
@SuppressWarnings("all")
Map<String, Object> params = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setInputs(
params
.entrySet()
.stream()
.map(e -> new FunctionParameter(e.getKey(), e.getValue()))
.collect(Collectors.toList())
);
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getFunctionId());
DataType.OBJECT.write(buf,message.inputsToMap());
}
@Override
public void setMessage(FunctionInvokeMessage message) {
this.message = message;
}
@Override
public FunctionInvokeMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryFunctionInvokeMessageReply extends BinaryReplyMessage<FunctionInvokeMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.functionReply;
}
@Override
protected FunctionInvokeMessageReply newMessage() {
return new FunctionInvokeMessageReply();
}
@Override
protected void doReadSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
msg.setFunctionId((String) DataType.readFrom(buf));
msg.setOutput(DataType.readFrom(buf));
}
@Override
protected void doWriteSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
DataType.writeTo(getMessage().getFunctionId(), buf);
DataType.writeTo(msg.getOutput(), buf);
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessage;
import reactor.core.publisher.Flux;
public interface BinaryMessage<T extends DeviceMessage> {
BinaryMessageType getType();
void read(ByteBuf buf);
void write(ByteBuf buf);
void setMessage(T message);
T getMessage();
}

View File

@ -0,0 +1,223 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public enum BinaryMessageType {
//0x00
keepalive(null, null),
//0x01
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
//0x02
ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new),
//0x03
reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new),
//0x04
readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new),
//0x05
readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new),
writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new),
writePropertyReply(WritePropertyMessageReply.class, BinaryWritePropertyMessageReply::new),
function(FunctionInvokeMessage.class, BinaryFunctionInvokeMessage::new),
functionReply(FunctionInvokeMessageReply.class, BinaryFunctionInvokeMessageReply::new),
event(EventMessage.class, BinaryEventMessage::new);
private final Class<? extends DeviceMessage> forDevice;
private final Supplier<BinaryMessage<DeviceMessage>> forTcp;
private static final BinaryMessageType[] VALUES = values();
public static final HeaderKey<Integer> HEADER_MSG_SEQ = HeaderKey.of("_seq", 0, Integer.class);
@SuppressWarnings("all")
BinaryMessageType(Class<? extends DeviceMessage> forDevice,
Supplier<? extends BinaryMessage<?>> forTcp) {
this.forDevice = forDevice;
this.forTcp = (Supplier) forTcp;
}
private static final Map<String, MsgIdHolder> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.<String, MsgIdHolder>build()
.asMap();
private static class MsgIdHolder {
private int msgId = 0;
private final Map<Integer, String> cached = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.<Integer, String>build()
.asMap();
public int next(String id) {
if (id == null) {
return -1;
}
do {
if (msgId++ < 0) {
msgId = 0;
}
} while (cached.putIfAbsent(msgId, id) != null);
return msgId;
}
public String getAndRemove(int id) {
if (id < 0) {
return null;
}
return cached.remove(id);
}
}
@SneakyThrows
private static MsgIdHolder takeHolder(String deviceId) {
return cache.computeIfAbsent(deviceId, (ignore) -> new MsgIdHolder());
}
public static ByteBuf write(DeviceMessage message, ByteBuf data) {
int msgId = message.getHeaderOrElse(HEADER_MSG_SEQ, () -> takeHolder(message.getDeviceId()).next(message.getMessageId()));
return write(message, msgId, data);
}
public static ByteBuf write(BinaryMessageType type, ByteBuf data) {
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 0-4字节 时间戳
data.writeLong(System.currentTimeMillis());
return data;
}
public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) {
BinaryMessageType type = lookup(message);
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 第1-8字节 时间戳
data.writeLong(message.getTimestamp());
// 9-11字节 消息序号
data.writeShort(msgId);
// 12... 字节 设备ID
DataType.STRING.write(data, message.getDeviceId());
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
tcp.setMessage(message);
//写出数据到ByteBuf
tcp.write(data);
return data;
}
public static DeviceMessage read(ByteBuf data) {
return read(data, null);
}
public static <T> T read(ByteBuf data,
String deviceIdMaybe,
BiFunction<DeviceMessage, Integer, T> handler) {
//第0个字节是消息类型
BinaryMessageType type = VALUES[data.readByte()];
if (type.forTcp == null) {
return null;
}
// 1-8字节 时间戳
long timestamp = data.readLong();
// 9-11字节 消息序号
int msgId = data.readUnsignedShort();
// 12... 字节 设备ID
String deviceId = (String) DataType.STRING.read(data);
if (deviceId == null) {
deviceId = deviceIdMaybe;
}
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
//从ByteBuf读取
tcp.read(data);
DeviceMessage message = tcp.getMessage();
message.thingId(DeviceThingType.device, deviceId);
if (timestamp > 0) {
message.timestamp(timestamp);
}
message.addHeader(HEADER_MSG_SEQ, msgId);
return handler.apply(message, msgId);
}
public static DeviceMessage read(ByteBuf data, String deviceIdMaybe) {
return read(data, deviceIdMaybe, (message, msgId) -> {
String messageId = null;
if (message.getDeviceId() != null) {
//获取实际平台下发的消息ID
MsgIdHolder holder = cache.get(message.getDeviceId());
if (holder != null) {
messageId = holder.getAndRemove(msgId);
}
}
if (messageId == null && msgId > 0) {
messageId = String.valueOf(msgId);
}
message.messageId(messageId);
return message;
});
}
public static BinaryMessageType lookup(DeviceMessage message) {
for (BinaryMessageType value : VALUES) {
if (value.forDevice != null && value.forDevice.isInstance(message)) {
return value;
}
}
throw new UnsupportedOperationException("unsupported device message " + message.getMessageType());
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (BinaryMessageType value : BinaryMessageType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}

View File

@ -0,0 +1,50 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.List;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReadPropertyMessage implements BinaryMessage<ReadPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readProperty;
}
private ReadPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReadPropertyMessage();
@SuppressWarnings("all")
List<String> list = (List<String>) DataType.ARRAY.read(buf);
message.setProperties(list);
}
@Override
public void write(ByteBuf buf) {
DataType.ARRAY.write(buf, message.getProperties());
}
@Override
public void setMessage(ReadPropertyMessage message) {
this.message = message;
}
@Override
public ReadPropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,41 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryReadPropertyMessageReply extends BinaryReplyMessage<ReadPropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected ReadPropertyMessageReply newMessage() {
return new ReadPropertyMessageReply();
}
@Override
protected void doWriteSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
@Override
protected void doReadSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
}

View File

@ -0,0 +1,52 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessageReply;
import java.util.Map;
public abstract class BinaryReplyMessage<T extends DeviceMessageReply> implements BinaryMessage<T> {
private T message;
protected abstract T newMessage();
@Override
public final void read(ByteBuf buf) {
message = newMessage();
boolean success = buf.readBoolean();
if (success) {
doReadSuccess(message, buf);
} else {
message.success(false);
message.code(String.valueOf(DataType.readFrom(buf)));
message.message(String.valueOf(DataType.readFrom(buf)));
}
}
protected abstract void doReadSuccess(T msg, ByteBuf buf);
protected abstract void doWriteSuccess(T msg, ByteBuf buf);
@Override
public final void write(ByteBuf buf) {
buf.writeBoolean(message.isSuccess());
if (message.isSuccess()) {
doWriteSuccess(message, buf);
} else {
DataType.writeTo(message.getCode(), buf);
DataType.writeTo(message.getMessage(), buf);
}
}
@Override
public void setMessage(T message) {
this.message = message;
}
@Override
public T getMessage() {
return message;
}
}

View File

@ -0,0 +1,48 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReportPropertyMessage implements BinaryMessage<ReportPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.reportProperty;
}
private ReportPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReportPropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(ReportPropertyMessage message) {
this.message = message;
}
@Override
public ReportPropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,49 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryWritePropertyMessage implements BinaryMessage<WritePropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.writeProperty;
}
private WritePropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new WritePropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(WritePropertyMessage message) {
this.message = message;
}
@Override
public WritePropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryWritePropertyMessageReply extends BinaryReplyMessage<WritePropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected WritePropertyMessageReply newMessage() {
return new WritePropertyMessageReply();
}
@Override
protected void doReadSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
@Override
protected void doWriteSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
}

View File

@ -0,0 +1,286 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetlinks.protocol.official.ObjectMappers;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public enum DataType {
//0x00
NULL {
@Override
public Object read(ByteBuf buf) {
return null;
}
@Override
public void write(ByteBuf buf, Object value) {
}
},
//0x01
BOOLEAN {
@Override
public Object read(ByteBuf buf) {
return buf.readBoolean();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeBoolean((Boolean) value);
}
},
//0x02
INT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x03
INT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x04
INT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x05
INT64 {
@Override
public Object read(ByteBuf buf) {
return buf.readLong();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeLong((Long) value);
}
},
//0x06
UINT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x07
UINT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x08
UINT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x09
FLOAT {
@Override
public Object read(ByteBuf buf) {
return buf.readFloat();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeFloat((Float) value);
}
},
//0x0A
DOUBLE {
@Override
public Object read(ByteBuf buf) {
return buf.readDouble();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeDouble((Double) value);
}
},
//0x0B
STRING {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = ((String) value).getBytes();
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0C
BINARY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return bytes;
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = (byte[]) value;
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0D
ARRAY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
List<Object> array = new ArrayList<>(len);
for (int i = 0; i < len; i++) {
array.add(readFrom(buf));
}
return array;
}
@Override
public void write(ByteBuf buf, Object value) {
Collection<Object> array = (Collection<Object>) value;
buf.writeShort(array.size());
for (Object o : array) {
writeTo(o, buf);
}
}
},
//0x0E
OBJECT {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
Map<String, Object> data = Maps.newLinkedHashMapWithExpectedSize(len);
for (int i = 0; i < len; i++) {
data.put((String) STRING.read(buf), readFrom(buf));
}
return data;
}
@Override
public void write(ByteBuf buf, Object value) {
Map<String, Object> data = value instanceof Map ? ((Map) value) : ObjectMappers.JSON_MAPPER.convertValue(value, Map.class);
buf.writeShort(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
STRING.write(buf, entry.getKey());
writeTo(entry.getValue(), buf);
}
}
};
private final static DataType[] VALUES = values();
public abstract Object read(ByteBuf buf);
public abstract void write(ByteBuf buf, Object value);
public static Object readFrom(ByteBuf buf) {
return VALUES[buf.readUnsignedByte()].read(buf);
}
public static void writeTo(Object data, ByteBuf buf) {
DataType type = loopUpType(data);
buf.writeByte(type.ordinal());
type.write(buf, data);
}
private static DataType loopUpType(Object data) {
if (data == null) {
return NULL;
} else if (data instanceof Boolean) {
return BOOLEAN;
} else if (data instanceof Byte) {
return INT8;
} else if (data instanceof Short) {
return INT16;
} else if (data instanceof Integer) {
return INT32;
} else if (data instanceof Long) {
return INT64;
} else if (data instanceof Float) {
return FLOAT;
} else if (data instanceof Double) {
return DOUBLE;
} else if (data instanceof String) {
return STRING;
} else if (data instanceof byte[]) {
return BINARY;
} else if (data instanceof Collection) {
return ARRAY;
} else if (data instanceof Map) {
return OBJECT;
} else {
throw new IllegalArgumentException("Unsupported data type: " + data.getClass());
}
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (DataType value : DataType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}

View File

@ -0,0 +1,10 @@
package org.jetlinks.protocol.official.functional;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TimeSyncRequest {
private String messageId;
}

View File

@ -0,0 +1,15 @@
package org.jetlinks.protocol.official.functional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TimeSyncResponse {
private String messageId;
private long timestamp;
}

View File

@ -0,0 +1,234 @@
package org.jetlinks.protocol.official.http;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.protocol.official.ObjectMappers;
import org.jetlinks.protocol.official.TopicMessageCodec;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Objects;
/**
* Http 的消息编解码器
*
* @author zhouhao
* @since 3.0.0
*/
@Slf4j
public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec, Authenticator {
public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata(
"HTTP认证配置"
, "使用HTTP Bearer Token进行认证")
.add("bearer_token", "Token", "Token", new PasswordType());
private final Transport transport;
public JetLinksHttpDeviceMessageCodec(Transport transport) {
this.transport = transport;
}
public JetLinksHttpDeviceMessageCodec() {
this(DefaultTransport.HTTP);
}
@Override
public Transport getSupportTransport() {
return transport;
}
@Nonnull
public Mono<EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
JSONObject json = context.getMessage().toJson();
//通过websocket下发
return Mono.just(DefaultWebSocketMessage.of(
WebSocketMessage.Type.TEXT,
Unpooled.wrappedBuffer(json.toJSONString().getBytes())));
}
private static SimpleHttpResponseMessage unauthorized(String msg) {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\"" + msg + "\"}")
.status(401)
.build();
}
private static SimpleHttpResponseMessage badRequest() {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"bad_request\"}")
.status(400)
.build();
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof HttpExchangeMessage) {
return decodeHttp(context);
}
if (context.getMessage() instanceof WebSocketSessionMessage) {
return decodeWebsocket(context);
}
return Flux.empty();
}
private Flux<DeviceMessage> decodeWebsocket(MessageDecodeContext context) {
WebSocketSessionMessage msg = ((WebSocketSessionMessage) context.getMessage());
return Mono
.justOrEmpty(MessageType.convertMessage(msg.payloadAsJson()))
.cast(DeviceMessage.class)
.flux();
}
private Flux<DeviceMessage> decodeHttp(MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
//校验请求头中的Authorization header,格式:
// Authorization: Bearer <token>
Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null);
if (header == null || header.getValue() == null || header.getValue().length == 0) {
return message
.response(unauthorized("Authorization header is required"))
.thenMany(Mono.empty());
}
String[] token = header.getValue()[0].split(" ");
if (token.length == 1) {
return message
.response(unauthorized("Illegal token format"))
.thenMany(Mono.empty());
}
String basicToken = token[1];
String[] paths = TopicMessageCodec.removeProductPath(message.getPath());
if (paths.length < 1) {
return message
.response(badRequest())
.thenMany(Mono.empty());
}
String deviceId = paths[1];
return context
.getDevice(deviceId)
.flatMap(device -> device.getConfig("bearer_token"))
//校验token
.filter(value -> Objects.equals(value.asString(), basicToken))
//设备或者配置不对
.switchIfEmpty(Mono.defer(() -> message
.response(unauthorized("Device no register or token not match"))
.then(Mono.empty())))
//解码
.flatMapMany(ignore -> doDecode(message, paths))
.switchOnFirst((s, flux) -> {
Mono<Void> handler;
//有结果则认为成功
if (s.hasValue()) {
handler = message.ok("{\"success\":true}");
} else {
return message
.response(badRequest())
.then(Mono.empty());
}
return handler.thenMany(flux);
})
.onErrorResume(err -> message
.error(500, getErrorMessage(err))
.then(Mono.error(err)))
//跟踪信息
.as(FluxTracer
.create(DeviceTracer.SpanName.decode(deviceId),
builder -> builder.setAttribute(DeviceTracer.SpanKey.message, message.print())));
}
private Flux<DeviceMessage> doDecode(HttpExchangeMessage message, String[] paths) {
return message
.payload()
.flatMapMany(buf -> {
byte[] body = ByteBufUtil.getBytes(buf);
return TopicMessageCodec.decode(ObjectMappers.JSON_MAPPER, paths, body);
});
}
public String getErrorMessage(Throwable err) {
if (err instanceof JsonParseException) {
return "{\"success\":false,\"code\":\"request_body_format_error\"}";
}
return "{\"success\":false,\"code\":\"server_error\"}";
}
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator device) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String token = req
.getSocketSession()
.getQueryParameters()
.get("token");
if (StringUtils.isEmpty(token)) {
return Mono.just(AuthenticationResponse.error(401, "认证参数错误"));
}
return device
.getConfig("bearer_token")
//校验token
.filter(value -> Objects.equals(value.asString(), token))
.map(ignore -> AuthenticationResponse.success(device.getDeviceId()))
//未配置或者配置不对
.switchIfEmpty(Mono.fromSupplier(() -> AuthenticationResponse.error(401, "token错误")));
}
static AuthenticationResponse deviceNotFound = AuthenticationResponse.error(404, "设备不存在");
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String[] paths = TopicMessageCodec.removeProductPath(req.getSocketSession().getPath());
if (paths.length < 1) {
return Mono.just(AuthenticationResponse.error(400, "URL格式错误"));
}
return registry
.getDevice(paths[1])
.flatMap(device -> authenticate(request, device))
.defaultIfEmpty(deviceNotFound);
}
}

View File

@ -0,0 +1,127 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.SneakyThrows;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class TcpDevice {
@SneakyThrows
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
int start = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int count = args.length > 1 ? Integer.parseInt(args[1]) : 8000;
String[] hosts = args.length > 2 ? args[2].split(",") : new String[]{"0.0.0.0"};
Flux.range(start, count)
.flatMap(i -> Mono
.create(sink -> {
NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true);
conf.setLocalAddress(hosts[i % hosts.length]);
vertx
.createNetClient(conf)
.connect(8802, "localhost")
.onFailure(err -> {
System.out.println(err.getMessage());
sink.success();
})
.onSuccess(socket -> {
RecordParser parser = RecordParser.newFixed(4);
AtomicReference<Buffer> buffer = new AtomicReference<>();
parser.handler(buf -> {
buffer.accumulateAndGet(buf, (a, b) -> {
if (a == null) {
parser.fixedSizeMode(buf.getInt(0));
return b;
}
parser.fixedSizeMode(4);
sink.success("tcp-off-" + i + ":" + socket.localAddress());
BinaryMessageType
.read(b.getByteBuf(),
null,
(downstream, seq) -> {
handleDownStream(downstream, seq, socket);
return null;
});
return null;
});
});
socket
.closeHandler((s) -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed");
sink.success();
})
.exceptionHandler(er -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage());
sink.success();
})
.handler(parser);
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test");
message.setDeviceId("tcp-off-" + i);
socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))));
});
}),
1024
)
.count()
.subscribe(System.out::println);
System.in.read();
}
protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) {
if (!(downstream instanceof AcknowledgeDeviceMessage)) {
// System.out.println(downstream);
}
DeviceMessage reply = null;
if (downstream instanceof ReadPropertyMessage) {
reply = ((ReadPropertyMessage) downstream)
.newReply()
.success(Collections.singletonMap(
"temp0",
ThreadLocalRandom
.current()
.nextFloat() * 100
));
} else if (downstream instanceof WritePropertyMessage) {
reply = ((WritePropertyMessage) downstream)
.newReply()
.success(((WritePropertyMessage) downstream).getProperties());
}
if (reply != null) {
socket.write(
Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(
reply
, seq, Unpooled.buffer())))
);
}
}
}

View File

@ -0,0 +1,123 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.AckCode;
import org.jetlinks.protocol.official.binary.BinaryAcknowledgeDeviceMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class TcpDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata tcpConfig = new DefaultConfigMetadata(
"TCP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.TCP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//read index
payload.readInt();
//处理tcp连接后的首次消息
if (context.getDevice() == null) {
return handleLogin(payload, context);
}
return Mono.justOrEmpty(BinaryMessageType.read(payload, context.getDevice().getDeviceId()));
}
private Mono<DeviceMessage> handleLogin(ByteBuf payload, MessageDecodeContext context) {
DeviceMessage message = BinaryMessageType.read(payload);
if (message instanceof DeviceOnlineMessage) {
String token = message
.getHeader(BinaryDeviceOnlineMessage.loginToken)
.orElse(null);
String deviceId = message.getDeviceId();
return context
.getDevice(deviceId)
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
} else {
return ack(message, AckCode.noAuth, context);
}
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return Unpooled.wrappedBuffer(
Unpooled.buffer().writeInt(payload.writerIndex()),
payload);
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
if(source==null){
return Mono.empty();
}
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(EncodedMessage.simple(
wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))
))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return Mono.just(EncodedMessage.simple(
wrapByteByf(
BinaryMessageType.write(deviceMessage, Unpooled.buffer())
)
));
}
}

View File

@ -0,0 +1,114 @@
package org.jetlinks.protocol.official.udp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class UDPDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata udpConfig = new DefaultConfigMetadata(
"UDP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.UDP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//todo 认证类型, 0 token,1 sign
byte authType = payload.readByte();
//前面是token
String token = (String) DataType.STRING.read(payload);
//接下来是消息
DeviceMessage message = BinaryMessageType.read(payload);
return context
.getDevice(message.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return payload;
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(doEncode(message, ""))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return context
.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.map(config -> doEncode(deviceMessage, config.asString())));
}
private EncodedMessage doEncode(DeviceMessage message, String token) {
ByteBuf buf = Unpooled.buffer();
//todo 认证类型, 0 token,1 sign
buf.writeByte(0);
//token
DataType.STRING.write(buf, token);
//指令
return EncodedMessage.simple(wrapByteByf(BinaryMessageType.write(message, buf)));
}
}

View File

@ -0,0 +1,29 @@
#### 使用HTTP推送设备数据
上报属性例子:
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
上报事件例子:
```http request
POST /{productId}/{deviceId}/event/{eventId}
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"data":{
"address": ""
}
}
```

View File

@ -0,0 +1,12 @@
### 认证说明
CONNECT报文:
```text
clientId: 设备ID
username: secureId+"|"+timestamp
password: md5(secureId+"|"+timestamp+"|"+secureKey)
```
说明: secureId以及secureKey在创建设备产品或设备实例时进行配置.
timestamp为当前时间戳(毫秒),与服务器时间不能相差5分钟.
md5为32位,不区分大小写.

View File

View File

@ -1 +0,0 @@
org.jetlinks.protocol.official.JetLinksProtocolSupportProvider

View File

@ -5,6 +5,7 @@ import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.Option;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
@ -19,59 +20,71 @@ import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.CoapExchangeMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicReference;
public class JetLinksCoapDeviceMessageCodecTest {
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
DeviceOperator device;
private String key = RandomUtil.randomChar(16);
private final String key = RandomUtil.randomChar(16);
private TestDeviceRegistry registry;
AtomicReference<Message> messageRef = new AtomicReference<>();
private CoapServer server;
@After
public void shutdown(){
server.stop();
}
@Before
public void init() {
TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
device = registry.register(DeviceInfo.builder()
.id("test")
.protocol("jetlinks")
.build())
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
device = registry
.register(DeviceInfo.builder()
.id("test")
.protocol("jetlinks")
.build())
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
.block();
}
@Test
@SneakyThrows
public void test() {
AtomicReference<Message> messageRef = new AtomicReference<>();
CoapServer server = new CoapServer() {
server = new CoapServer() {
@Override
protected Resource createRoot() {
return new CoapResource("/", true) {
@Override
public void handlePOST(CoapExchange exchange) {
codec.decode(new MessageDecodeContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return new CoapExchangeMessage(exchange);
}
codec
.decode(new MessageDecodeContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return new CoapExchangeMessage(exchange);
}
@Override
public DeviceOperator getDevice() {
return device;
}
})
.doOnSuccess(messageRef::set)
@Override
public DeviceOperator getDevice() {
return device;
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
})
.doOnNext(messageRef::set)
.doOnError(Throwable::printStackTrace)
.subscribe();
}
@ -85,28 +98,51 @@ public class JetLinksCoapDeviceMessageCodecTest {
};
Endpoint endpoint = new CoapEndpoint.Builder()
.setPort(12345).build();
.setPort(12341).build();
server.addEndpoint(endpoint);
server.start();
}
@Test
@SneakyThrows
public void test() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"data\":1}";
request.setURI("coap://localhost:12345/test/test/event/event1");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(),key));
request.setURI("coap://localhost:12341/test/test/event/event1");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Thread.sleep(1000);
Assert.assertNotNull(messageRef.get());
Assert.assertNotNull(messageRef.get());
Assert.assertTrue(messageRef.get() instanceof EventMessage);
System.out.println(messageRef.get());
}
@Test
@SneakyThrows
public void testTimeSync() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"messageId\":1}";
request.setURI("coap://localhost:12341/test/test/time-sync");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Assert.assertTrue(response.getResponseText().contains("timestamp"));
}
}

View File

@ -7,7 +7,6 @@ import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.ProductInfo;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DerivedMetadataMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
@ -16,36 +15,40 @@ import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec;
import org.jetlinks.core.server.session.DeviceSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
public class JetLinksMqttDeviceMessageCodecTest {
org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
TestDeviceRegistry registry;
private EncodedMessage currentReply;
@Before
public void init() {
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
registry.register(ProductInfo.builder()
.id("product1")
.protocol("jetlinks")
.build())
.id("product1")
.protocol("jetlinks")
.build())
.flatMap(product -> registry.register(DeviceInfo.builder()
.id("device1")
.productId("product1")
.build()))
.subscribe();
.id("device1")
.productId("product1")
.build()))
.block();
}
@ -85,9 +88,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
@ -101,18 +105,19 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
ReadPropertyMessageReply reply = (ReadPropertyMessageReply)childReply.getChildDeviceMessage();;
ReadPropertyMessageReply reply = (ReadPropertyMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -151,16 +156,17 @@ public class JetLinksMqttDeviceMessageCodecTest {
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/write");
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/device1/properties/write");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testWritePropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof WritePropertyMessageReply);
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
@ -172,22 +178,22 @@ public class JetLinksMqttDeviceMessageCodecTest {
}
@Test
public void testWriteChildPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
WritePropertyMessageReply reply = (WritePropertyMessageReply)childReply.getChildDeviceMessage();;
WritePropertyMessageReply reply = (WritePropertyMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -231,9 +237,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testInvokeFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).block();
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
@ -247,18 +254,19 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testInvokeChildFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).block();
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply)childReply.getChildDeviceMessage();;
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -269,9 +277,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).block();
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof EventMessage);
EventMessage reply = ((EventMessage) message);
@ -284,13 +293,14 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).block();
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
Assert.assertTrue(message instanceof ChildDeviceMessage);
EventMessage reply = ((EventMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
EventMessage reply = ((EventMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getData(), 100);
@ -299,16 +309,26 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
Message message = codec.decode(createMessageContext(
SimpleMqttMessage
.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer(("{\"messageId\":\"test\"," +
"\"properties\":{\"sn\":\"test\"}," +
"\"propertySourceTimes\":{\"sn\":10086}" +
"}").getBytes()))
.build()))
.blockFirst();
Assert.assertTrue(message instanceof ReportPropertyMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
Assert.assertNotNull(reply.getPropertySourceTimes());
Assert.assertEquals(reply.getPropertySourceTimes().get("sn"), Long.valueOf(10086L));
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
System.out.println(reply);
}
@ -316,13 +336,14 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
Assert.assertTrue(message instanceof ChildDeviceMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
@ -333,9 +354,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).block();
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof DerivedMetadataMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
@ -347,20 +369,45 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).block();
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
Assert.assertTrue(message instanceof ChildDeviceMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getMetadata(), "1");
System.out.println(reply);
}
@Test
public void testTimeSync() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/time-sync")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\"}"
.getBytes()))
.build()))
.blockFirst();
Assert.assertNull(message);
Assert.assertNotNull(currentReply);
Assert.assertEquals(((MqttMessage) currentReply).getTopic(), "/product1/device1/time-sync/reply");
Assert.assertTrue(currentReply.payloadAsString().contains("timestamp"));
}
public void testTopic() {
}
public MessageEncodeContext createMessageContext(Message message) {
System.out.println(message.toString());
return new MessageEncodeContext() {
@ -374,25 +421,99 @@ public class JetLinksMqttDeviceMessageCodecTest {
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
public MessageDecodeContext createMessageContext(EncodedMessage message) {
System.out.println(message.toString());
return new MessageDecodeContext() {
return new FromDeviceMessageContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return message;
}
@Override
public DeviceSession getSession() {
return new MockSession();
}
@Override
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
class MockSession implements DeviceSession {
@Override
public String getId() {
return "device1";
}
@Override
public String getDeviceId() {
return "device1";
}
@Nullable
@Override
public DeviceOperator getOperator() {
return registry.getDevice("device1").block();
}
@Override
public long lastPingTime() {
return 0;
}
@Override
public long connectTime() {
return 0;
}
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
currentReply = encodedMessage;
return Mono.just(true);
}
@Override
public Transport getTransport() {
return null;
}
@Override
public void close() {
}
@Override
public void ping() {
}
@Override
public boolean isAlive() {
return false;
}
@Override
public void onClose(Runnable call) {
}
}
}

View File

@ -1,24 +0,0 @@
package org.jetlinks.protocol.official;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class JetlinksTopicMessageCodecTest {
JetlinksTopicMessageCodec codec = new JetlinksTopicMessageCodec();
@Test
public void testReadProperty() {
ReadPropertyMessage readProperty = new ReadPropertyMessage();
readProperty.setProperties(Arrays.asList("name"));
readProperty.setMessageId("test");
JetlinksTopicMessageCodec.EncodedTopic topic = codec.encode("test", readProperty);
Assert.assertEquals(topic.getTopic(),"/test/properties/read");
}
}

View File

@ -0,0 +1,74 @@
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetlinks.core.codec.defaults.TopicPayloadCodec;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.route.Route;
import org.junit.Test;
import reactor.test.StepVerifier;
import static org.junit.Assert.*;
public class TopicMessageCodecTest {
public void testChild(ObjectMapper objectMapper) {
ChildDeviceMessage message = new ChildDeviceMessage();
message.setDeviceId("test");
ReportPropertyMessage msg = new ReportPropertyMessage();
msg.setDeviceId("childId");
message.setChildDeviceMessage(msg);
message.setTimestamp(msg.getTimestamp());
TopicPayload payload = TopicMessageCodec.child.doEncode(objectMapper, message);
System.out.println(payload.getPayload().length);
assertEquals("/test/child/childId/properties/report", payload.getTopic());
TopicMessageCodec
.decode(objectMapper, payload.getTopic(), payload.getPayload())
.as(StepVerifier::create)
.expectNextMatches(deviceMessage -> {
System.out.println(message);
System.out.println(deviceMessage);
return deviceMessage.toJson().equals(message.toJson());
})
.verifyComplete();
}
@Test
public void testRoute() {
for (TopicMessageCodec value : TopicMessageCodec.values()) {
Route route = value.getRoute();
if (null != route)
System.out.println(route.getAddress());
}
}
@Test
public void doTest() {
testChild(ObjectMappers.JSON_MAPPER);
testChild(ObjectMappers.CBOR_MAPPER);
}
@Test
public void testEvent() {
EventMessage eventMessage = new EventMessage();
eventMessage.setEvent("test");
eventMessage.setDeviceId("test-device");
eventMessage.setData("123");
TopicPayload payload = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, eventMessage);
assertEquals(payload.getTopic(), "/test-device/event/test");
DeviceMessage msg = TopicMessageCodec
.decode(ObjectMappers.JSON_MAPPER, payload.getTopic(), payload.getPayload())
.blockLast();
assertEquals(msg.toJson(), eventMessage.toJson());
}
}

View File

@ -0,0 +1,119 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class BinaryMessageTypeTest {
@Test
public void testOnline() {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1000");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
System.out.println(ByteBufUtil.prettyHexDump(byteBuf));
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
System.out.println(ByteBufUtil.prettyHexDump(buf));
//登录报文
System.out.println(ByteBufUtil.hexDump(buf));
}
@Test
public void testReport() {
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
}
@Test
public void testRead() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonList("temp"));
doTest(message);
ReadPropertyMessageReply reply = new ReadPropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testWrite() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
WritePropertyMessageReply reply = new WritePropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testFunction() {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setFunctionId("123");
message.setDeviceId("test");
message.setMessageId("test123");
message.addInput("test", 1);
doTest(message);
FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setOutput(123);
doTest(reply);
}
public void doTest(DeviceMessage message) {
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
// System.out.println(ByteBufUtil.prettyHexDump(data));
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
System.out.println(ByteBufUtil.prettyHexDump(buf));
System.out.println(ByteBufUtil.hexDump(buf));
//将长度字节读取后直接解析报文正文
buf.readInt();
DeviceMessage read = BinaryMessageType.read(buf);
if (null != read.getHeaders()) {
read.getHeaders().forEach(message::addHeader);
}
System.out.println(read);
Assert.assertEquals(read.toString(), message.toString());
}
}

View File

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<logger name="io.netty" level="warn"/>
<logger name="org.apache" level="warn"/>
 
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT"/>
</root>
</configuration>