增加基本的tcp实现

This commit is contained in:
zhouhao 2022-08-08 18:34:15 +08:00
parent c93b5d11a6
commit b0c94bed6b
21 changed files with 1304 additions and 4 deletions

1
.gitignore vendored
View File

@ -25,3 +25,4 @@ hs_err_pid*
/upload
/ui/upload/
!/package/**
dependency-reduced-pom.xml

52
pom.xml
View File

@ -122,6 +122,52 @@
</snapshotRepository>
</distributionManagement>
</profile>
<profile>
<id>all-in-one</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置主类 -->
<mainClass>org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec</mainClass>
</transformer>
</transformers>
<!-- <artifactSet>-->
<!-- <includes>-->
<!-- &lt;!&ndash; 添加需要打包在一起的第三方依赖信息 &ndash;&gt;-->
<!-- &lt;!&ndash; <include>com.domain:*</include> &ndash;&gt;-->
<!-- </includes>-->
<!-- </artifactSet>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
@ -201,6 +247,12 @@
<version>1.2.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.3.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<dependencyManagement>

View File

@ -9,6 +9,7 @@ import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
@ -80,16 +81,21 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
//TCP
support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig);
support.addMessageCodecSupport(new TcpDeviceMessageCodec());
//MQTT
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec());
//HTTP
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec());
//CoAP
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
return Mono.just(support);

View File

@ -315,7 +315,8 @@ public enum TopicMessageCodec {
joiner.add(topic);
}
return MqttRoute
.builder(joiner.toString());
.builder(joiner.toString())
.qos(1);
}
public MqttRoute getRoute() {

View File

@ -0,0 +1,7 @@
package org.jetlinks.protocol.official.binary;
public enum AckCode {
ok,
noAuth,
unsupportedMessage
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.HeaderKey;
public class BinaryAcknowledgeDeviceMessage implements BinaryMessage<AcknowledgeDeviceMessage> {
public static final HeaderKey<String> codeHeader = HeaderKey.of("code", AckCode.ok.name());
private AcknowledgeDeviceMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.ack;
}
@Override
public void read(ByteBuf buf) {
message = new AcknowledgeDeviceMessage();
AckCode code = AckCode.values()[buf.readUnsignedByte()];
message.addHeader(codeHeader, code.name());
}
@Override
public void write(ByteBuf buf) {
AckCode code = AckCode.valueOf(this.message.getHeaderOrDefault(codeHeader));
buf.writeByte(code.ordinal());
}
@Override
public void setMessage(AcknowledgeDeviceMessage message) {
this.message = message;
}
@Override
public AcknowledgeDeviceMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,45 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
/**
*
*/
public class BinaryDeviceOnlineMessage implements BinaryMessage<DeviceOnlineMessage> {
public static final HeaderKey<String> loginToken = HeaderKey.of("token", null);
private DeviceOnlineMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.online;
}
@Override
public void read(ByteBuf buf) {
message = new DeviceOnlineMessage();
message.addHeader(loginToken, (String) DataType.STRING.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING
.write(
buf, message.getHeader(loginToken).orElse("")
);
}
@Override
public void setMessage(DeviceOnlineMessage message) {
this.message = message;
}
@Override
public DeviceOnlineMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,50 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import java.util.Map;
import java.util.stream.Collectors;
public class BinaryFunctionInvokeMessage implements BinaryMessage<FunctionInvokeMessage> {
private FunctionInvokeMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.function;
}
@Override
public void read(ByteBuf buf) {
message = new FunctionInvokeMessage();
message.setFunctionId((String) DataType.STRING.read(buf));
@SuppressWarnings("all")
Map<String, Object> params = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setInputs(
params
.entrySet()
.stream()
.map(e -> new FunctionParameter(e.getKey(), e.getValue()))
.collect(Collectors.toList())
);
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getFunctionId());
DataType.OBJECT.write(buf,message.inputsToMap());
}
@Override
public void setMessage(FunctionInvokeMessage message) {
this.message = message;
}
@Override
public FunctionInvokeMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryFunctionInvokeMessageReply extends BinaryReplyMessage<FunctionInvokeMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.functionReply;
}
@Override
protected FunctionInvokeMessageReply newMessage() {
return new FunctionInvokeMessageReply();
}
@Override
protected void doReadSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
msg.setFunctionId((String) DataType.readFrom(buf));
msg.setOutput(DataType.readFrom(buf));
}
@Override
protected void doWriteSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
DataType.writeTo(getMessage().getFunctionId(), buf);
DataType.writeTo(msg.getOutput(), buf);
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessage;
import reactor.core.publisher.Flux;
public interface BinaryMessage<T extends DeviceMessage> {
BinaryMessageType getType();
void read(ByteBuf buf);
void write(ByteBuf buf);
void setMessage(T message);
T getMessage();
}

