Compare commits

..

No commits in common. "v3" and "v2" have entirely different histories.
v3 ... v2

42 changed files with 271 additions and 2949 deletions

3
.gitignore vendored
View File

@ -24,5 +24,4 @@ hs_err_pid*
/static/
/upload
/ui/upload/
!/package/**
dependency-reduced-pom.xml
!/package/**

View File

@ -1,61 +1,5 @@
## JetLinks 官方设备接入协议
JetLinks官方实现的设备接入协议,可用于参考实现自定义协议开发.
类名: `org.jetlinks.protocol.official.JetLinksProtocolSupportProvider`
注意: 本协议仅用于参考自定义协议开发,在实际使用中请根据不同的场景进行调整.如认证方式,加密等.
### MQTT
[查看TOPIC说明](http://doc.jetlinks.cn/dev-guide/jetlinks-protocol-support.html)
用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/mqtt-auth-generator.html)
### HTTP
HTTP接入时需要使用`Bearer`
认证,URL和[MQTT的接入Topic]((http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html))一致.
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
### TCP
报文格式说明:
第0-4字节对应的32位整型值为接下来报文的长度,
后续为报文数据,
具体报文格式见: [二进制格式说明](binary-protocol.md)
创建连接后第一个数据包需要发送[认证包](binary-protocol.md#0x01-online-首次连接),
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
### UDP
报文格式说明:
第`0`字节表示认证类型,目前固定为0x00.
第`1-n`字节为`密钥信息`,编码使用`STRING`见: [数据类型定义](binary-protocol.md#数据类型)
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
后续为报文数据,具体报文格式见: [二进制格式说明](binary-protocol.md)
UDP无需发送认证包,但是需要每个报文中都包含密钥信息.
除了ACK以外,其他平台下发的指令也都会包含认证密钥信息,用于设备侧校验请求.
### 测试
可以使用[模拟器](http://github.com/jetlinks/device-simulator)进行模拟测试
[查看说明](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html)

View File

@ -1,167 +0,0 @@
## 头信息
| 字节数 | 类型 | 字段 | 备注 |
| :----: | ------- | :--------------------: | ------------------- |
| n | 自定 | 消息长度 | 平台配置项 |
| 1 | INT8 | 消息类型 | 见消息类型定义 |
| 8 | INT64 | UTC时间戳 | |
| 2 | INT16 | 消息序号 | |
| 2 | INT16 | 设备ID长度 | |
| n | STRING | 设备ID | 根据设备ID长度得出 |
| n | MESSAGE | 消息类型对应的编码规则 | |
| 2 | INT16 | secureKeyLength | TCP认真配置密钥长度 |
| n | STRING | secureKey | 密钥 |
## 数据类型
所有数据类型均采用`大端`编码
| Byte | Type | 编码规则 | 备注 |
| :--: | :-----: | ------------------------------------------------------------ | ------------------------------------------------------------ |
| 0x00 | NULL | | |
| 0x01 | BOOLEAN | 1字节 0x00为false 其他为true | |
| 0x02 | INT8 | 1字节 (byte) | |
| 0x03 | INT16 | 2字节整型 (short) | |
| 0x04 | INT32 | 4字节整型 (int) | |
| 0x05 | INT64 | 8字节整型 (long) | |
| 0x06 | UINT8 | 1字节无符号整型 | |
| 0x07 | UINT16 | 2字节无符号整型 | |
| 0x08 | UINT32 | 4字节无符号整型 | |
| 0x09 | FLOAT | 4字节 IEEE 754浮点数 | |
| 0x0a | DOUBLE | 8字节 IEEE 754浮点数 | |
| 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 | 2+N2个字节UnsignedShort表示N的长度 |
| 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 | 2+N2个字节UnsignedShort表示N的长度 |
| 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 | 2+N2个字节UnsignedShort表示ARRAY的长度N=很多*(1+X)1个字节UnsignedShort表示X的数据类型X表示长度见上几行。 |
| 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value | 2+N2个字节UnsignedShort表示OBJECT的长度,N是STRING+数据类型的组合了(见上几行)。 |
## 消息类型定义
| Byte | Type | 说明 | Message示例 |
| :--: | :----------------- | ------------ | ------------------------------------------------------------ |
| 0x00 | keepalive | 心跳 | |
| 0x01 | online | 首次连接 | [STRING:密钥信息 ] |
| 0x02 | ack | 应答 | [应答码 ]<br />应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持. |
| 0x03 | reportProperty | 上报属性 | [属性数据:OBJECT类型 ] |
| 0x04 | readProperty | 读取属性 | [属性列表:ARRAY类型 ] |
| 0x05 | readPropertyReply | 读取属性回复 | 读取成功:[`0x01`,属性数据:OBJECT类型 ]<br />读取失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
| 0x06 | writeProperty | 修改属性 | [属性列表:OBJECT类型 ] |
| 0x07 | writePropertyReply | 修改属性回复 | 修改成功:[`0x01`,属性数据:OBJECT类型 ]<br />修改失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
| 0x08 | function | 功能调用 | [功能ID:STRING类型,功能参数:OBJECT类型 ] |
| 0x09 | functionReply | 功能调用回复 | 调用成功:[`0x01`,属性数据:OBJECT类型 ]<br />调用失败:[`0x00`,错误码:动态类型,错误消息:动态类型 ] |
### 备注
`动态读取`表示类型不确定,根据对应的`数据类型`来定义类型.
如: 无错误信息
[ 0x00,`0x00`,`0x00` ]
`INT8(0x02)`类型错误码:`0x04`
[0x00,`0x02,0x04`,0x00 ]
## 示例
### 设备上线
```
000000270100000186c51a890f0001001331363531383533343133303332383934343634000561646d696e
```
```java
String hexForOnline =
"00000027" +//消息长度
"01" + //请求上线
"00000186c51a890f" +//时间戳
"0001" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0005" +//secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一使用byteBuf构建上线消息
String deviceId = "1651853413032894464";
String secureKey = "admin";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x01)
.writeLong(System.currentTimeMillis())
.writeShort(1)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf onlineBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二使用jetlinks-core构建消息
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("1");
message.setTimestamp(1678344096015L);
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
```
### 设备数据上报
```
0000006C0300000186C567FA7900020013313635313835333431333033323839343436340001000474656d700B000433362e35000561646d696e
```
```java
String hexForReport =
"0000006C" +//消息长度
"03" +//上报消息
"00000186C567FA79" + //时间戳
"0002" +//消息序号
"0013" +//设备ID长度
"31363531383533343133303332383934343634" +//设备ID
"0001" + //OBJECT对象数量
"0004" +//key的长度
"74656d70" + //值
"0B" + //value的类型
"0004" +//Value的长度
"33362e35" + //值
"0005" + //secureKey长度
"61646d696e";//平台配置的secureKey
//构建方式一
String deviceId = "1651853413032894464";
String secureKey = "admin";
String key = "temp";
String value = "36.5";
ByteBuf data = Unpooled
.buffer()
.writeByte(0x03)
.writeLong(System.currentTimeMillis())
.writeShort(2)
.writeShort(deviceId.getBytes().length)
.writeBytes(deviceId.getBytes())
.writeShort(1)
.writeShort(key.getBytes().length)
.writeBytes(key.getBytes())
.writeByte(0x0B)
.writeShort(value.getBytes().length)
.writeBytes(value.getBytes())
.writeShort(secureKey.getBytes().length)
.writeBytes(secureKey.getBytes());
ByteBuf reportBuf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
//构建方式二
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("1651853413032894464");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
message.setMessageId("2");
message.setProperties(Collections.singletonMap("temp", 32.88));
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
```

Binary file not shown.

80
pom.xml
View File

@ -6,17 +6,17 @@
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-official-protocol</artifactId>
<version>3.0.0</version>
<version>2.0-SNAPSHOT</version>
<name>JetLinks</name>
<url>https://jetlinks.org</url>
<url>http://jetlinks.org</url>
<inceptionYear>2019</inceptionYear>
<description>JetLinks 物联网平台</description>
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
@ -47,7 +47,7 @@
<spring.boot.version>2.2.8.RELEASE</spring.boot.version>
<hsweb.framework.version>4.0.3</hsweb.framework.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<reactor.version>2020.0.6</reactor.version>
<reactor.version>Dysprosium-RELEASE</reactor.version>
</properties>
<profiles>
@ -122,52 +122,6 @@
</snapshotRepository>
</distributionManagement>
</profile>
<profile>
<id>all-in-one</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置主类 -->
<mainClass>org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec</mainClass>
</transformer>
</transformers>
<!-- <artifactSet>-->
<!-- <includes>-->
<!-- &lt;!&ndash; 添加需要打包在一起的第三方依赖信息 &ndash;&gt;-->
<!-- &lt;!&ndash; <include>com.domain:*</include> &ndash;&gt;-->
<!-- </includes>-->
<!-- </artifactSet>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
@ -204,20 +158,20 @@
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId>
<version>1.2.0</version>
<version>1.1.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId>
<version>3.5.0</version>
<version>2.2.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
@ -229,14 +183,14 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.18</version>
<version>3.3.12.RELEASE</version>
<scope>test</scope>
</dependency>
@ -244,15 +198,9 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.3.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<dependencyManagement>
@ -271,12 +219,12 @@
<repository>
<id>releases</id>
<name>Nexus Release Repository</name>
<url>https://nexus.jetlinks.cn/content/repositories/releases/</url>
<url>http://nexus.hsweb.me/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>https://nexus.jetlinks.cn/content/repositories/snapshots/</url>
<url>http://nexus.hsweb.me/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
@ -285,7 +233,7 @@
<repository>
<id>hsweb-nexus</id>
<name>Nexus Release Repository</name>
<url>https://nexus.jetlinks.cn/content/groups/public/</url>
<url>https://nexus.hsweb.me/content/groups/public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
@ -299,4 +247,4 @@
</repositories>
</project>
</project>

View File

@ -1,81 +0,0 @@
package org.jetlinks.protocol.official;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@Slf4j
public abstract class AbstractCoapDeviceMessageCodec implements DeviceMessageCodec {
protected abstract Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response);
protected String getPath(CoapMessage message){
String path = message.getPath();
if (!path.startsWith("/")) {
path = "/" + path;
}
return path;
}
protected String getDeviceId(CoapMessage message){
String deviceId = message.getStringOption(2100).orElse(null);
String[] paths = TopicMessageCodec.removeProductPath(getPath(message));
if (StringUtils.isEmpty(deviceId) && paths.length > 1) {
deviceId = paths[1];
}
return deviceId;
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
AtomicBoolean alreadyReply = new AtomicBoolean();
Consumer<Object> responseHandler = (resp) -> {
if (alreadyReply.compareAndSet(false, true)) {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
if (resp instanceof byte[]) {
exchangeMessage.getExchange().respond(CoAP.ResponseCode.CONTENT, ((byte[]) resp));
}
}
};
return this
.decode(exchangeMessage, context, responseHandler)
.doOnComplete(() -> responseHandler.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST);
})
.switchIfEmpty(Mono.fromRunnable(() -> responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST)));
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Flux.empty();
}
@Nonnull
@Override
public Publisher<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -1,73 +0,0 @@
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.protocol.official.functional.TimeSyncRequest;
import org.jetlinks.protocol.official.functional.TimeSyncResponse;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.function.Function;
/**
* 功能性的topic,不和平台交互
*/
public enum FunctionalTopicHandlers {
//同步时间
timeSync("/*/*/time-sync") {
@SneakyThrows
@SuppressWarnings("all")
Mono<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
TopicPayload topicPayload = new TopicPayload();
topicPayload.setTopic(String.join("/", topic) + "/reply");
TimeSyncRequest msg = mapper.readValue(payload, TimeSyncRequest.class);
TimeSyncResponse response = TimeSyncResponse.of(msg.getMessageId(), System.currentTimeMillis());
topicPayload.setPayload(mapper.writeValueAsBytes(response));
//直接回复给设备
return sender
.apply(topicPayload)
.then(Mono.empty());
}
};
FunctionalTopicHandlers(String topic) {
this.pattern = topic.split("/");
}
private final String[] pattern;
abstract Publisher<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender);
public static Publisher<DeviceMessage> handle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
return Mono
.justOrEmpty(fromTopic(topic))
.flatMapMany(handler -> handler.doHandle(device, topic, payload, mapper, sender));
}
static Optional<FunctionalTopicHandlers> fromTopic(String[] topic) {
for (FunctionalTopicHandlers value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
}

View File

@ -8,31 +8,18 @@ import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.message.codec.*;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
@Override
public Transport getSupportTransport() {
@ -41,12 +28,8 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessag
public Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
if (context.getDevice() == null) {
return Flux.empty();
}
return Flux.defer(() -> {
String path = getPath(message);
String deviceId = getDeviceId(message);
String path = message.getPath();
String sign = message.getStringOption(2110).orElse(null);
String token = message.getStringOption(2111).orElse(null);
byte[] payload = message.payloadAsBytes();
@ -56,34 +39,26 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessag
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
if (StringUtils.isEmpty(deviceId)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
// TODO: 2021/7/30 移到 FunctionalTopicHandlers
if (path.endsWith("/request-token")) {
if ("/auth".equals(path)) {
//认证
return context
.getDevice(deviceId)
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMap(device -> device
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, deviceId, payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return device
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
}))
.getDevice()
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, context.getDevice().getDeviceId(), payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return context.getDevice()
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
})
.then(Mono.empty());
}
if (StringUtils.isEmpty(token)) {
@ -91,28 +66,22 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessag
return Mono.empty();
}
return context
.getDevice(deviceId)
.flatMapMany(device -> device
.getSelfConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMapMany(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}))
.doOnComplete(() -> response.accept(CoAP.ResponseCode.CREATED))
.getDevice()
.getConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMapMany(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST)));
})
.doOnComplete(() -> {
response.accept(CoAP.ResponseCode.CREATED);
})
.doOnError(error -> {
log.error("decode coap message error", error);
response.accept(CoAP.ResponseCode.BAD_REQUEST);
@ -121,6 +90,28 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessag
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
return decode(exchangeMessage, context, resp -> {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
});
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Flux.empty();
}
protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) {
//验证签名
@ -134,5 +125,9 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessag
return true;
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -1,45 +1,34 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.jetlinks.core.Value;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
@Slf4j
public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
public static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
public class JetLinksCoapDeviceMessageCodec implements DeviceMessageCodec {
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP;
}
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
String path = getPath(message);
String deviceId = getDeviceId(message);
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context) {
String path = message.getPath();
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
@ -47,29 +36,60 @@ public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCod
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
return context
.getDevice(deviceId)
.flatMapMany(device -> device
.getConfigs("encAlg", "secureKey")
.flatMapMany(configs -> {
Ciphers ciphers = configs
.getValue("encAlg")
.map(Value::asString)
.flatMap(Ciphers::of)
.orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
byte[] payload = ciphers.decrypt(message.payloadAsBytes(), secureKey);
//解码
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}));
.getDevice()
.getConfigs("encAlg", "secureKey")
.flatMapMany(configs -> {
Ciphers ciphers = configs
.getValue("encAlg")
.map(Value::asString)
.flatMap(Ciphers::of)
.orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
ByteBuf byteBuf = message.getPayload();
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
byteBuf.resetReaderIndex();
byte[] payload = ciphers.decrypt(req, secureKey);
//解码
return TopicMessageCodec.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload);
});
}
protected Flux<DeviceMessage> decode(CoapExchangeMessage message, MessageDecodeContext context) {
CoapExchange exchange = message.getExchange();
return decode(((CoapMessage) message), context)
.doOnComplete(() -> {
exchange.respond(CoAP.ResponseCode.CREATED);
exchange.accept();
})
.switchIfEmpty(Mono.fromRunnable(() -> {
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
}))
.doOnError(error -> {
log.error("decode coap message error", error);
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
});
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
return Flux.defer(() -> {
log.debug("handle coap message:\n{}", context.getMessage());
if (context.getMessage() instanceof CoapExchangeMessage) {
return decode(((CoapExchangeMessage) context.getMessage()), context);
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context);
}
return Flux.empty();
});
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -1,8 +1,6 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
@ -15,7 +13,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Map;
/**
* <pre>
@ -57,11 +54,8 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
private final Transport transport;
private final ObjectMapper mapper;
public JetLinksMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
this.mapper = ObjectMappers.JSON_MAPPER;
}
public JetLinksMqttDeviceMessageCodec() {
@ -87,14 +81,15 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
TopicPayload convertResult = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, deviceMessage);
if (convertResult == null) {
return Mono.empty();
}
return Mono
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
.flatMap(device -> device
.getConfig(DeviceConfigKey.productId))
)
.defaultIfEmpty("null")
.map(productId -> SimpleMqttMessage
@ -114,42 +109,11 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
MqttMessage message = (MqttMessage) context.getMessage();
byte[] payload = message.payloadAsBytes();
return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(context.getDevice(),
message.getTopic().split("/"),
payload,
mapper,
reply -> doReply(context, reply)))
;
}
private Mono<Void> doReply(MessageCodecContext context, TopicPayload reply) {
if (context instanceof FromDeviceMessageContext) {
return ((FromDeviceMessageContext) context)
.getSession()
.send(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
} else if (context instanceof ToDeviceMessageContext) {
return ((ToDeviceMessageContext) context)
.sendToDevice(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
}
return Mono.empty();
.decode(ObjectMappers.JSON_MAPPER, TopicMessageCodec.removeProductPath(message.getTopic()), payload);
}

View File

@ -3,124 +3,68 @@ package org.jetlinks.protocol.official;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.route.WebsocketRoute;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec;
import org.jetlinks.protocol.official.udp.UDPDeviceMessageCodec;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider {
private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
"MQTT认证配置"
, "MQTT认证时需要的配置,mqtt用户名,密码算法:\n" +
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
.add("secureId", "secureId", "密钥ID", new StringType())
.add("secureKey", "secureKey", "密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override
public Mono<CompositeProtocolSupport> create(ServiceContext context) {
return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport();
support.setId("jetlinks.v3.0");
support.setName("JetLinks V3.0");
support.setDescription("JetLinks Protocol Version 3.0");
support.addRoutes(DefaultTransport.MQTT, Arrays
.stream(TopicMessageCodec.values())
.map(TopicMessageCodec::getRoute)
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.MQTT,
"document-mqtt.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addRoutes(DefaultTransport.HTTP, Stream
.of(TopicMessageCodec.reportProperty,
TopicMessageCodec.event,
TopicMessageCodec.online,
TopicMessageCodec.offline)
.map(TopicMessageCodec::getRoute)
.filter(route -> route != null && route.isUpstream())
.map(route -> HttpRoute
.builder()
.address(route.getTopic())
.group(route.getGroup())
.contentType(MediaType.APPLICATION_JSON)
.method(HttpMethod.POST)
.description(route.getDescription())
.example(route.getExample())
.build())
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.HTTP,
"document-http.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.setId("jetlinks.v2.0");
support.setName("JetLinks V2.0");
support.setDescription("JetLinks Protocol Version 2.0");
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
support.addConfigMetadata(DefaultTransport.MQTT_TLS, mqttConfig);
support.addConfigMetadata(DefaultTransport.CoAP, coapConfig);
support.addConfigMetadata(DefaultTransport.CoAP_DTLS, coapDTLSConfig);
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT));
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
//TCP
support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig);
support.addMessageCodecSupport(new TcpDeviceMessageCodec());
support.setDocument(DefaultTransport.TCP,
"document-tcp.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
//UDP
support.addConfigMetadata(DefaultTransport.UDP, UDPDeviceMessageCodec.udpConfig);
support.addMessageCodecSupport(new UDPDeviceMessageCodec());
//MQTT
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec());
//HTTP
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec());
//Websocket
JetLinksHttpDeviceMessageCodec codec = new JetLinksHttpDeviceMessageCodec(DefaultTransport.WebSocket);
support.addMessageCodecSupport(codec);
support.addAuthenticator(DefaultTransport.WebSocket, codec);
support.addRoutes(
DefaultTransport.WebSocket,
Collections.singleton(
WebsocketRoute
.builder()
.path("/{productId:产品ID}/{productId:设备ID}/socket")
.description("通过Websocket接入平台")
.build()
));
//CoAP
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
return Mono.just(support);

View File

@ -11,7 +11,6 @@ import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@ -20,122 +19,26 @@ import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Function;
public enum TopicMessageCodec {
//上报属性数据
reportProperty("/*/properties/report",
ReportPropertyMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("属性上报")
.description("上报物模型属性数据")
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
//读取属性
readProperty("/*/properties/read",
ReadPropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("读取属性")
.description("平台下发读取物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")),
//读取属性回复
readPropertyReply("/*/properties/read/reply",
ReadPropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("读取属性")
.description("对平台下发的读取属性指令进行响应")
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性
writeProperty("/*/properties/write",
WritePropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("修改属性")
.description("平台下发修改物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性回复
writePropertyReply("/*/properties/write/reply",
WritePropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("修改属性")
.description("对平台下发的修改属性指令进行响应")
.example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
reportProperty("/*/properties/report", ReportPropertyMessage.class),
//事件上报
event("/*/event/*",
EventMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("事件上报")
.description("上报物模型事件数据")
.example("{\"data\":{\"key\":\"value\"}}")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{eventId:事件ID}";
}
@Override
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String event = topic[topic.length - 1];
return Mono.from(super.doDecode(mapper, topic, payload))
.cast(EventMessage.class)
.doOnNext(e -> e.setEvent(event))
.cast(DeviceMessage.class);
}
@Override
void refactorTopic(String[] topics, DeviceMessage message) {
super.refactorTopic(topics, message);
EventMessage event = ((EventMessage) message);
topics[topics.length - 1] = String.valueOf(event.getEvent());
}
},
event("/*/event/*", EventMessage.class),
//读取属性
readProperty("/*/properties/read", ReadPropertyMessage.class),
//读取属性回复
readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class),
//修改属性
writeProperty("/*/properties/write", WritePropertyMessage.class),
//修改属性回复
writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class),
//调用功能
functionInvoke("/*/function/invoke",
FunctionInvokeMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("调用功能")
.description("平台下发功能调用指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\"," +
"\"functionId\":\"功能标识\"," +
"\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")),
functionInvoke("/*/function/invoke", FunctionInvokeMessage.class),
//调用功能回复
functionInvokeReply("/*/function/invoke/reply",
FunctionInvokeMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("调用功能")
.description("设备响应平台下发的功能调用指令")
.example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," +
"\"output\":\"输出结果,格式与物模型中定义的类型一致\"")),
functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class),
//子设备消息
child("/*/child/*/**",
ChildDeviceMessage.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关上报或者平台下发子设备消息")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
child("/*/child/*/**", ChildDeviceMessage.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
@ -171,19 +74,7 @@ public enum TopicMessageCodec {
}
}, //子设备消息回复
childReply("/*/child-reply/*/**",
ChildDeviceMessageReply.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关回复平台下发给子设备的指令结果")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
@ -220,21 +111,13 @@ public enum TopicMessageCodec {
}
},
//更新标签
updateTag("/*/tags",
UpdateTagMessage.class,
route -> route.upstream(true)
.downstream(false)
.group("更新标签")
.description("更新标签数据")
.example("{\"tags\":{\"key\",\"value\"}}")),
updateTag("/*/tags", UpdateTagMessage.class),
//注册
register("/*/register", DeviceRegisterMessage.class),
//注销
unregister("/*/unregister", DeviceUnRegisterMessage.class),
//更新固件消息
upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
//更新固件消息回复
upgradeFirmwareReply("/*/firmware/upgrade/reply", UpgradeFirmwareMessageReply.class),
//更新固件升级进度消息
upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class),
//拉取固件
@ -244,8 +127,6 @@ public enum TopicMessageCodec {
//上报固件版本
reportFirmware("/*/firmware/report", ReportFirmwareMessage.class),
//读取固件回复
readFirmware("/*/firmware/read", ReadFirmwareMessage.class),
//读取固件回复
readFirmwareReply("/*/firmware/read/reply", ReadFirmwareMessageReply.class),
//派生物模型上报
derivedMetadata("/*/metadata/derived", DerivedMetadataMessage.class),
@ -264,15 +145,9 @@ public enum TopicMessageCodec {
//断开连接回复
disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
//上线
online("/*/online", DeviceOnlineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备上线")),
connect("/*/online", DeviceOnlineMessage.class),
//离线
offline("/*/offline", DeviceOfflineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备离线")),
offline("/*/offline", DeviceOfflineMessage.class),
//日志
log("/*/log", DeviceLogMessage.class),
//状态检查
@ -280,49 +155,14 @@ public enum TopicMessageCodec {
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
;
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type,
Function<MqttRoute.Builder, MqttRoute.Builder> routeCustom) {
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) {
this.pattern = topic.split("/");
this.type = type;
this.route = routeCustom.apply(toRoute()).build();
}
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type) {
this.pattern = topic.split("/");
this.type = type;
this.route = null;
}
private final String[] pattern;
private final MqttRoute route;
private final Class<? extends DeviceMessage> type;
protected void transMqttTopic(String[] topic) {
}
@SneakyThrows
private MqttRoute.Builder toRoute() {
String[] topics = new String[pattern.length];
System.arraycopy(pattern, 0, topics, 0, pattern.length);
topics[0] = "{productId:产品ID}";
topics[1] = "{deviceId:设备ID}";
transMqttTopic(topics);
StringJoiner joiner = new StringJoiner("/", "/", "");
for (String topic : topics) {
joiner.add(topic);
}
return MqttRoute
.builder(joiner.toString())
.qos(1);
}
public MqttRoute getRoute() {
return route;
}
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) {
return Mono
.justOrEmpty(fromTopic(topics))

View File

@ -2,13 +2,11 @@ package org.jetlinks.protocol.official;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
@AllArgsConstructor(staticName = "of") // FIXME 使用对象池,节省一丢丢内存?
public class TopicPayload {
private String topic;

View File

@ -1,7 +0,0 @@
package org.jetlinks.protocol.official.binary;
public enum AckCode {
ok,
noAuth,
unsupportedMessage
}

View File

@ -1,40 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.HeaderKey;
public class BinaryAcknowledgeDeviceMessage implements BinaryMessage<AcknowledgeDeviceMessage> {
public static final HeaderKey<String> codeHeader = HeaderKey.of("code", AckCode.ok.name());
private AcknowledgeDeviceMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.ack;
}
@Override
public void read(ByteBuf buf) {
message = new AcknowledgeDeviceMessage();
AckCode code = AckCode.values()[buf.readUnsignedByte()];
message.addHeader(codeHeader, code.name());
}
@Override
public void write(ByteBuf buf) {
AckCode code = AckCode.valueOf(this.message.getHeaderOrDefault(codeHeader));
buf.writeByte(code.ordinal());
}
@Override
public void setMessage(AcknowledgeDeviceMessage message) {
this.message = message;
}
@Override
public AcknowledgeDeviceMessage getMessage() {
return message;
}
}

View File

@ -1,45 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
/**
*
*/
public class BinaryDeviceOnlineMessage implements BinaryMessage<DeviceOnlineMessage> {
public static final HeaderKey<String> loginToken = HeaderKey.of("token", null);
private DeviceOnlineMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.online;
}
@Override
public void read(ByteBuf buf) {
message = new DeviceOnlineMessage();
message.addHeader(loginToken, (String) DataType.STRING.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING
.write(
buf, message.getHeader(loginToken).orElse("")
);
}
@Override
public void setMessage(DeviceOnlineMessage message) {
this.message = message;
}
@Override
public DeviceOnlineMessage getMessage() {
return message;
}
}

View File

@ -1,46 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.event.EventMessage;
/**
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryEventMessage implements BinaryMessage<EventMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.event;
}
private EventMessage message;
@Override
public void read(ByteBuf buf) {
message = new EventMessage();
message.setEvent((String) DataType.STRING.read(buf));
message.setData(DataType.OBJECT.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getEvent());
DataType.OBJECT.write(buf, message.getData());
}
@Override
public void setMessage(EventMessage message) {
this.message = message;
}
@Override
public EventMessage getMessage() {
return message;
}
}

View File

@ -1,50 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import java.util.Map;
import java.util.stream.Collectors;
public class BinaryFunctionInvokeMessage implements BinaryMessage<FunctionInvokeMessage> {
private FunctionInvokeMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.function;
}
@Override
public void read(ByteBuf buf) {
message = new FunctionInvokeMessage();
message.setFunctionId((String) DataType.STRING.read(buf));
@SuppressWarnings("all")
Map<String, Object> params = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setInputs(
params
.entrySet()
.stream()
.map(e -> new FunctionParameter(e.getKey(), e.getValue()))
.collect(Collectors.toList())
);
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getFunctionId());
DataType.OBJECT.write(buf,message.inputsToMap());
}
@Override
public void setMessage(FunctionInvokeMessage message) {
this.message = message;
}
@Override
public FunctionInvokeMessage getMessage() {
return message;
}
}

View File

@ -1,40 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryFunctionInvokeMessageReply extends BinaryReplyMessage<FunctionInvokeMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.functionReply;
}
@Override
protected FunctionInvokeMessageReply newMessage() {
return new FunctionInvokeMessageReply();
}
@Override
protected void doReadSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
msg.setFunctionId((String) DataType.readFrom(buf));
msg.setOutput(DataType.readFrom(buf));
}
@Override
protected void doWriteSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
DataType.writeTo(getMessage().getFunctionId(), buf);
DataType.writeTo(msg.getOutput(), buf);
}
}

View File

@ -1,19 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessage;
import reactor.core.publisher.Flux;
public interface BinaryMessage<T extends DeviceMessage> {
BinaryMessageType getType();
void read(ByteBuf buf);
void write(ByteBuf buf);
void setMessage(T message);
T getMessage();
}

View File

@ -1,223 +0,0 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public enum BinaryMessageType {
//0x00
keepalive(null, null),
//0x01
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
//0x02
ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new),
//0x03
reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new),
//0x04
readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new),
//0x05
readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new),
writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new),
writePropertyReply(WritePropertyMessageReply.class, BinaryWritePropertyMessageReply::new),
function(FunctionInvokeMessage.class, BinaryFunctionInvokeMessage::new),
functionReply(FunctionInvokeMessageReply.class, BinaryFunctionInvokeMessageReply::new),
event(EventMessage.class, BinaryEventMessage::new);
private final Class<? extends DeviceMessage> forDevice;
private final Supplier<BinaryMessage<DeviceMessage>> forTcp;
private static final BinaryMessageType[] VALUES = values();
public static final HeaderKey<Integer> HEADER_MSG_SEQ = HeaderKey.of("_seq", 0, Integer.class);
@SuppressWarnings("all")
BinaryMessageType(Class<? extends DeviceMessage> forDevice,
Supplier<? extends BinaryMessage<?>> forTcp) {
this.forDevice = forDevice;
this.forTcp = (Supplier) forTcp;
}
private static final Map<String, MsgIdHolder> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.<String, MsgIdHolder>build()
.asMap();
private static class MsgIdHolder {
private int msgId = 0;
private final Map<Integer, String> cached = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.<Integer, String>build()
.asMap();
public int next(String id) {
if (id == null) {
return -1;
}
do {
if (msgId++ < 0) {
msgId = 0;
}
} while (cached.putIfAbsent(msgId, id) != null);
return msgId;
}
public String getAndRemove(int id) {
if (id < 0) {
return null;
}
return cached.remove(id);
}
}
@SneakyThrows
private static MsgIdHolder takeHolder(String deviceId) {
return cache.computeIfAbsent(deviceId, (ignore) -> new MsgIdHolder());
}
public static ByteBuf write(DeviceMessage message, ByteBuf data) {
int msgId = message.getHeaderOrElse(HEADER_MSG_SEQ, () -> takeHolder(message.getDeviceId()).next(message.getMessageId()));
return write(message, msgId, data);
}
public static ByteBuf write(BinaryMessageType type, ByteBuf data) {
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 0-4字节 时间戳
data.writeLong(System.currentTimeMillis());
return data;
}
public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) {
BinaryMessageType type = lookup(message);
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 第1-8字节 时间戳
data.writeLong(message.getTimestamp());
// 9-11字节 消息序号
data.writeShort(msgId);
// 12... 字节 设备ID
DataType.STRING.write(data, message.getDeviceId());
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
tcp.setMessage(message);
//写出数据到ByteBuf
tcp.write(data);
return data;
}
public static DeviceMessage read(ByteBuf data) {
return read(data, null);
}
public static <T> T read(ByteBuf data,
String deviceIdMaybe,
BiFunction<DeviceMessage, Integer, T> handler) {
//第0个字节是消息类型
BinaryMessageType type = VALUES[data.readByte()];
if (type.forTcp == null) {
return null;
}
// 1-8字节 时间戳
long timestamp = data.readLong();
// 9-11字节 消息序号
int msgId = data.readUnsignedShort();
// 12... 字节 设备ID
String deviceId = (String) DataType.STRING.read(data);
if (deviceId == null) {
deviceId = deviceIdMaybe;
}
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
//从ByteBuf读取
tcp.read(data);
DeviceMessage message = tcp.getMessage();
message.thingId(DeviceThingType.device, deviceId);
if (timestamp > 0) {
message.timestamp(timestamp);
}
message.addHeader(HEADER_MSG_SEQ, msgId);
return handler.apply(message, msgId);
}
public static DeviceMessage read(ByteBuf data, String deviceIdMaybe) {
return read(data, deviceIdMaybe, (message, msgId) -> {
String messageId = null;
if (message.getDeviceId() != null) {
//获取实际平台下发的消息ID
MsgIdHolder holder = cache.get(message.getDeviceId());
if (holder != null) {
messageId = holder.getAndRemove(msgId);
}
}
if (messageId == null && msgId > 0) {
messageId = String.valueOf(msgId);
}
message.messageId(messageId);
return message;
});
}
public static BinaryMessageType lookup(DeviceMessage message) {
for (BinaryMessageType value : VALUES) {
if (value.forDevice != null && value.forDevice.isInstance(message)) {
return value;
}
}
throw new UnsupportedOperationException("unsupported device message " + message.getMessageType());
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (BinaryMessageType value : BinaryMessageType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}

View File

@ -1,50 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.List;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReadPropertyMessage implements BinaryMessage<ReadPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readProperty;
}
private ReadPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReadPropertyMessage();
@SuppressWarnings("all")
List<String> list = (List<String>) DataType.ARRAY.read(buf);
message.setProperties(list);
}
@Override
public void write(ByteBuf buf) {
DataType.ARRAY.write(buf, message.getProperties());
}
@Override
public void setMessage(ReadPropertyMessage message) {
this.message = message;
}
@Override
public ReadPropertyMessage getMessage() {
return message;
}
}

View File

@ -1,41 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryReadPropertyMessageReply extends BinaryReplyMessage<ReadPropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected ReadPropertyMessageReply newMessage() {
return new ReadPropertyMessageReply();
}
@Override
protected void doWriteSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
@Override
protected void doReadSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
}

View File

@ -1,52 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessageReply;
import java.util.Map;
public abstract class BinaryReplyMessage<T extends DeviceMessageReply> implements BinaryMessage<T> {
private T message;
protected abstract T newMessage();
@Override
public final void read(ByteBuf buf) {
message = newMessage();
boolean success = buf.readBoolean();
if (success) {
doReadSuccess(message, buf);
} else {
message.success(false);
message.code(String.valueOf(DataType.readFrom(buf)));
message.message(String.valueOf(DataType.readFrom(buf)));
}
}
protected abstract void doReadSuccess(T msg, ByteBuf buf);
protected abstract void doWriteSuccess(T msg, ByteBuf buf);
@Override
public final void write(ByteBuf buf) {
buf.writeBoolean(message.isSuccess());
if (message.isSuccess()) {
doWriteSuccess(message, buf);
} else {
DataType.writeTo(message.getCode(), buf);
DataType.writeTo(message.getMessage(), buf);
}
}
@Override
public void setMessage(T message) {
this.message = message;
}
@Override
public T getMessage() {
return message;
}
}

View File

@ -1,48 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReportPropertyMessage implements BinaryMessage<ReportPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.reportProperty;
}
private ReportPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReportPropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(ReportPropertyMessage message) {
this.message = message;
}
@Override
public ReportPropertyMessage getMessage() {
return message;
}
}

View File

@ -1,49 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryWritePropertyMessage implements BinaryMessage<WritePropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.writeProperty;
}
private WritePropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new WritePropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(WritePropertyMessage message) {
this.message = message;
}
@Override
public WritePropertyMessage getMessage() {
return message;
}
}

View File

@ -1,40 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryWritePropertyMessageReply extends BinaryReplyMessage<WritePropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected WritePropertyMessageReply newMessage() {
return new WritePropertyMessageReply();
}
@Override
protected void doReadSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
@Override
protected void doWriteSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
}

View File

@ -1,286 +0,0 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetlinks.protocol.official.ObjectMappers;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public enum DataType {
//0x00
NULL {
@Override
public Object read(ByteBuf buf) {
return null;
}
@Override
public void write(ByteBuf buf, Object value) {
}
},
//0x01
BOOLEAN {
@Override
public Object read(ByteBuf buf) {
return buf.readBoolean();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeBoolean((Boolean) value);
}
},
//0x02
INT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x03
INT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x04
INT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x05
INT64 {
@Override
public Object read(ByteBuf buf) {
return buf.readLong();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeLong((Long) value);
}
},
//0x06
UINT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x07
UINT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x08
UINT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x09
FLOAT {
@Override
public Object read(ByteBuf buf) {
return buf.readFloat();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeFloat((Float) value);
}
},
//0x0A
DOUBLE {
@Override
public Object read(ByteBuf buf) {
return buf.readDouble();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeDouble((Double) value);
}
},
//0x0B
STRING {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = ((String) value).getBytes();
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0C
BINARY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return bytes;
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = (byte[]) value;
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0D
ARRAY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
List<Object> array = new ArrayList<>(len);
for (int i = 0; i < len; i++) {
array.add(readFrom(buf));
}
return array;
}
@Override
public void write(ByteBuf buf, Object value) {
Collection<Object> array = (Collection<Object>) value;
buf.writeShort(array.size());
for (Object o : array) {
writeTo(o, buf);
}
}
},
//0x0E
OBJECT {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
Map<String, Object> data = Maps.newLinkedHashMapWithExpectedSize(len);
for (int i = 0; i < len; i++) {
data.put((String) STRING.read(buf), readFrom(buf));
}
return data;
}
@Override
public void write(ByteBuf buf, Object value) {
Map<String, Object> data = value instanceof Map ? ((Map) value) : ObjectMappers.JSON_MAPPER.convertValue(value, Map.class);
buf.writeShort(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
STRING.write(buf, entry.getKey());
writeTo(entry.getValue(), buf);
}
}
};
private final static DataType[] VALUES = values();
public abstract Object read(ByteBuf buf);
public abstract void write(ByteBuf buf, Object value);
public static Object readFrom(ByteBuf buf) {
return VALUES[buf.readUnsignedByte()].read(buf);
}
public static void writeTo(Object data, ByteBuf buf) {
DataType type = loopUpType(data);
buf.writeByte(type.ordinal());
type.write(buf, data);
}
private static DataType loopUpType(Object data) {
if (data == null) {
return NULL;
} else if (data instanceof Boolean) {
return BOOLEAN;
} else if (data instanceof Byte) {
return INT8;
} else if (data instanceof Short) {
return INT16;
} else if (data instanceof Integer) {
return INT32;
} else if (data instanceof Long) {
return INT64;
} else if (data instanceof Float) {
return FLOAT;
} else if (data instanceof Double) {
return DOUBLE;
} else if (data instanceof String) {
return STRING;
} else if (data instanceof byte[]) {
return BINARY;
} else if (data instanceof Collection) {
return ARRAY;
} else if (data instanceof Map) {
return OBJECT;
} else {
throw new IllegalArgumentException("Unsupported data type: " + data.getClass());
}
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (DataType value : DataType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}

View File

@ -1,10 +0,0 @@
package org.jetlinks.protocol.official.functional;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TimeSyncRequest {
private String messageId;
}

View File

@ -1,15 +0,0 @@
package org.jetlinks.protocol.official.functional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TimeSyncResponse {
private String messageId;
private long timestamp;
}

View File

@ -1,234 +0,0 @@
package org.jetlinks.protocol.official.http;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.protocol.official.ObjectMappers;
import org.jetlinks.protocol.official.TopicMessageCodec;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Objects;
/**
* Http 的消息编解码器
*
* @author zhouhao
* @since 3.0.0
*/
@Slf4j
public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec, Authenticator {
public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata(
"HTTP认证配置"
, "使用HTTP Bearer Token进行认证")
.add("bearer_token", "Token", "Token", new PasswordType());
private final Transport transport;
public JetLinksHttpDeviceMessageCodec(Transport transport) {
this.transport = transport;
}
public JetLinksHttpDeviceMessageCodec() {
this(DefaultTransport.HTTP);
}
@Override
public Transport getSupportTransport() {
return transport;
}
@Nonnull
public Mono<EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
JSONObject json = context.getMessage().toJson();
//通过websocket下发
return Mono.just(DefaultWebSocketMessage.of(
WebSocketMessage.Type.TEXT,
Unpooled.wrappedBuffer(json.toJSONString().getBytes())));
}
private static SimpleHttpResponseMessage unauthorized(String msg) {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\"" + msg + "\"}")
.status(401)
.build();
}
private static SimpleHttpResponseMessage badRequest() {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"bad_request\"}")
.status(400)
.build();
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof HttpExchangeMessage) {
return decodeHttp(context);
}
if (context.getMessage() instanceof WebSocketSessionMessage) {
return decodeWebsocket(context);
}
return Flux.empty();
}
private Flux<DeviceMessage> decodeWebsocket(MessageDecodeContext context) {
WebSocketSessionMessage msg = ((WebSocketSessionMessage) context.getMessage());
return Mono
.justOrEmpty(MessageType.convertMessage(msg.payloadAsJson()))
.cast(DeviceMessage.class)
.flux();
}
private Flux<DeviceMessage> decodeHttp(MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
//校验请求头中的Authorization header,格式:
// Authorization: Bearer <token>
Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null);
if (header == null || header.getValue() == null || header.getValue().length == 0) {
return message
.response(unauthorized("Authorization header is required"))
.thenMany(Mono.empty());
}
String[] token = header.getValue()[0].split(" ");
if (token.length == 1) {
return message
.response(unauthorized("Illegal token format"))
.thenMany(Mono.empty());
}
String basicToken = token[1];
String[] paths = TopicMessageCodec.removeProductPath(message.getPath());
if (paths.length < 1) {
return message
.response(badRequest())
.thenMany(Mono.empty());
}
String deviceId = paths[1];
return context
.getDevice(deviceId)
.flatMap(device -> device.getConfig("bearer_token"))
//校验token
.filter(value -> Objects.equals(value.asString(), basicToken))
//设备或者配置不对
.switchIfEmpty(Mono.defer(() -> message
.response(unauthorized("Device no register or token not match"))
.then(Mono.empty())))
//解码
.flatMapMany(ignore -> doDecode(message, paths))
.switchOnFirst((s, flux) -> {
Mono<Void> handler;
//有结果则认为成功
if (s.hasValue()) {
handler = message.ok("{\"success\":true}");
} else {
return message
.response(badRequest())
.then(Mono.empty());
}
return handler.thenMany(flux);
})
.onErrorResume(err -> message
.error(500, getErrorMessage(err))
.then(Mono.error(err)))
//跟踪信息
.as(FluxTracer
.create(DeviceTracer.SpanName.decode(deviceId),
builder -> builder.setAttribute(DeviceTracer.SpanKey.message, message.print())));
}
private Flux<DeviceMessage> doDecode(HttpExchangeMessage message, String[] paths) {
return message
.payload()
.flatMapMany(buf -> {
byte[] body = ByteBufUtil.getBytes(buf);
return TopicMessageCodec.decode(ObjectMappers.JSON_MAPPER, paths, body);
});
}
public String getErrorMessage(Throwable err) {
if (err instanceof JsonParseException) {
return "{\"success\":false,\"code\":\"request_body_format_error\"}";
}
return "{\"success\":false,\"code\":\"server_error\"}";
}
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator device) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String token = req
.getSocketSession()
.getQueryParameters()
.get("token");
if (StringUtils.isEmpty(token)) {
return Mono.just(AuthenticationResponse.error(401, "认证参数错误"));
}
return device
.getConfig("bearer_token")
//校验token
.filter(value -> Objects.equals(value.asString(), token))
.map(ignore -> AuthenticationResponse.success(device.getDeviceId()))
//未配置或者配置不对
.switchIfEmpty(Mono.fromSupplier(() -> AuthenticationResponse.error(401, "token错误")));
}
static AuthenticationResponse deviceNotFound = AuthenticationResponse.error(404, "设备不存在");
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String[] paths = TopicMessageCodec.removeProductPath(req.getSocketSession().getPath());
if (paths.length < 1) {
return Mono.just(AuthenticationResponse.error(400, "URL格式错误"));
}
return registry
.getDevice(paths[1])
.flatMap(device -> authenticate(request, device))
.defaultIfEmpty(deviceNotFound);
}
}

