From c254a19ea98e97603ff0eb4bd502a8a8ec921585 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Wed, 30 Nov 2022 16:52:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96tcp=20udp=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=92=8C=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 51 +++++++- binary-protocol.md | 89 ++++++++++++++ .../JetLinksProtocolSupportProvider.java | 9 +- .../official/binary/BinaryMessageType.java | 24 +++- .../protocol/official/binary/DataType.java | 17 ++- .../official/tcp/TcpDeviceMessageCodec.java | 6 +- .../official/udp/UDPDeviceMessageCodec.java | 114 ++++++++++++++++++ src/main/resources/document-tcp.md | 0 .../binary/BinaryMessageTypeTest.java | 5 +- 9 files changed, 303 insertions(+), 12 deletions(-) create mode 100644 binary-protocol.md create mode 100644 src/main/java/org/jetlinks/protocol/official/udp/UDPDeviceMessageCodec.java create mode 100644 src/main/resources/document-tcp.md diff --git a/README.md b/README.md index aae53f6..4fe8989 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,54 @@ ## JetLinks 官方设备接入协议 -类名: `org.jetlinks.protocol.official.JetLinksProtocolSupportProvider` +JetLinks官方实现的设备接入协议,可用于参考实现自定义协议开发. + +### MQTT [查看说明](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html) -MQTT 用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html) \ No newline at end of file +用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html) + +### HTTP + +HTTP接入时需要使用`Bearer` +认证,URL和[MQTT的接入Topic]((http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html))一致. + +```http request +POST /{productId}/{deviceId}/properties/report +Authorization: Bearer {产品或者设备中配置的Token} +Content-Type: application/json + +{ + "properties":{ + "temp":38.5 + } +} +``` + +### TCP + +报文格式说明: + +第0-4字节对应的32位整型值为接下来报文的长度, + +后续为报文数据, +具体报文格式见: [二进制格式说明](binary-protocol.md) + +创建连接后第一个数据包需要发送[认证包](binary-protocol.md#0x01 online 首次连接), +密钥需要在`产品-设备接入`或者`设备详情`中进行配置 + +### UDP + +报文格式说明: + +第`0`字节表示认证类型,目前固定为0x00. + +第`1-n`字节为`密钥信息`,编码使用`STRING`见: [数据类型定义](binary-protocol.md#数据类型) + +密钥需要在`产品-设备接入`或者`设备详情`中进行配置 + +后续为报文数据,具体报文格式见: [二进制格式说明](binary-protocol.md) + +UDP无需发送认证包,但是需要每个报文中都包含密钥信息. + +除了ACK以外,其他平台下发的指令也都会包含认证密钥信息,用于设备侧校验请求. diff --git a/binary-protocol.md b/binary-protocol.md new file mode 100644 index 0000000..d6d5fb7 --- /dev/null +++ b/binary-protocol.md @@ -0,0 +1,89 @@ +## 头信息 + +| 字节序 | 类型 | 字段 | +|:----:|---------|:-----------:| +| 0 | INT8 | 消息类型 | +| 1-8 | INT64 | UTC时间戳 | +| 9-11 | INT16 | 消息序号 | +| 12-n | STRING | 设备ID | +| ... | MESSAGE | 消息类型对应的编码规则 | + +## 数据类型 + +所有数据类型均采用`大端`编码 + +| Byte | Type | 编码规则 | +|:----:|:-------:|---------------------------------------------| +| 0x00 | NULL | 0x01 | +| 0x01 | BOOLEAN | 1字节 0x00为false 其他为true | +| 0x02 | INT8 | 1字节 (byte) | +| 0x03 | INT16 | 2字节整型 (short) | +| 0x04 | INT32 | 4字节整型 (int) | +| 0x05 | INT64 | 8字节整型 (long) | +| 0x06 | UINT8 | 1字节无符号整型 | +| 0x07 | UINT16 | 2字节无符号整型 | +| 0x08 | UINT32 | 4字节无符号整型 | +| 0x09 | FLOAT | 4字节 IEEE 754浮点数 | +| 0x0a | DOUBLE | 8字节 IEEE 754浮点数 | +| 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 | +| 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 | +| 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 | +| 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value | + +## 消息类型定义 + +| Byte | Type | 说明 | +|:----:|:-------------------|--------| +| 0x00 | keepalive | 心跳 | +| 0x01 | online | 首次连接 | +| 0x02 | ack | 应答 | +| 0x03 | reportProperty | 上报属性 | +| 0x04 | readProperty | 读取属性 | +| 0x05 | readPropertyReply | 读取属性回复 | +| 0x06 | writeProperty | 修改属性 | +| 0x07 | writePropertyReply | 修改属性回复 | +| 0x08 | function | 功能调用 | +| 0x09 | functionReply | 功能调用回复 | + +### 0x00 keepalive 心跳 + +[ 0x00 ] + +### 0x01 online 首次连接 + +[ 0x01,STRING:密钥信息 ] + +### 0x02 ack 应答 + +[ 0x02,应答码 ] + +应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持. + +### 0x03 reportProperty 上报属性 + +[ 0x03,属性数据:OBJECT类型 ] + +### 0x04 readProperty 读取属性 + +[ 0x04,属性列表:ARRAY类型 ] + +### 0x05 readPropertyReply 读取属性回复 + +读取成功: + +[ 0x05,`0x01`,属性数据:OBJECT类型 ] + +读取失败: +[ 0x05,`0x00`,错误码:动态类型,错误消息:动态类型 ] + +`动态读取`表示类型不确定,根据对应的`数据类型`来定义类型. + +如: 无错误信息 + +[ 0x05,0x00,`0x00`,`0x00` ] + +`INT8(0x02)`类型错误码:`0x04` + +[ 0x05,0x00,`0x02,0x04`,0x00 ] + +TODO 更多消息类型 \ No newline at end of file diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java index a794a69..a7ccff9 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java @@ -10,6 +10,7 @@ import org.jetlinks.core.spi.ProtocolSupportProvider; import org.jetlinks.core.spi.ServiceContext; import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec; import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec; +import org.jetlinks.protocol.official.udp.UDPDeviceMessageCodec; import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; @@ -34,7 +35,6 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider @Override public Mono create(ServiceContext context) { - return Mono.defer(() -> { CompositeProtocolSupport support = new CompositeProtocolSupport(); @@ -86,6 +86,13 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider //TCP support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig); support.addMessageCodecSupport(new TcpDeviceMessageCodec()); + support.setDocument(DefaultTransport.TCP, + "document-tcp.md", + JetLinksProtocolSupportProvider.class.getClassLoader()); + + //UDP + support.addConfigMetadata(DefaultTransport.UDP, UDPDeviceMessageCodec.udpConfig); + support.addMessageCodecSupport(new UDPDeviceMessageCodec()); //MQTT support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec()); diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java index e3f0da2..cdc1104 100644 --- a/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java @@ -1,22 +1,19 @@ package org.jetlinks.protocol.official.binary; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; import lombok.SneakyThrows; import org.jetlinks.core.device.DeviceThingType; import org.jetlinks.core.message.AcknowledgeDeviceMessage; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceOnlineMessage; +import org.jetlinks.core.message.HeaderKey; import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionInvokeMessageReply; import org.jetlinks.core.message.property.*; import java.time.Duration; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -52,6 +49,9 @@ public enum BinaryMessageType { private static final BinaryMessageType[] VALUES = values(); + public static final HeaderKey HEADER_MSG_SEQ = HeaderKey.of("_seq", 0, Integer.class); + + @SuppressWarnings("all") BinaryMessageType(Class forDevice, Supplier> forTcp) { @@ -101,7 +101,7 @@ public enum BinaryMessageType { } public static ByteBuf write(DeviceMessage message, ByteBuf data) { - int msgId = takeHolder(message.getDeviceId()).next(message.getMessageId()); + int msgId = message.getHeaderOrElse(HEADER_MSG_SEQ, () -> takeHolder(message.getDeviceId()).next(message.getMessageId())); return write(message, msgId, data); } @@ -170,6 +170,7 @@ public enum BinaryMessageType { if (timestamp > 0) { message.timestamp(timestamp); } + message.addHeader(HEADER_MSG_SEQ, msgId); return handler.apply(message, msgId); } @@ -202,5 +203,18 @@ public enum BinaryMessageType { throw new UnsupportedOperationException("unsupported device message " + message.getMessageType()); } + public static void main(String[] args) { + System.out.println("| Byte | Type |"); + System.out.println("| ---- | ---- |"); + for (BinaryMessageType value : BinaryMessageType.values()) { + System.out.print("|"); + System.out.print("0x0"+Integer.toString(value.ordinal(),16)); + System.out.print("|"); + System.out.print(value.name()); + System.out.print("|"); + System.out.println(); + } + System.out.println(); + } } diff --git a/src/main/java/org/jetlinks/protocol/official/binary/DataType.java b/src/main/java/org/jetlinks/protocol/official/binary/DataType.java index f6d7e8b..92280ca 100644 --- a/src/main/java/org/jetlinks/protocol/official/binary/DataType.java +++ b/src/main/java/org/jetlinks/protocol/official/binary/DataType.java @@ -148,7 +148,7 @@ public enum DataType { STRING { @Override public Object read(ByteBuf buf) { - int len = buf.readInt(); + int len = buf.readUnsignedShort(); byte[] bytes = new byte[len]; buf.readBytes(bytes); return new String(bytes, StandardCharsets.UTF_8); @@ -158,7 +158,7 @@ public enum DataType { public void write(ByteBuf buf, Object value) { String str = (String) value; - buf.writeInt(str.length()); + buf.writeShort(str.length()); buf.writeBytes(str.getBytes()); } }, @@ -270,4 +270,17 @@ public enum DataType { } } + public static void main(String[] args) { + System.out.println("| Byte | Type |"); + System.out.println("| ---- | ---- |"); + for (DataType value : DataType.values()) { + System.out.print("|"); + System.out.print("0x0"+Integer.toString(value.ordinal(),16)); + System.out.print("|"); + System.out.print(value.name()); + System.out.print("|"); + System.out.println(); + } + System.out.println(); + } } diff --git a/src/main/java/org/jetlinks/protocol/official/tcp/TcpDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDeviceMessageCodec.java index bcc75a3..046405d 100644 --- a/src/main/java/org/jetlinks/protocol/official/tcp/TcpDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDeviceMessageCodec.java @@ -86,6 +86,10 @@ public class TcpDeviceMessageCodec implements DeviceMessageCodec { message.setMessageId(source.getMessageId()); message.setCode(code.name()); message.setSuccess(code == AckCode.ok); + + source.getHeader(BinaryMessageType.HEADER_MSG_SEQ) + .ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq)); + return ((FromDeviceMessageContext) context) .getSession() .send(EncodedMessage.simple( @@ -102,7 +106,7 @@ public class TcpDeviceMessageCodec implements DeviceMessageCodec { @Override public Publisher encode(@NonNull MessageEncodeContext context) { DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage()); - if(deviceMessage instanceof DisconnectDeviceMessage){ + if (deviceMessage instanceof DisconnectDeviceMessage) { return Mono.empty(); } return Mono.just(EncodedMessage.simple( diff --git a/src/main/java/org/jetlinks/protocol/official/udp/UDPDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/udp/UDPDeviceMessageCodec.java new file mode 100644 index 0000000..3dd22db --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/udp/UDPDeviceMessageCodec.java @@ -0,0 +1,114 @@ +package org.jetlinks.protocol.official.udp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import lombok.NonNull; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.message.*; +import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.types.PasswordType; +import org.jetlinks.protocol.official.binary.*; +import org.reactivestreams.Publisher; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; + +import java.util.Objects; + +public class UDPDeviceMessageCodec implements DeviceMessageCodec { + + public static final String CONFIG_KEY_SECURE_KEY = "secureKey"; + + public static final DefaultConfigMetadata udpConfig = new DefaultConfigMetadata( + "UDP认证配置" + , "") + .add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType()); + + + @Override + public Transport getSupportTransport() { + return DefaultTransport.UDP; + } + + @NonNull + @Override + public Publisher decode(@NonNull MessageDecodeContext context) { + + ByteBuf payload = context.getMessage().getPayload(); + + //todo 认证类型, 0 token,1 sign + byte authType = payload.readByte(); + + //前面是token + String token = (String) DataType.STRING.read(payload); + + //接下来是消息 + DeviceMessage message = BinaryMessageType.read(payload); + + return context + .getDevice(message.getDeviceId()) + .flatMap(device -> device + .getConfig(CONFIG_KEY_SECURE_KEY) + .flatMap(config -> { + if (Objects.equals(config.asString(), token)) { + return ack(message, AckCode.ok, context) + .thenReturn(message); + } + return Mono.empty(); + })) + .switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context))); + } + + public static ByteBuf wrapByteByf(ByteBuf payload) { + + return payload; + } + + private Mono ack(DeviceMessage source, AckCode code, MessageDecodeContext context) { + AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage(); + message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name()); + message.setDeviceId(source.getDeviceId()); + message.setMessageId(source.getMessageId()); + message.setCode(code.name()); + message.setSuccess(code == AckCode.ok); + + source.getHeader(BinaryMessageType.HEADER_MSG_SEQ) + .ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq)); + + return ((FromDeviceMessageContext) context) + .getSession() + .send(doEncode(message, "")) + .then(Mono.fromRunnable(() -> { + if (source instanceof DeviceOnlineMessage && code != AckCode.ok) { + ((FromDeviceMessageContext) context).getSession().close(); + } + })); + } + + @NonNull + @Override + public Publisher encode(@NonNull MessageEncodeContext context) { + DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage()); + if (deviceMessage instanceof DisconnectDeviceMessage) { + return Mono.empty(); + } + + return context + .getDevice(deviceMessage.getDeviceId()) + .flatMap(device -> device + .getConfig(CONFIG_KEY_SECURE_KEY) + .map(config -> doEncode(deviceMessage, config.asString()))); + } + + private EncodedMessage doEncode(DeviceMessage message, String token) { + ByteBuf buf = Unpooled.buffer(); + //todo 认证类型, 0 token,1 sign + buf.writeByte(0); + //token + DataType.STRING.write(buf, token); + //指令 + return EncodedMessage.simple(wrapByteByf(BinaryMessageType.write(message, buf))); + + } + +} diff --git a/src/main/resources/document-tcp.md b/src/main/resources/document-tcp.md new file mode 100644 index 0000000..e69de29 diff --git a/src/test/java/org/jetlinks/protocol/official/binary/BinaryMessageTypeTest.java b/src/test/java/org/jetlinks/protocol/official/binary/BinaryMessageTypeTest.java index 95d0917..0af926b 100644 --- a/src/test/java/org/jetlinks/protocol/official/binary/BinaryMessageTypeTest.java +++ b/src/test/java/org/jetlinks/protocol/official/binary/BinaryMessageTypeTest.java @@ -63,7 +63,7 @@ public class BinaryMessageTypeTest { message.setFunctionId("123"); message.setDeviceId("test"); message.setMessageId("test123"); - message.addInput("test",1); + message.addInput("test", 1); doTest(message); FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply(); @@ -80,6 +80,9 @@ public class BinaryMessageTypeTest { BinaryMessageType.write(message, data); DeviceMessage read = BinaryMessageType.read(data); + if (null != read.getHeaders()) { + read.getHeaders().forEach(message::addHeader); + } System.out.println(read); Assert.assertEquals(read.toString(), message.toString()); }