View File

@ -0,0 +1,181 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import io.netty.buffer.ByteBuf;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public enum BinaryMessageType {
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new),
reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new),
readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new),
readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new),
writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new),
writePropertyReply(WritePropertyMessageReply.class, BinaryWritePropertyMessageReply::new),
function(FunctionInvokeMessage.class, BinaryFunctionInvokeMessage::new),
functionReply(FunctionInvokeMessageReply.class, BinaryFunctionInvokeMessageReply::new);
private final Class<? extends DeviceMessage> forDevice;
private final Supplier<BinaryMessage<DeviceMessage>> forTcp;
private static final BinaryMessageType[] VALUES = values();
@SuppressWarnings("all")
BinaryMessageType(Class<? extends DeviceMessage> forDevice,
Supplier<? extends BinaryMessage<?>> forTcp) {
this.forDevice = forDevice;
this.forTcp = (Supplier) forTcp;
}
private static final Map<String, MsgIdHolder> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.<String, MsgIdHolder>build()
.asMap();
private static class MsgIdHolder {
private int msgId = 0;
private final BiMap<Integer, String> cached = HashBiMap.create();
public synchronized int next(String id) {
if (id == null) {
return -1;
}
if (msgId++ < 0) {
msgId = 0;
}
cached.put(msgId, id);
return msgId;
}
public String getAndRemove(int id) {
if (id < 0) {
return null;
}
return cached.remove(id);
}
}
@SneakyThrows
private static MsgIdHolder takeHolder(String deviceId) {
return cache.computeIfAbsent(deviceId, (ignore) -> new MsgIdHolder());
}
public static ByteBuf write(DeviceMessage message, ByteBuf data) {
int msgId = takeHolder(message.getDeviceId()).next(message.getMessageId());
return write(message, msgId, data);
}
public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) {
BinaryMessageType type = lookup(message);
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 0-4字节 时间戳
data.writeLong(message.getTimestamp());
// 5-6字节 消息序号
data.writeShort(msgId);
// 7... 字节 设备ID
DataType.writeTo(message.getDeviceId(), data);
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
tcp.setMessage(message);
//写出数据到ByteBuf
tcp.write(data);
return data;
}
public static DeviceMessage read(ByteBuf data) {
return read(data, null);
}
public static <T> T read(ByteBuf data,
String deviceIdMaybe,
BiFunction<DeviceMessage, Integer, T> handler) {
//第0个字节是消息类型
BinaryMessageType type = VALUES[data.readByte()];
if (type.forTcp == null) {
return null;
}
// 1-4字节 时间戳
long timestamp = data.readLong();
// 5-6字节 消息序号
int msgId = data.readUnsignedShort();
// 7... 字节 设备ID
String deviceId = (String) DataType.readFrom(data);
if (deviceId == null) {
deviceId = deviceIdMaybe;
}
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
//从ByteBuf读取
tcp.read(data);
DeviceMessage message = tcp.getMessage();
message.thingId(DeviceThingType.device, deviceId);
message.timestamp(timestamp);
return handler.apply(message, msgId);
}
public static DeviceMessage read(ByteBuf data, String deviceIdMaybe) {
return read(data, deviceIdMaybe, (message, msgId) -> {
String messageId = null;
if (message.getDeviceId() != null) {
//获取实际平台下发的消息ID
MsgIdHolder holder = cache.get(message.getDeviceId());
if (holder != null) {
messageId = holder.getAndRemove(msgId);
}
}
if (messageId == null) {
messageId = String.valueOf(msgId);
}
message.messageId(messageId);
return message;
});
}
public static BinaryMessageType lookup(DeviceMessage message) {
for (BinaryMessageType value : VALUES) {
if (value.forDevice != null && value.forDevice.isInstance(message)) {
return value;
}
}
throw new UnsupportedOperationException("unsupported device message " + message.getMessageType());
}
}

View File

@ -0,0 +1,50 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.List;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReadPropertyMessage implements BinaryMessage<ReadPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readProperty;
}
private ReadPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReadPropertyMessage();
@SuppressWarnings("all")
List<String> list = (List<String>) DataType.ARRAY.read(buf);
message.setProperties(list);
}
@Override
public void write(ByteBuf buf) {
DataType.ARRAY.write(buf, message.getProperties());
}
@Override
public void setMessage(ReadPropertyMessage message) {
this.message = message;
}
@Override
public ReadPropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,41 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryReadPropertyMessageReply extends BinaryReplyMessage<ReadPropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected ReadPropertyMessageReply newMessage() {
return new ReadPropertyMessageReply();
}
@Override
protected void doWriteSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
@Override
protected void doReadSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
}