View File

@ -1,127 +0,0 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.SneakyThrows;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class TcpDevice {
@SneakyThrows
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
int start = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int count = args.length > 1 ? Integer.parseInt(args[1]) : 8000;
String[] hosts = args.length > 2 ? args[2].split(",") : new String[]{"0.0.0.0"};
Flux.range(start, count)
.flatMap(i -> Mono
.create(sink -> {
NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true);
conf.setLocalAddress(hosts[i % hosts.length]);
vertx
.createNetClient(conf)
.connect(8802, "localhost")
.onFailure(err -> {
System.out.println(err.getMessage());
sink.success();
})
.onSuccess(socket -> {
RecordParser parser = RecordParser.newFixed(4);
AtomicReference<Buffer> buffer = new AtomicReference<>();
parser.handler(buf -> {
buffer.accumulateAndGet(buf, (a, b) -> {
if (a == null) {
parser.fixedSizeMode(buf.getInt(0));
return b;
}
parser.fixedSizeMode(4);
sink.success("tcp-off-" + i + ":" + socket.localAddress());
BinaryMessageType
.read(b.getByteBuf(),
null,
(downstream, seq) -> {
handleDownStream(downstream, seq, socket);
return null;
});
return null;
});
});
socket
.closeHandler((s) -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed");
sink.success();
})
.exceptionHandler(er -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage());
sink.success();
})
.handler(parser);
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test");
message.setDeviceId("tcp-off-" + i);
socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))));
});
}),
1024
)
.count()
.subscribe(System.out::println);
System.in.read();
}
protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) {
if (!(downstream instanceof AcknowledgeDeviceMessage)) {
// System.out.println(downstream);
}
DeviceMessage reply = null;
if (downstream instanceof ReadPropertyMessage) {
reply = ((ReadPropertyMessage) downstream)
.newReply()
.success(Collections.singletonMap(
"temp0",
ThreadLocalRandom
.current()
.nextFloat() * 100
));
} else if (downstream instanceof WritePropertyMessage) {
reply = ((WritePropertyMessage) downstream)
.newReply()
.success(((WritePropertyMessage) downstream).getProperties());
}
if (reply != null) {
socket.write(
Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(
reply
, seq, Unpooled.buffer())))
);
}
}
}

