diff --git a/.gitignore b/.gitignore index c56056e..8239d15 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ hs_err_pid* /static/ /upload /ui/upload/ -!/package/** \ No newline at end of file +!/package/** +dependency-reduced-pom.xml \ No newline at end of file diff --git a/pom.xml b/pom.xml index eeb78ad..b5bbd01 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,52 @@ + + all-in-one + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + package + + shade + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec + + + + + + + + + + + + + + + + @@ -201,6 +247,12 @@ 1.2.11 test + + io.vertx + vertx-core + 4.3.1 + compile + diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java index 54dea46..a794a69 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java @@ -9,6 +9,7 @@ import org.jetlinks.core.route.HttpRoute; import org.jetlinks.core.spi.ProtocolSupportProvider; import org.jetlinks.core.spi.ServiceContext; import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec; +import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec; import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; @@ -80,16 +81,21 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig); - support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig); - support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig); + + + //TCP + support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig); + support.addMessageCodecSupport(new TcpDeviceMessageCodec()); //MQTT support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec()); //HTTP + support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig); support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec()); //CoAP + support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig); support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec()); return Mono.just(support); diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java index 3e80b00..ddbae54 100644 --- a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java @@ -315,7 +315,8 @@ public enum TopicMessageCodec { joiner.add(topic); } return MqttRoute - .builder(joiner.toString()); + .builder(joiner.toString()) + .qos(1); } public MqttRoute getRoute() { diff --git a/src/main/java/org/jetlinks/protocol/official/binary/AckCode.java b/src/main/java/org/jetlinks/protocol/official/binary/AckCode.java new file mode 100644 index 0000000..cc5f915 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/AckCode.java @@ -0,0 +1,7 @@ +package org.jetlinks.protocol.official.binary; + +public enum AckCode { + ok, + noAuth, + unsupportedMessage +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryAcknowledgeDeviceMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryAcknowledgeDeviceMessage.java new file mode 100644 index 0000000..c4f7e29 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryAcknowledgeDeviceMessage.java @@ -0,0 +1,40 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import org.jetlinks.core.message.AcknowledgeDeviceMessage; +import org.jetlinks.core.message.HeaderKey; + +public class BinaryAcknowledgeDeviceMessage implements BinaryMessage { + + public static final HeaderKey codeHeader = HeaderKey.of("code", AckCode.ok.name()); + + private AcknowledgeDeviceMessage message; + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.ack; + } + + @Override + public void read(ByteBuf buf) { + message = new AcknowledgeDeviceMessage(); + AckCode code = AckCode.values()[buf.readUnsignedByte()]; + message.addHeader(codeHeader, code.name()); + } + + @Override + public void write(ByteBuf buf) { + AckCode code = AckCode.valueOf(this.message.getHeaderOrDefault(codeHeader)); + buf.writeByte(code.ordinal()); + } + + @Override + public void setMessage(AcknowledgeDeviceMessage message) { + this.message = message; + } + + @Override + public AcknowledgeDeviceMessage getMessage() { + return message; + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryDeviceOnlineMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryDeviceOnlineMessage.java new file mode 100644 index 0000000..d9300c2 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryDeviceOnlineMessage.java @@ -0,0 +1,45 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import org.jetlinks.core.message.DeviceOnlineMessage; +import org.jetlinks.core.message.HeaderKey; + +/** + * + */ +public class BinaryDeviceOnlineMessage implements BinaryMessage { + + public static final HeaderKey loginToken = HeaderKey.of("token", null); + + private DeviceOnlineMessage message; + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.online; + } + + @Override + public void read(ByteBuf buf) { + message = new DeviceOnlineMessage(); + message.addHeader(loginToken, (String) DataType.STRING.read(buf)); + } + + @Override + public void write(ByteBuf buf) { + DataType.STRING + .write( + buf, message.getHeader(loginToken).orElse("") + ); + } + + @Override + public void setMessage(DeviceOnlineMessage message) { + this.message = message; + } + + @Override + public DeviceOnlineMessage getMessage() { + return message; + } + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryFunctionInvokeMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryFunctionInvokeMessage.java new file mode 100644 index 0000000..7073b05 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryFunctionInvokeMessage.java @@ -0,0 +1,50 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import org.jetlinks.core.message.function.FunctionInvokeMessage; +import org.jetlinks.core.message.function.FunctionParameter; + +import java.util.Map; +import java.util.stream.Collectors; + +public class BinaryFunctionInvokeMessage implements BinaryMessage { + private FunctionInvokeMessage message; + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.function; + } + + @Override + public void read(ByteBuf buf) { + message = new FunctionInvokeMessage(); + message.setFunctionId((String) DataType.STRING.read(buf)); + + @SuppressWarnings("all") + Map params = (Map) DataType.OBJECT.read(buf); + message.setInputs( + params + .entrySet() + .stream() + .map(e -> new FunctionParameter(e.getKey(), e.getValue())) + .collect(Collectors.toList()) + ); + + } + + @Override + public void write(ByteBuf buf) { + DataType.STRING.write(buf,message.getFunctionId()); + DataType.OBJECT.write(buf,message.inputsToMap()); + } + + @Override + public void setMessage(FunctionInvokeMessage message) { + this.message = message; + } + + @Override + public FunctionInvokeMessage getMessage() { + return message; + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryFunctionInvokeMessageReply.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryFunctionInvokeMessageReply.java new file mode 100644 index 0000000..e646243 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryFunctionInvokeMessageReply.java @@ -0,0 +1,40 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.jetlinks.core.message.function.FunctionInvokeMessageReply; +import org.jetlinks.core.message.property.WritePropertyMessageReply; + +import java.util.Map; + +/** + * @author zhouhao + * @since 1.0 + */ +public class BinaryFunctionInvokeMessageReply extends BinaryReplyMessage { + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.functionReply; + } + + @Override + protected FunctionInvokeMessageReply newMessage() { + return new FunctionInvokeMessageReply(); + } + + @Override + protected void doReadSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) { + msg.setFunctionId((String) DataType.readFrom(buf)); + msg.setOutput(DataType.readFrom(buf)); + } + + @Override + protected void doWriteSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) { + DataType.writeTo(getMessage().getFunctionId(), buf); + DataType.writeTo(msg.getOutput(), buf); + } + + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessage.java new file mode 100644 index 0000000..bcc2709 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessage.java @@ -0,0 +1,19 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import org.jetlinks.core.message.DeviceMessage; +import reactor.core.publisher.Flux; + +public interface BinaryMessage { + + BinaryMessageType getType(); + + void read(ByteBuf buf); + + void write(ByteBuf buf); + + void setMessage(T message); + + T getMessage(); + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java new file mode 100644 index 0000000..0fb77f3 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java @@ -0,0 +1,181 @@ +package org.jetlinks.protocol.official.binary; + +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import io.netty.buffer.ByteBuf; +import lombok.SneakyThrows; +import org.jetlinks.core.device.DeviceThingType; +import org.jetlinks.core.message.AcknowledgeDeviceMessage; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DeviceOnlineMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessageReply; +import org.jetlinks.core.message.property.*; + +import java.time.Duration; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Supplier; + +public enum BinaryMessageType { + + online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new), + + ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new), + + reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new), + + readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new), + + readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new), + + writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new), + + writePropertyReply(WritePropertyMessageReply.class, BinaryWritePropertyMessageReply::new), + + function(FunctionInvokeMessage.class, BinaryFunctionInvokeMessage::new), + + functionReply(FunctionInvokeMessageReply.class, BinaryFunctionInvokeMessageReply::new); + + private final Class forDevice; + + private final Supplier> forTcp; + + private static final BinaryMessageType[] VALUES = values(); + + @SuppressWarnings("all") + BinaryMessageType(Class forDevice, + Supplier> forTcp) { + this.forDevice = forDevice; + this.forTcp = (Supplier) forTcp; + } + + private static final Map cache = CacheBuilder + .newBuilder() + .expireAfterWrite(Duration.ofHours(1)) + .build() + .asMap(); + + private static class MsgIdHolder { + private int msgId = 0; + private final BiMap cached = HashBiMap.create(); + + public synchronized int next(String id) { + if (id == null) { + return -1; + } + if (msgId++ < 0) { + msgId = 0; + } + cached.put(msgId, id); + return msgId; + } + + public String getAndRemove(int id) { + if (id < 0) { + return null; + } + return cached.remove(id); + } + + } + + @SneakyThrows + private static MsgIdHolder takeHolder(String deviceId) { + return cache.computeIfAbsent(deviceId, (ignore) -> new MsgIdHolder()); + } + + public static ByteBuf write(DeviceMessage message, ByteBuf data) { + int msgId = takeHolder(message.getDeviceId()).next(message.getMessageId()); + return write(message, msgId, data); + } + + public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) { + BinaryMessageType type = lookup(message); + // 第0个字节是消息类型 + data.writeByte(type.ordinal()); + // 0-4字节 时间戳 + data.writeLong(message.getTimestamp()); + + // 5-6字节 消息序号 + data.writeShort(msgId); + + // 7... 字节 设备ID + DataType.writeTo(message.getDeviceId(), data); + + // 创建消息对象 + BinaryMessage tcp = type.forTcp.get(); + + tcp.setMessage(message); + + //写出数据到ByteBuf + tcp.write(data); + return data; + } + + public static DeviceMessage read(ByteBuf data) { + return read(data, null); + } + + public static T read(ByteBuf data, + String deviceIdMaybe, + BiFunction handler) { + //第0个字节是消息类型 + BinaryMessageType type = VALUES[data.readByte()]; + if (type.forTcp == null) { + return null; + } + // 1-4字节 时间戳 + long timestamp = data.readLong(); + // 5-6字节 消息序号 + int msgId = data.readUnsignedShort(); + // 7... 字节 设备ID + String deviceId = (String) DataType.readFrom(data); + if (deviceId == null) { + deviceId = deviceIdMaybe; + } + + // 创建消息对象 + BinaryMessage tcp = type.forTcp.get(); + + //从ByteBuf读取 + tcp.read(data); + + DeviceMessage message = tcp.getMessage(); + message.thingId(DeviceThingType.device, deviceId); + message.timestamp(timestamp); + + return handler.apply(message, msgId); + } + + public static DeviceMessage read(ByteBuf data, String deviceIdMaybe) { + return read(data, deviceIdMaybe, (message, msgId) -> { + String messageId = null; + if (message.getDeviceId() != null) { + //获取实际平台下发的消息ID + MsgIdHolder holder = cache.get(message.getDeviceId()); + if (holder != null) { + messageId = holder.getAndRemove(msgId); + } + } + + if (messageId == null) { + messageId = String.valueOf(msgId); + } + message.messageId(messageId); + return message; + }); + } + + public static BinaryMessageType lookup(DeviceMessage message) { + for (BinaryMessageType value : VALUES) { + if (value.forDevice != null && value.forDevice.isInstance(message)) { + return value; + } + } + throw new UnsupportedOperationException("unsupported device message " + message.getMessageType()); + } + + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryReadPropertyMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReadPropertyMessage.java new file mode 100644 index 0000000..67fd250 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReadPropertyMessage.java @@ -0,0 +1,50 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.jetlinks.core.message.property.ReadPropertyMessage; +import org.jetlinks.core.message.property.ReportPropertyMessage; + +import java.util.List; +import java.util.Map; + +/** + * @author zhouhao + * @since 1.0 + */ +@AllArgsConstructor +@NoArgsConstructor +public class BinaryReadPropertyMessage implements BinaryMessage { + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.readProperty; + } + + private ReadPropertyMessage message; + + @Override + public void read(ByteBuf buf) { + message = new ReadPropertyMessage(); + @SuppressWarnings("all") + List list = (List) DataType.ARRAY.read(buf); + message.setProperties(list); + } + + @Override + public void write(ByteBuf buf) { + DataType.ARRAY.write(buf, message.getProperties()); + } + + @Override + public void setMessage(ReadPropertyMessage message) { + this.message = message; + } + + @Override + public ReadPropertyMessage getMessage() { + return message; + } + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryReadPropertyMessageReply.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReadPropertyMessageReply.java new file mode 100644 index 0000000..4c9ce7f --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReadPropertyMessageReply.java @@ -0,0 +1,41 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.jetlinks.core.message.property.ReadPropertyMessageReply; +import org.jetlinks.core.message.property.ReportPropertyMessage; + +import java.util.Map; + +/** + * @author zhouhao + * @since 1.0 + */ +public class BinaryReadPropertyMessageReply extends BinaryReplyMessage { + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.readPropertyReply; + } + + @Override + protected ReadPropertyMessageReply newMessage() { + return new ReadPropertyMessageReply(); + } + + @Override + protected void doWriteSuccess(ReadPropertyMessageReply msg, ByteBuf buf) { + DataType.OBJECT.write(buf, msg.getProperties()); + } + + @Override + protected void doReadSuccess(ReadPropertyMessageReply msg, ByteBuf buf) { + @SuppressWarnings("all") + Map map = (Map) DataType.OBJECT.read(buf); + msg.setProperties(map); + + } + + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryReplyMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReplyMessage.java new file mode 100644 index 0000000..04d8c4d --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReplyMessage.java @@ -0,0 +1,52 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import org.jetlinks.core.message.DeviceMessageReply; + +import java.util.Map; + +public abstract class BinaryReplyMessage implements BinaryMessage { + + private T message; + + protected abstract T newMessage(); + + @Override + public final void read(ByteBuf buf) { + message = newMessage(); + boolean success = buf.readBoolean(); + if (success) { + doReadSuccess(message, buf); + } else { + message.success(false); + message.code(String.valueOf(DataType.readFrom(buf))); + message.message(String.valueOf(DataType.readFrom(buf))); + } + } + + protected abstract void doReadSuccess(T msg, ByteBuf buf); + + protected abstract void doWriteSuccess(T msg, ByteBuf buf); + + @Override + public final void write(ByteBuf buf) { + buf.writeBoolean(message.isSuccess()); + + if (message.isSuccess()) { + doWriteSuccess(message, buf); + } else { + DataType.writeTo(message.getCode(), buf); + DataType.writeTo(message.getMessage(), buf); + } + } + + @Override + public void setMessage(T message) { + this.message = message; + } + + @Override + public T getMessage() { + return message; + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryReportPropertyMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReportPropertyMessage.java new file mode 100644 index 0000000..511cad1 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryReportPropertyMessage.java @@ -0,0 +1,48 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.jetlinks.core.message.property.ReportPropertyMessage; + +import java.util.Map; + +/** + * @author zhouhao + * @since 1.0 + */ +@AllArgsConstructor +@NoArgsConstructor +public class BinaryReportPropertyMessage implements BinaryMessage { + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.reportProperty; + } + + private ReportPropertyMessage message; + + @Override + public void read(ByteBuf buf) { + message = new ReportPropertyMessage(); + @SuppressWarnings("all") + Map map = (Map) DataType.OBJECT.read(buf); + message.setProperties(map); + } + + @Override + public void write(ByteBuf buf) { + DataType.OBJECT.write(buf, message.getProperties()); + } + + @Override + public void setMessage(ReportPropertyMessage message) { + this.message = message; + } + + @Override + public ReportPropertyMessage getMessage() { + return message; + } + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryWritePropertyMessage.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryWritePropertyMessage.java new file mode 100644 index 0000000..6045b0d --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryWritePropertyMessage.java @@ -0,0 +1,49 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.jetlinks.core.message.property.ReportPropertyMessage; +import org.jetlinks.core.message.property.WritePropertyMessage; + +import java.util.Map; + +/** + * @author zhouhao + * @since 1.0 + */ +@AllArgsConstructor +@NoArgsConstructor +public class BinaryWritePropertyMessage implements BinaryMessage { + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.writeProperty; + } + + private WritePropertyMessage message; + + @Override + public void read(ByteBuf buf) { + message = new WritePropertyMessage(); + @SuppressWarnings("all") + Map map = (Map) DataType.OBJECT.read(buf); + message.setProperties(map); + } + + @Override + public void write(ByteBuf buf) { + DataType.OBJECT.write(buf, message.getProperties()); + } + + @Override + public void setMessage(WritePropertyMessage message) { + this.message = message; + } + + @Override + public WritePropertyMessage getMessage() { + return message; + } + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryWritePropertyMessageReply.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryWritePropertyMessageReply.java new file mode 100644 index 0000000..33cabda --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryWritePropertyMessageReply.java @@ -0,0 +1,40 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import org.jetlinks.core.message.property.ReadPropertyMessageReply; +import org.jetlinks.core.message.property.WritePropertyMessageReply; + +import java.util.Map; + +/** + * @author zhouhao + * @since 1.0 + */ +public class BinaryWritePropertyMessageReply extends BinaryReplyMessage { + + @Override + public BinaryMessageType getType() { + return BinaryMessageType.readPropertyReply; + } + + @Override + protected WritePropertyMessageReply newMessage() { + return new WritePropertyMessageReply(); + } + + @Override + protected void doReadSuccess(WritePropertyMessageReply msg, ByteBuf buf) { + @SuppressWarnings("all") + Map map = (Map) DataType.OBJECT.read(buf); + msg.setProperties(map); + } + + @Override + protected void doWriteSuccess(WritePropertyMessageReply msg, ByteBuf buf) { + DataType.OBJECT.write(buf, msg.getProperties()); + } + + +} diff --git a/src/main/java/org/jetlinks/protocol/official/binary/DataType.java b/src/main/java/org/jetlinks/protocol/official/binary/DataType.java new file mode 100644 index 0000000..f6d7e8b --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/binary/DataType.java @@ -0,0 +1,273 @@ +package org.jetlinks.protocol.official.binary; + +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.jetlinks.protocol.official.ObjectMappers; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public enum DataType { + //0x00 + NULL { + @Override + public Object read(ByteBuf buf) { + return null; + } + + @Override + public void write(ByteBuf buf, Object value) { + + } + }, + //0x01 + BOOLEAN { + @Override + public Object read(ByteBuf buf) { + return buf.readBoolean(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeBoolean((Boolean) value); + } + }, + //0x02 + INT8 { + @Override + public Object read(ByteBuf buf) { + return buf.readByte(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeByte((Byte) value); + } + }, + //0x03 + INT16 { + @Override + public Object read(ByteBuf buf) { + return buf.readShort(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeShort((Short) value); + } + }, + //0x04 + INT32 { + @Override + public Object read(ByteBuf buf) { + return buf.readInt(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeInt((Integer) value); + } + }, + //0x05 + INT64 { + @Override + public Object read(ByteBuf buf) { + return buf.readLong(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeLong((Long) value); + } + }, + //0x06 + UINT8 { + @Override + public Object read(ByteBuf buf) { + return buf.readUnsignedByte(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeByte((Byte) value); + } + }, + //0x07 + UINT16 { + @Override + public Object read(ByteBuf buf) { + return buf.readUnsignedShort(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeShort((Short) value); + } + }, + //0x08 + UINT32 { + @Override + public Object read(ByteBuf buf) { + return buf.readUnsignedInt(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeInt((Integer) value); + } + }, + //0x09 + FLOAT { + @Override + public Object read(ByteBuf buf) { + return buf.readFloat(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeFloat((Float) value); + } + }, + //0x0A + DOUBLE { + @Override + public Object read(ByteBuf buf) { + return buf.readDouble(); + } + + @Override + public void write(ByteBuf buf, Object value) { + buf.writeDouble((Double) value); + } + }, + //0x0B + STRING { + @Override + public Object read(ByteBuf buf) { + int len = buf.readInt(); + byte[] bytes = new byte[len]; + buf.readBytes(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + @Override + public void write(ByteBuf buf, Object value) { + + String str = (String) value; + buf.writeInt(str.length()); + buf.writeBytes(str.getBytes()); + } + }, + //0x0C + BINARY { + @Override + public Object read(ByteBuf buf) { + int len = buf.readUnsignedShort(); + byte[] bytes = new byte[len]; + buf.readBytes(bytes); + return bytes; + } + + @Override + public void write(ByteBuf buf, Object value) { + byte[] bytes = (byte[]) value; + buf.writeShort(bytes.length); + buf.writeBytes(bytes); + } + }, + //0x0D + ARRAY { + @Override + public Object read(ByteBuf buf) { + int len = buf.readUnsignedShort(); + List array = new ArrayList<>(len); + for (int i = 0; i < len; i++) { + array.add(readFrom(buf)); + } + return array; + } + + @Override + public void write(ByteBuf buf, Object value) { + Collection array = (Collection) value; + buf.writeShort(array.size()); + for (Object o : array) { + writeTo(o, buf); + } + } + }, + //0x0E + OBJECT { + @Override + public Object read(ByteBuf buf) { + int len = buf.readUnsignedShort(); + Map data = Maps.newLinkedHashMapWithExpectedSize(len); + for (int i = 0; i < len; i++) { + data.put((String) STRING.read(buf), readFrom(buf)); + } + return data; + } + + @Override + public void write(ByteBuf buf, Object value) { + Map data = value instanceof Map ? ((Map) value) : ObjectMappers.JSON_MAPPER.convertValue(value, Map.class); + buf.writeShort(data.size()); + + for (Map.Entry entry : data.entrySet()) { + STRING.write(buf, entry.getKey()); + writeTo(entry.getValue(), buf); + } + } + }; + + private final static DataType[] VALUES = values(); + + public abstract Object read(ByteBuf buf); + + public abstract void write(ByteBuf buf, Object value); + + public static Object readFrom(ByteBuf buf) { + return VALUES[buf.readUnsignedByte()].read(buf); + } + + public static void writeTo(Object data, ByteBuf buf) { + DataType type = loopUpType(data); + buf.writeByte(type.ordinal()); + type.write(buf, data); + } + + private static DataType loopUpType(Object data) { + if (data == null) { + return NULL; + } else if (data instanceof Boolean) { + return BOOLEAN; + } else if (data instanceof Byte) { + return INT8; + } else if (data instanceof Short) { + return INT16; + } else if (data instanceof Integer) { + return INT32; + } else if (data instanceof Long) { + return INT64; + } else if (data instanceof Float) { + return FLOAT; + } else if (data instanceof Double) { + return DOUBLE; + } else if (data instanceof String) { + return STRING; + } else if (data instanceof byte[]) { + return BINARY; + } else if (data instanceof Collection) { + return ARRAY; + } else if (data instanceof Map) { + return OBJECT; + } else { + throw new IllegalArgumentException("Unsupported data type: " + data.getClass()); + } + } + +} diff --git a/src/main/java/org/jetlinks/protocol/official/tcp/TcpDevice.java b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDevice.java new file mode 100644 index 0000000..11cb9af --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDevice.java @@ -0,0 +1,101 @@ +package org.jetlinks.protocol.official.tcp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetSocket; +import lombok.SneakyThrows; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DeviceOnlineMessage; +import org.jetlinks.core.message.property.ReadPropertyMessage; +import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage; +import org.jetlinks.protocol.official.binary.BinaryMessageType; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; + +public class TcpDevice { + @SneakyThrows + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + int start = args.length > 0 ? Integer.parseInt(args[0]) : 1; + int count = args.length > 1 ? Integer.parseInt(args[1]) : 8000; + String[] hosts = args.length > 2 ? args[2].split(",") : new String[]{"0.0.0.0"}; + + Flux.range(start, count) + .flatMap(i -> Mono + .create(sink -> { + NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true); + conf.setLocalAddress(hosts[i % hosts.length]); + vertx + .createNetClient(conf) + .connect(8802, "localhost") + .onFailure(err -> { + System.out.println(err.getMessage()); + sink.success(); + }) + .onSuccess(socket -> { + socket + .closeHandler((s) -> { + System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed"); + sink.success(); + }) + .exceptionHandler(er -> { + System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage()); + sink.success(); + }) + .handler(buffer -> { + sink.success("tcp-off-" + i + ":" + socket.localAddress()); + + ByteBuf buf = buffer.getByteBuf(); + buf.readInt(); + BinaryMessageType + .read(buf, + null, + (downstream, seq) -> { + handleDownStream(downstream, seq, socket); + return null; + }); + + }); + + DeviceOnlineMessage message = new DeviceOnlineMessage(); + message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test"); + message.setDeviceId("tcp-off-" + i); + + socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer())))); + + }); + }) + ) + .count() + .subscribe(System.out::println); + + System.in.read(); + } + + protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) { + System.out.println(downstream); + + if (downstream instanceof ReadPropertyMessage) { + socket.write( + Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write( + ((ReadPropertyMessage) downstream) + .newReply() + .success(Collections.singletonMap( + ((ReadPropertyMessage) downstream) + .getProperties() + .get(0), + ThreadLocalRandom + .current() + .nextFloat() * 100 + )) + , seq, Unpooled.buffer()))) + ); + } + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/tcp/TcpDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDeviceMessageCodec.java new file mode 100644 index 0000000..1b3a342 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDeviceMessageCodec.java @@ -0,0 +1,117 @@ +package org.jetlinks.protocol.official.tcp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import lombok.NonNull; +import org.jetlinks.core.message.AcknowledgeDeviceMessage; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DeviceOnlineMessage; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.types.PasswordType; +import org.jetlinks.protocol.official.binary.AckCode; +import org.jetlinks.protocol.official.binary.BinaryAcknowledgeDeviceMessage; +import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage; +import org.jetlinks.protocol.official.binary.BinaryMessageType; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.util.Objects; + +public class TcpDeviceMessageCodec implements DeviceMessageCodec { + + public static final String CONFIG_KEY_SECURE_KEY = "secureKey"; + + public static final DefaultConfigMetadata tcpConfig = new DefaultConfigMetadata( + "TCP认证配置" + , "") + .add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType()); + + + @Override + public Transport getSupportTransport() { + return DefaultTransport.TCP; + } + + @NonNull + @Override + public Publisher decode(@NonNull MessageDecodeContext context) { + + ByteBuf payload = context.getMessage().getPayload(); + //read index + payload.readInt(); + + //处理tcp连接后的首次消息 + if (context.getDevice() == null) { + return handleLogin(payload, context); + } + return Mono.justOrEmpty(BinaryMessageType.read(payload, context.getDevice().getDeviceId())); + } + + private Mono handleLogin(ByteBuf payload, MessageDecodeContext context) { + DeviceMessage message = BinaryMessageType.read(payload); + if (message instanceof DeviceOnlineMessage) { + String token = message + .getHeader(BinaryDeviceOnlineMessage.loginToken) + .orElse(null); + + String deviceId = message.getDeviceId(); + return context + .getDevice(deviceId) + .flatMap(device -> device + .getConfig(CONFIG_KEY_SECURE_KEY) + .flatMap(config -> { + if (Objects.equals(config.asString(), token)) { + return ack(message, AckCode.ok, context) + .thenReturn(message); + } + return Mono.empty(); + })) + .switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context))); + + } else { + return ack(message, AckCode.noAuth, context); + } + } + + public static ByteBuf wrapByteByf(ByteBuf payload) { + + return Unpooled.wrappedBuffer( + Unpooled.buffer().writeInt(payload.writerIndex()), + payload); + } + + private Mono ack(DeviceMessage source, AckCode code, MessageDecodeContext context) { + AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage(); + message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name()); + message.setDeviceId(source.getDeviceId()); + message.setMessageId(source.getMessageId()); + message.setCode(code.name()); + message.setSuccess(code == AckCode.ok); + return ((FromDeviceMessageContext) context) + .getSession() + .send(EncodedMessage.simple( + wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer())) + )) + .then(Mono.fromRunnable(() -> { + if (source instanceof DeviceOnlineMessage && code != AckCode.ok) { + ((FromDeviceMessageContext) context).getSession().close(); + } + })); + } + + @NonNull + @Override + public Publisher encode(@NonNull MessageEncodeContext context) { + DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage()); + + return Mono.just(EncodedMessage.simple( + wrapByteByf( + BinaryMessageType.write(deviceMessage, Unpooled.buffer()) + ) + )); + } + + +} diff --git a/src/test/java/org/jetlinks/protocol/official/binary/BinaryMessageTypeTest.java b/src/test/java/org/jetlinks/protocol/official/binary/BinaryMessageTypeTest.java new file mode 100644 index 0000000..95d0917 --- /dev/null +++ b/src/test/java/org/jetlinks/protocol/official/binary/BinaryMessageTypeTest.java @@ -0,0 +1,87 @@ +package org.jetlinks.protocol.official.binary; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessageReply; +import org.jetlinks.core.message.property.*; +import org.junit.Assert; +import org.junit.Test; +import reactor.test.StepVerifier; + +import java.util.Collections; + +public class BinaryMessageTypeTest { + + + @Test + public void testReport() { + ReportPropertyMessage message = new ReportPropertyMessage(); + message.setDeviceId("test"); + message.setMessageId("test123"); + message.setProperties(Collections.singletonMap("temp", 32.88)); + + doTest(message); + } + + @Test + public void testRead() { + ReadPropertyMessage message = new ReadPropertyMessage(); + message.setDeviceId("test"); + message.setMessageId("test123"); + message.setProperties(Collections.singletonList("temp")); + doTest(message); + + ReadPropertyMessageReply reply = new ReadPropertyMessageReply(); + reply.setDeviceId("test"); + reply.setMessageId("test123"); + reply.setProperties(Collections.singletonMap("temp", 32.88)); + doTest(reply); + + } + + @Test + public void testWrite() { + WritePropertyMessage message = new WritePropertyMessage(); + message.setDeviceId("test"); + message.setMessageId("test123"); + message.setProperties(Collections.singletonMap("temp", 32.88)); + doTest(message); + + WritePropertyMessageReply reply = new WritePropertyMessageReply(); + reply.setDeviceId("test"); + reply.setMessageId("test123"); + reply.setProperties(Collections.singletonMap("temp", 32.88)); + doTest(reply); + + } + + @Test + public void testFunction() { + FunctionInvokeMessage message = new FunctionInvokeMessage(); + message.setFunctionId("123"); + message.setDeviceId("test"); + message.setMessageId("test123"); + message.addInput("test",1); + doTest(message); + + FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply(); + reply.setDeviceId("test"); + reply.setMessageId("test123"); + reply.setOutput(123); + doTest(reply); + + } + + public void doTest(DeviceMessage message) { + ByteBuf data = Unpooled.buffer(); + + BinaryMessageType.write(message, data); + + DeviceMessage read = BinaryMessageType.read(data); + System.out.println(read); + Assert.assertEquals(read.toString(), message.toString()); + } + +} \ No newline at end of file