View File

@ -0,0 +1,52 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessageReply;
import java.util.Map;
public abstract class BinaryReplyMessage<T extends DeviceMessageReply> implements BinaryMessage<T> {
private T message;
protected abstract T newMessage();
@Override
public final void read(ByteBuf buf) {
message = newMessage();
boolean success = buf.readBoolean();
if (success) {
doReadSuccess(message, buf);
} else {
message.success(false);
message.code(String.valueOf(DataType.readFrom(buf)));
message.message(String.valueOf(DataType.readFrom(buf)));
}
}
protected abstract void doReadSuccess(T msg, ByteBuf buf);
protected abstract void doWriteSuccess(T msg, ByteBuf buf);
@Override
public final void write(ByteBuf buf) {
buf.writeBoolean(message.isSuccess());
if (message.isSuccess()) {
doWriteSuccess(message, buf);
} else {
DataType.writeTo(message.getCode(), buf);
DataType.writeTo(message.getMessage(), buf);
}
}
@Override
public void setMessage(T message) {
this.message = message;
}
@Override
public T getMessage() {
return message;
}
}

View File

@ -0,0 +1,48 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReportPropertyMessage implements BinaryMessage<ReportPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.reportProperty;
}
private ReportPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReportPropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(ReportPropertyMessage message) {
this.message = message;
}
@Override
public ReportPropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,49 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryWritePropertyMessage implements BinaryMessage<WritePropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.writeProperty;
}
private WritePropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new WritePropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(WritePropertyMessage message) {
this.message = message;
}
@Override
public WritePropertyMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryWritePropertyMessageReply extends BinaryReplyMessage<WritePropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected WritePropertyMessageReply newMessage() {
return new WritePropertyMessageReply();
}
@Override
protected void doReadSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
@Override
protected void doWriteSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
}

View File

@ -0,0 +1,273 @@
package org.jetlinks.protocol.official.binary;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetlinks.protocol.official.ObjectMappers;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public enum DataType {
//0x00
NULL {
@Override
public Object read(ByteBuf buf) {
return null;
}
@Override
public void write(ByteBuf buf, Object value) {
}
},
//0x01
BOOLEAN {
@Override
public Object read(ByteBuf buf) {
return buf.readBoolean();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeBoolean((Boolean) value);
}
},
//0x02
INT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x03
INT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x04
INT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x05
INT64 {
@Override
public Object read(ByteBuf buf) {
return buf.readLong();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeLong((Long) value);
}
},
//0x06
UINT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x07
UINT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x08
UINT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x09
FLOAT {
@Override
public Object read(ByteBuf buf) {
return buf.readFloat();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeFloat((Float) value);
}
},
//0x0A
DOUBLE {
@Override
public Object read(ByteBuf buf) {
return buf.readDouble();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeDouble((Double) value);
}
},
//0x0B
STRING {
@Override
public Object read(ByteBuf buf) {
int len = buf.readInt();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
@Override
public void write(ByteBuf buf, Object value) {
String str = (String) value;
buf.writeInt(str.length());
buf.writeBytes(str.getBytes());
}
},
//0x0C
BINARY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return bytes;
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = (byte[]) value;
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0D
ARRAY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
List<Object> array = new ArrayList<>(len);
for (int i = 0; i < len; i++) {
array.add(readFrom(buf));
}
return array;
}
@Override
public void write(ByteBuf buf, Object value) {
Collection<Object> array = (Collection<Object>) value;
buf.writeShort(array.size());
for (Object o : array) {
writeTo(o, buf);
}
}
},
//0x0E
OBJECT {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
Map<String, Object> data = Maps.newLinkedHashMapWithExpectedSize(len);
for (int i = 0; i < len; i++) {
data.put((String) STRING.read(buf), readFrom(buf));
}
return data;
}
@Override
public void write(ByteBuf buf, Object value) {
Map<String, Object> data = value instanceof Map ? ((Map) value) : ObjectMappers.JSON_MAPPER.convertValue(value, Map.class);
buf.writeShort(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
STRING.write(buf, entry.getKey());
writeTo(entry.getValue(), buf);
}
}
};
private final static DataType[] VALUES = values();
public abstract Object read(ByteBuf buf);
public abstract void write(ByteBuf buf, Object value);
public static Object readFrom(ByteBuf buf) {
return VALUES[buf.readUnsignedByte()].read(buf);
}
public static void writeTo(Object data, ByteBuf buf) {
DataType type = loopUpType(data);
buf.writeByte(type.ordinal());
type.write(buf, data);
}
private static DataType loopUpType(Object data) {
if (data == null) {
return NULL;
} else if (data instanceof Boolean) {
return BOOLEAN;
} else if (data instanceof Byte) {
return INT8;
} else if (data instanceof Short) {
return INT16;
} else if (data instanceof Integer) {
return INT32;
} else if (data instanceof Long) {
return INT64;
} else if (data instanceof Float) {
return FLOAT;
} else if (data instanceof Double) {
return DOUBLE;
} else if (data instanceof String) {
return STRING;
} else if (data instanceof byte[]) {
return BINARY;
} else if (data instanceof Collection) {
return ARRAY;
} else if (data instanceof Map) {
return OBJECT;
} else {
throw new IllegalArgumentException("Unsupported data type: " + data.getClass());
}
}
}