View File

@ -1,123 +0,0 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.AckCode;
import org.jetlinks.protocol.official.binary.BinaryAcknowledgeDeviceMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class TcpDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata tcpConfig = new DefaultConfigMetadata(
"TCP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.TCP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//read index
payload.readInt();
//处理tcp连接后的首次消息
if (context.getDevice() == null) {
return handleLogin(payload, context);
}
return Mono.justOrEmpty(BinaryMessageType.read(payload, context.getDevice().getDeviceId()));
}
private Mono<DeviceMessage> handleLogin(ByteBuf payload, MessageDecodeContext context) {
DeviceMessage message = BinaryMessageType.read(payload);
if (message instanceof DeviceOnlineMessage) {
String token = message
.getHeader(BinaryDeviceOnlineMessage.loginToken)
.orElse(null);
String deviceId = message.getDeviceId();
return context
.getDevice(deviceId)
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
} else {
return ack(message, AckCode.noAuth, context);
}
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return Unpooled.wrappedBuffer(
Unpooled.buffer().writeInt(payload.writerIndex()),
payload);
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
if(source==null){
return Mono.empty();
}
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(EncodedMessage.simple(
wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))
))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return Mono.just(EncodedMessage.simple(
wrapByteByf(
BinaryMessageType.write(deviceMessage, Unpooled.buffer())
)
));
}
}

