优化tcp消息
This commit is contained in:
parent
6d46169cc5
commit
9a81923167
|
@ -4,6 +4,7 @@ import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.common.collect.BiMap;
|
import com.google.common.collect.BiMap;
|
||||||
import com.google.common.collect.HashBiMap;
|
import com.google.common.collect.HashBiMap;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufUtil;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import org.jetlinks.core.device.DeviceThingType;
|
import org.jetlinks.core.device.DeviceThingType;
|
||||||
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
|
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
|
||||||
|
@ -20,16 +21,21 @@ import java.util.function.BiFunction;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public enum BinaryMessageType {
|
public enum BinaryMessageType {
|
||||||
|
//0x00
|
||||||
keepalive(null, null),
|
keepalive(null, null),
|
||||||
|
|
||||||
|
//0x01
|
||||||
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
|
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
|
||||||
|
|
||||||
|
//0x02
|
||||||
ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new),
|
ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new),
|
||||||
|
//0x03
|
||||||
reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new),
|
reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new),
|
||||||
|
|
||||||
|
//0x04
|
||||||
readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new),
|
readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new),
|
||||||
|
|
||||||
|
//0x05
|
||||||
readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new),
|
readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new),
|
||||||
|
|
||||||
writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new),
|
writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new),
|
||||||
|
@ -112,14 +118,14 @@ public enum BinaryMessageType {
|
||||||
BinaryMessageType type = lookup(message);
|
BinaryMessageType type = lookup(message);
|
||||||
// 第0个字节是消息类型
|
// 第0个字节是消息类型
|
||||||
data.writeByte(type.ordinal());
|
data.writeByte(type.ordinal());
|
||||||
// 0-4字节 时间戳
|
// 第1-8字节 时间戳
|
||||||
data.writeLong(message.getTimestamp());
|
data.writeLong(message.getTimestamp());
|
||||||
|
|
||||||
// 5-6字节 消息序号
|
// 9-11字节 消息序号
|
||||||
data.writeShort(msgId);
|
data.writeShort(msgId);
|
||||||
|
|
||||||
// 7... 字节 设备ID
|
// 12... 字节 设备ID
|
||||||
DataType.writeTo(message.getDeviceId(), data);
|
DataType.STRING.write(data, message.getDeviceId());
|
||||||
|
|
||||||
// 创建消息对象
|
// 创建消息对象
|
||||||
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
|
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
|
||||||
|
@ -148,7 +154,7 @@ public enum BinaryMessageType {
|
||||||
// 5-6字节 消息序号
|
// 5-6字节 消息序号
|
||||||
int msgId = data.readUnsignedShort();
|
int msgId = data.readUnsignedShort();
|
||||||
// 7... 字节 设备ID
|
// 7... 字节 设备ID
|
||||||
String deviceId = (String) DataType.readFrom(data);
|
String deviceId = (String) DataType.STRING.read(data);
|
||||||
if (deviceId == null) {
|
if (deviceId == null) {
|
||||||
deviceId = deviceIdMaybe;
|
deviceId = deviceIdMaybe;
|
||||||
}
|
}
|
||||||
|
@ -177,7 +183,7 @@ public enum BinaryMessageType {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (messageId == null) {
|
if (messageId == null && msgId > 0) {
|
||||||
messageId = String.valueOf(msgId);
|
messageId = String.valueOf(msgId);
|
||||||
}
|
}
|
||||||
message.messageId(messageId);
|
message.messageId(messageId);
|
||||||
|
|
|
@ -3,10 +3,7 @@ package org.jetlinks.protocol.official.tcp;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import lombok.NonNull;
|
import lombok.NonNull;
|
||||||
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
|
import org.jetlinks.core.message.*;
|
||||||
import org.jetlinks.core.message.DeviceMessage;
|
|
||||||
import org.jetlinks.core.message.DeviceOnlineMessage;
|
|
||||||
import org.jetlinks.core.message.Message;
|
|
||||||
import org.jetlinks.core.message.codec.*;
|
import org.jetlinks.core.message.codec.*;
|
||||||
import org.jetlinks.core.metadata.DefaultConfigMetadata;
|
import org.jetlinks.core.metadata.DefaultConfigMetadata;
|
||||||
import org.jetlinks.core.metadata.types.PasswordType;
|
import org.jetlinks.core.metadata.types.PasswordType;
|
||||||
|
@ -105,7 +102,9 @@ public class TcpDeviceMessageCodec implements DeviceMessageCodec {
|
||||||
@Override
|
@Override
|
||||||
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
|
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
|
||||||
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
|
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
|
||||||
|
if(deviceMessage instanceof DisconnectDeviceMessage){
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
return Mono.just(EncodedMessage.simple(
|
return Mono.just(EncodedMessage.simple(
|
||||||
wrapByteByf(
|
wrapByteByf(
|
||||||
BinaryMessageType.write(deviceMessage, Unpooled.buffer())
|
BinaryMessageType.write(deviceMessage, Unpooled.buffer())
|
||||||
|
|
Loading…
Reference in New Issue