View File

@ -0,0 +1,101 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import lombok.SneakyThrows;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
public class TcpDevice {
@SneakyThrows
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
int start = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int count = args.length > 1 ? Integer.parseInt(args[1]) : 8000;
String[] hosts = args.length > 2 ? args[2].split(",") : new String[]{"0.0.0.0"};
Flux.range(start, count)
.flatMap(i -> Mono
.create(sink -> {
NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true);
conf.setLocalAddress(hosts[i % hosts.length]);
vertx
.createNetClient(conf)
.connect(8802, "localhost")
.onFailure(err -> {
System.out.println(err.getMessage());
sink.success();
})
.onSuccess(socket -> {
socket
.closeHandler((s) -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed");
sink.success();
})
.exceptionHandler(er -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage());
sink.success();
})
.handler(buffer -> {
sink.success("tcp-off-" + i + ":" + socket.localAddress());
ByteBuf buf = buffer.getByteBuf();
buf.readInt();
BinaryMessageType
.read(buf,
null,
(downstream, seq) -> {
handleDownStream(downstream, seq, socket);
return null;
});
});
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test");
message.setDeviceId("tcp-off-" + i);
socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))));
});
})
)
.count()
.subscribe(System.out::println);
System.in.read();
}
protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) {
System.out.println(downstream);
if (downstream instanceof ReadPropertyMessage) {
socket.write(
Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(
((ReadPropertyMessage) downstream)
.newReply()
.success(Collections.singletonMap(
((ReadPropertyMessage) downstream)
.getProperties()
.get(0),
ThreadLocalRandom
.current()
.nextFloat() * 100
))
, seq, Unpooled.buffer())))
);
}
}
}

View File

@ -0,0 +1,117 @@
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.AckCode;
import org.jetlinks.protocol.official.binary.BinaryAcknowledgeDeviceMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class TcpDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata tcpConfig = new DefaultConfigMetadata(
"TCP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.TCP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//read index
payload.readInt();
//处理tcp连接后的首次消息
if (context.getDevice() == null) {
return handleLogin(payload, context);
}
return Mono.justOrEmpty(BinaryMessageType.read(payload, context.getDevice().getDeviceId()));
}
private Mono<DeviceMessage> handleLogin(ByteBuf payload, MessageDecodeContext context) {
DeviceMessage message = BinaryMessageType.read(payload);
if (message instanceof DeviceOnlineMessage) {
String token = message
.getHeader(BinaryDeviceOnlineMessage.loginToken)
.orElse(null);
String deviceId = message.getDeviceId();
return context
.getDevice(deviceId)
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
} else {
return ack(message, AckCode.noAuth, context);
}
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return Unpooled.wrappedBuffer(
Unpooled.buffer().writeInt(payload.writerIndex()),
payload);
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
return ((FromDeviceMessageContext) context)
.getSession()
.send(EncodedMessage.simple(
wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))
))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
return Mono.just(EncodedMessage.simple(
wrapByteByf(
BinaryMessageType.write(deviceMessage, Unpooled.buffer())
)
));
}
}

View File

@ -0,0 +1,87 @@
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.junit.Assert;
import org.junit.Test;
import reactor.test.StepVerifier;
import java.util.Collections;
public class BinaryMessageTypeTest {
@Test
public void testReport() {
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
}
@Test
public void testRead() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonList("temp"));
doTest(message);
ReadPropertyMessageReply reply = new ReadPropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testWrite() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
WritePropertyMessageReply reply = new WritePropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testFunction() {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setFunctionId("123");
message.setDeviceId("test");
message.setMessageId("test123");
message.addInput("test",1);
doTest(message);
FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setOutput(123);
doTest(reply);
}
public void doTest(DeviceMessage message) {
ByteBuf data = Unpooled.buffer();
BinaryMessageType.write(message, data);
DeviceMessage read = BinaryMessageType.read(data);
System.out.println(read);
Assert.assertEquals(read.toString(), message.toString());
}
}