View File

@ -1,114 +0,0 @@
package org.jetlinks.protocol.official.udp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class UDPDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata udpConfig = new DefaultConfigMetadata(
"UDP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.UDP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//todo 认证类型, 0 token,1 sign
byte authType = payload.readByte();
//前面是token
String token = (String) DataType.STRING.read(payload);
//接下来是消息
DeviceMessage message = BinaryMessageType.read(payload);
return context
.getDevice(message.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return payload;
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(doEncode(message, ""))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return context
.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.map(config -> doEncode(deviceMessage, config.asString())));
}
private EncodedMessage doEncode(DeviceMessage message, String token) {
ByteBuf buf = Unpooled.buffer();
//todo 认证类型, 0 token,1 sign
buf.writeByte(0);
//token
DataType.STRING.write(buf, token);
//指令
return EncodedMessage.simple(wrapByteByf(BinaryMessageType.write(message, buf)));
}
}

View File

@ -1,29 +0,0 @@
#### 使用HTTP推送设备数据
上报属性例子:
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
上报事件例子:
```http request
POST /{productId}/{deviceId}/event/{eventId}
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"data":{
"address": ""
}
}
```

View File

@ -1,12 +0,0 @@
### 认证说明
CONNECT报文:
```text
clientId: 设备ID
username: secureId+"|"+timestamp
password: md5(secureId+"|"+timestamp+"|"+secureKey)
```
说明: secureId以及secureKey在创建设备产品或设备实例时进行配置.
timestamp为当前时间戳(毫秒),与服务器时间不能相差5分钟.
md5为32位,不区分大小写.

