优化tcp udp实现和说明

This commit is contained in:
zhouhao 2022-11-30 16:52:30 +08:00
parent ff37d0733c
commit c254a19ea9
9 changed files with 303 additions and 12 deletions

View File

@ -1,7 +1,54 @@
## JetLinks 官方设备接入协议 ## JetLinks 官方设备接入协议
类名: `org.jetlinks.protocol.official.JetLinksProtocolSupportProvider` JetLinks官方实现的设备接入协议,可用于参考实现自定义协议开发.
### MQTT
[查看说明](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html) [查看说明](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html)
MQTT 用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html) 用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html)
### HTTP
HTTP接入时需要使用`Bearer`
认证,URL和[MQTT的接入Topic]((http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html))一致.
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
### TCP
报文格式说明:
第0-4字节对应的32位整型值为接下来报文的长度,
后续为报文数据,
具体报文格式见: [二进制格式说明](binary-protocol.md)
创建连接后第一个数据包需要发送[认证包](binary-protocol.md#0x01 online 首次连接),
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
### UDP
报文格式说明:
第`0`字节表示认证类型,目前固定为0x00.
第`1-n`字节为`密钥信息`,编码使用`STRING`见: [数据类型定义](binary-protocol.md#数据类型)
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
后续为报文数据,具体报文格式见: [二进制格式说明](binary-protocol.md)
UDP无需发送认证包,但是需要每个报文中都包含密钥信息.
除了ACK以外,其他平台下发的指令也都会包含认证密钥信息,用于设备侧校验请求.

89
binary-protocol.md Normal file
View File

@ -0,0 +1,89 @@
## 头信息
| 字节序 | 类型 | 字段 |
|:----:|---------|:-----------:|
| 0 | INT8 | 消息类型 |
| 1-8 | INT64 | UTC时间戳 |
| 9-11 | INT16 | 消息序号 |
| 12-n | STRING | 设备ID |
| ... | MESSAGE | 消息类型对应的编码规则 |
## 数据类型
所有数据类型均采用`大端`编码
| Byte | Type | 编码规则 |
|:----:|:-------:|---------------------------------------------|
| 0x00 | NULL | 0x01 |
| 0x01 | BOOLEAN | 1字节 0x00为false 其他为true |
| 0x02 | INT8 | 1字节 (byte) |
| 0x03 | INT16 | 2字节整型 (short) |
| 0x04 | INT32 | 4字节整型 (int) |
| 0x05 | INT64 | 8字节整型 (long) |
| 0x06 | UINT8 | 1字节无符号整型 |
| 0x07 | UINT16 | 2字节无符号整型 |
| 0x08 | UINT32 | 4字节无符号整型 |
| 0x09 | FLOAT | 4字节 IEEE 754浮点数 |
| 0x0a | DOUBLE | 8字节 IEEE 754浮点数 |
| 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 |
| 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 |
| 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 |
| 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value |
## 消息类型定义
| Byte | Type | 说明 |
|:----:|:-------------------|--------|
| 0x00 | keepalive | 心跳 |
| 0x01 | online | 首次连接 |
| 0x02 | ack | 应答 |
| 0x03 | reportProperty | 上报属性 |
| 0x04 | readProperty | 读取属性 |
| 0x05 | readPropertyReply | 读取属性回复 |
| 0x06 | writeProperty | 修改属性 |
| 0x07 | writePropertyReply | 修改属性回复 |
| 0x08 | function | 功能调用 |
| 0x09 | functionReply | 功能调用回复 |
### 0x00 keepalive 心跳
[ 0x00 ]
### 0x01 online 首次连接
[ 0x01,STRING:密钥信息 ]
### 0x02 ack 应答
[ 0x02,应答码 ]
应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持.
### 0x03 reportProperty 上报属性
[ 0x03,属性数据:OBJECT类型 ]
### 0x04 readProperty 读取属性
[ 0x04,属性列表:ARRAY类型 ]
### 0x05 readPropertyReply 读取属性回复
读取成功:
[ 0x05,`0x01`,属性数据:OBJECT类型 ]
读取失败:
[ 0x05,`0x00`,错误码:动态类型,错误消息:动态类型 ]
`动态读取`表示类型不确定,根据对应的`数据类型`来定义类型.
如: 无错误信息
[ 0x05,0x00,`0x00`,`0x00` ]
`INT8(0x02)`类型错误码:`0x04`
[ 0x05,0x00,`0x02,0x04`,0x00 ]
TODO 更多消息类型

View File

@ -10,6 +10,7 @@ import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext; import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec; import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec; import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec;
import org.jetlinks.protocol.official.udp.UDPDeviceMessageCodec;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.http.HttpMethod; import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
@ -34,7 +35,6 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
@Override @Override
public Mono<CompositeProtocolSupport> create(ServiceContext context) { public Mono<CompositeProtocolSupport> create(ServiceContext context) {
return Mono.defer(() -> { return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport(); CompositeProtocolSupport support = new CompositeProtocolSupport();
@ -86,6 +86,13 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
//TCP //TCP
support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig); support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig);
support.addMessageCodecSupport(new TcpDeviceMessageCodec()); support.addMessageCodecSupport(new TcpDeviceMessageCodec());
support.setDocument(DefaultTransport.TCP,
"document-tcp.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
//UDP
support.addConfigMetadata(DefaultTransport.UDP, UDPDeviceMessageCodec.udpConfig);
support.addMessageCodecSupport(new UDPDeviceMessageCodec());
//MQTT //MQTT
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec()); support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec());

View File

@ -1,22 +1,19 @@
package org.jetlinks.protocol.official.binary; package org.jetlinks.protocol.official.binary;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceThingType; import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.AcknowledgeDeviceMessage; import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage; import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply; import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*; import org.jetlinks.core.message.property.*;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -52,6 +49,9 @@ public enum BinaryMessageType {
private static final BinaryMessageType[] VALUES = values(); private static final BinaryMessageType[] VALUES = values();
public static final HeaderKey<Integer> HEADER_MSG_SEQ = HeaderKey.of("_seq", 0, Integer.class);
@SuppressWarnings("all") @SuppressWarnings("all")
BinaryMessageType(Class<? extends DeviceMessage> forDevice, BinaryMessageType(Class<? extends DeviceMessage> forDevice,
Supplier<? extends BinaryMessage<?>> forTcp) { Supplier<? extends BinaryMessage<?>> forTcp) {
@ -101,7 +101,7 @@ public enum BinaryMessageType {
} }
public static ByteBuf write(DeviceMessage message, ByteBuf data) { public static ByteBuf write(DeviceMessage message, ByteBuf data) {
int msgId = takeHolder(message.getDeviceId()).next(message.getMessageId()); int msgId = message.getHeaderOrElse(HEADER_MSG_SEQ, () -> takeHolder(message.getDeviceId()).next(message.getMessageId()));
return write(message, msgId, data); return write(message, msgId, data);
} }
@ -170,6 +170,7 @@ public enum BinaryMessageType {
if (timestamp > 0) { if (timestamp > 0) {
message.timestamp(timestamp); message.timestamp(timestamp);
} }
message.addHeader(HEADER_MSG_SEQ, msgId);
return handler.apply(message, msgId); return handler.apply(message, msgId);
} }
@ -202,5 +203,18 @@ public enum BinaryMessageType {
throw new UnsupportedOperationException("unsupported device message " + message.getMessageType()); throw new UnsupportedOperationException("unsupported device message " + message.getMessageType());
} }
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (BinaryMessageType value : BinaryMessageType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
} }

View File

@ -148,7 +148,7 @@ public enum DataType {
STRING { STRING {
@Override @Override
public Object read(ByteBuf buf) { public Object read(ByteBuf buf) {
int len = buf.readInt(); int len = buf.readUnsignedShort();
byte[] bytes = new byte[len]; byte[] bytes = new byte[len];
buf.readBytes(bytes); buf.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8); return new String(bytes, StandardCharsets.UTF_8);
@ -158,7 +158,7 @@ public enum DataType {
public void write(ByteBuf buf, Object value) { public void write(ByteBuf buf, Object value) {
String str = (String) value; String str = (String) value;
buf.writeInt(str.length()); buf.writeShort(str.length());
buf.writeBytes(str.getBytes()); buf.writeBytes(str.getBytes());
} }
}, },
@ -270,4 +270,17 @@ public enum DataType {
} }
} }
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (DataType value : DataType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
} }

View File

@ -86,6 +86,10 @@ public class TcpDeviceMessageCodec implements DeviceMessageCodec {
message.setMessageId(source.getMessageId()); message.setMessageId(source.getMessageId());
message.setCode(code.name()); message.setCode(code.name());
message.setSuccess(code == AckCode.ok); message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context) return ((FromDeviceMessageContext) context)
.getSession() .getSession()
.send(EncodedMessage.simple( .send(EncodedMessage.simple(
@ -102,7 +106,7 @@ public class TcpDeviceMessageCodec implements DeviceMessageCodec {
@Override @Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) { public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage()); DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if(deviceMessage instanceof DisconnectDeviceMessage){ if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty(); return Mono.empty();
} }
return Mono.just(EncodedMessage.simple( return Mono.just(EncodedMessage.simple(

View File

@ -0,0 +1,114 @@
package org.jetlinks.protocol.official.udp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class UDPDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata udpConfig = new DefaultConfigMetadata(
"UDP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.UDP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//todo 认证类型, 0 token,1 sign
byte authType = payload.readByte();
//前面是token
String token = (String) DataType.STRING.read(payload);
//接下来是消息
DeviceMessage message = BinaryMessageType.read(payload);
return context
.getDevice(message.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return payload;
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(doEncode(message, ""))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return context
.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.map(config -> doEncode(deviceMessage, config.asString())));
}
private EncodedMessage doEncode(DeviceMessage message, String token) {
ByteBuf buf = Unpooled.buffer();
//todo 认证类型, 0 token,1 sign
buf.writeByte(0);
//token
DataType.STRING.write(buf, token);
//指令
return EncodedMessage.simple(wrapByteByf(BinaryMessageType.write(message, buf)));
}
}

View File

View File

@ -63,7 +63,7 @@ public class BinaryMessageTypeTest {
message.setFunctionId("123"); message.setFunctionId("123");
message.setDeviceId("test"); message.setDeviceId("test");
message.setMessageId("test123"); message.setMessageId("test123");
message.addInput("test",1); message.addInput("test", 1);
doTest(message); doTest(message);
FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply(); FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply();
@ -80,6 +80,9 @@ public class BinaryMessageTypeTest {
BinaryMessageType.write(message, data); BinaryMessageType.write(message, data);
DeviceMessage read = BinaryMessageType.read(data); DeviceMessage read = BinaryMessageType.read(data);
if (null != read.getHeaders()) {
read.getHeaders().forEach(message::addHeader);
}
System.out.println(read); System.out.println(read);
Assert.assertEquals(read.toString(), message.toString()); Assert.assertEquals(read.toString(), message.toString());
} }