View File

@ -5,7 +5,6 @@ import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.Option;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
@ -20,37 +19,26 @@ import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.CoapExchangeMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicReference;
public class JetLinksCoapDeviceMessageCodecTest {
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
DeviceOperator device;
private final String key = RandomUtil.randomChar(16);
private String key = RandomUtil.randomChar(16);
private TestDeviceRegistry registry;
AtomicReference<Message> messageRef = new AtomicReference<>();
private CoapServer server;
@After
public void shutdown(){
server.stop();
}
@Before
public void init() {
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
device = registry
.register(DeviceInfo.builder()
.id("test")
@ -58,8 +46,14 @@ public class JetLinksCoapDeviceMessageCodecTest {
.build())
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
.block();
}
server = new CoapServer() {
@Test
@SneakyThrows
public void test() {
AtomicReference<Message> messageRef = new AtomicReference<>();
CoapServer server = new CoapServer() {
@Override
protected Resource createRoot() {
return new CoapResource("/", true) {
@ -78,11 +72,6 @@ public class JetLinksCoapDeviceMessageCodecTest {
public DeviceOperator getDevice() {
return device;
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
})
.doOnNext(messageRef::set)
.doOnError(Throwable::printStackTrace)
@ -101,14 +90,11 @@ public class JetLinksCoapDeviceMessageCodecTest {
.setPort(12341).build();
server.addEndpoint(endpoint);
server.start();
}
@Test
@SneakyThrows
public void test() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"data\":1}";
@ -118,31 +104,11 @@ public class JetLinksCoapDeviceMessageCodecTest {
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Thread.sleep(1000);
Assert.assertNotNull(messageRef.get());
Assert.assertTrue(messageRef.get() instanceof EventMessage);
System.out.println(messageRef.get());
}
@Test
@SneakyThrows
public void testTimeSync() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"messageId\":1}";
request.setURI("coap://localhost:12341/test/test/time-sync");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Assert.assertTrue(response.getResponseText().contains("timestamp"));
}
}

View File

@ -15,14 +15,11 @@ import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.server.session.DeviceSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@ -34,21 +31,19 @@ public class JetLinksMqttDeviceMessageCodecTest {
TestDeviceRegistry registry;
private EncodedMessage currentReply;
@Before
public void init() {
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
registry.register(ProductInfo.builder()
.id("product1")
.protocol("jetlinks")
.build())
.id("product1")
.protocol("jetlinks")
.build())
.flatMap(product -> registry.register(DeviceInfo.builder()
.id("device1")
.productId("product1")
.build()))
.block();
.id("device1")
.productId("product1")
.build()))
.subscribe();
}
@ -88,10 +83,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
@ -105,19 +99,17 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
ReadPropertyMessageReply reply = (ReadPropertyMessageReply) childReply.getChildDeviceMessage();
;
ReadPropertyMessageReply reply = (ReadPropertyMessageReply)childReply.getChildDeviceMessage();;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -163,10 +155,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testWritePropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof WritePropertyMessageReply);
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
@ -178,22 +169,21 @@ public class JetLinksMqttDeviceMessageCodecTest {
}
@Test
public void testWriteChildPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
WritePropertyMessageReply reply = (WritePropertyMessageReply) childReply.getChildDeviceMessage();
;
WritePropertyMessageReply reply = (WritePropertyMessageReply)childReply.getChildDeviceMessage();;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -237,10 +227,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testInvokeFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
@ -254,19 +243,17 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testInvokeChildFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply) childReply.getChildDeviceMessage();
;
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply)childReply.getChildDeviceMessage();;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -277,10 +264,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof EventMessage);
EventMessage reply = ((EventMessage) message);
@ -293,10 +279,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
@ -309,26 +294,16 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testPropertiesReport() {
Message message = codec.decode(createMessageContext(
SimpleMqttMessage
.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer(("{\"messageId\":\"test\"," +
"\"properties\":{\"sn\":\"test\"}," +
"\"propertySourceTimes\":{\"sn\":10086}" +
"}").getBytes()))
.build()))
.blockFirst();
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ReportPropertyMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
Assert.assertNotNull(reply.getPropertySourceTimes());
Assert.assertEquals(reply.getPropertySourceTimes().get("sn"), Long.valueOf(10086L));
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
System.out.println(reply);
}
@ -336,10 +311,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
@ -354,10 +328,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof DerivedMetadataMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
@ -369,12 +342,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
@ -385,29 +356,6 @@ public class JetLinksMqttDeviceMessageCodecTest {
System.out.println(reply);
}
@Test
public void testTimeSync() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/time-sync")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\"}"
.getBytes()))
.build()))
.blockFirst();
Assert.assertNull(message);
Assert.assertNotNull(currentReply);
Assert.assertEquals(((MqttMessage) currentReply).getTopic(), "/product1/device1/time-sync/reply");
Assert.assertTrue(currentReply.payloadAsString().contains("timestamp"));
}
public void testTopic() {
}
public MessageEncodeContext createMessageContext(Message message) {
System.out.println(message.toString());
return new MessageEncodeContext() {
@ -421,99 +369,25 @@ public class JetLinksMqttDeviceMessageCodecTest {
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
public MessageDecodeContext createMessageContext(EncodedMessage message) {
System.out.println(message.toString());
return new FromDeviceMessageContext() {
return new MessageDecodeContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return message;
}
@Override
public DeviceSession getSession() {
return new MockSession();
}
@Override
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
class MockSession implements DeviceSession {
@Override
public String getId() {
return "device1";
}
@Override
public String getDeviceId() {
return "device1";
}
@Nullable
@Override
public DeviceOperator getOperator() {
return registry.getDevice("device1").block();
}
@Override
public long lastPingTime() {
return 0;
}
@Override
public long connectTime() {
return 0;
}
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
currentReply = encodedMessage;
return Mono.just(true);
}
@Override
public Transport getTransport() {
return null;
}
@Override
public void close() {
}
@Override
public void ping() {
}
@Override
public boolean isAlive() {
return false;
}
@Override
public void onClose(Runnable call) {
}
}
}

View File

@ -1,12 +1,8 @@
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetlinks.core.codec.defaults.TopicPayloadCodec;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.route.Route;
import org.junit.Test;
import reactor.test.StepVerifier;
@ -40,35 +36,9 @@ public class TopicMessageCodecTest {
}
@Test
public void testRoute() {
for (TopicMessageCodec value : TopicMessageCodec.values()) {
Route route = value.getRoute();
if (null != route)
System.out.println(route.getAddress());
}
}
@Test
public void doTest() {
testChild(ObjectMappers.JSON_MAPPER);
testChild(ObjectMappers.CBOR_MAPPER);
}
@Test
public void testEvent() {
EventMessage eventMessage = new EventMessage();
eventMessage.setEvent("test");
eventMessage.setDeviceId("test-device");
eventMessage.setData("123");
TopicPayload payload = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, eventMessage);
assertEquals(payload.getTopic(), "/test-device/event/test");
DeviceMessage msg = TopicMessageCodec
.decode(ObjectMappers.JSON_MAPPER, payload.getTopic(), payload.getPayload())
.blockLast();
assertEquals(msg.toJson(), eventMessage.toJson());
}
}

View File

@ -1,119 +0,0 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class BinaryMessageTypeTest {
@Test
public void testOnline() {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1000");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
System.out.println(ByteBufUtil.prettyHexDump(byteBuf));
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
System.out.println(ByteBufUtil.prettyHexDump(buf));
//登录报文
System.out.println(ByteBufUtil.hexDump(buf));
}
@Test
public void testReport() {
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
}
@Test
public void testRead() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonList("temp"));
doTest(message);
ReadPropertyMessageReply reply = new ReadPropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testWrite() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
WritePropertyMessageReply reply = new WritePropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testFunction() {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setFunctionId("123");
message.setDeviceId("test");
message.setMessageId("test123");
message.addInput("test", 1);
doTest(message);
FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setOutput(123);
doTest(reply);
}
public void doTest(DeviceMessage message) {
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
// System.out.println(ByteBufUtil.prettyHexDump(data));
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
System.out.println(ByteBufUtil.prettyHexDump(buf));
System.out.println(ByteBufUtil.hexDump(buf));
//将长度字节读取后直接解析报文正文
buf.readInt();
DeviceMessage read = BinaryMessageType.read(buf);
if (null != read.getHeaders()) {
read.getHeaders().forEach(message::addHeader);
}
System.out.println(read);
Assert.assertEquals(read.toString(), message.toString());
}
}