This commit is contained in:
zhouhao 2022-08-11 18:07:23 +08:00
parent cf21c84518
commit 3cbbaa6643
2 changed files with 87 additions and 51 deletions

View File

@ -20,6 +20,7 @@ import java.util.function.BiFunction;
import java.util.function.Supplier;
public enum BinaryMessageType {
keepalive(null, null),
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
@ -63,10 +64,10 @@ public enum BinaryMessageType {
private final Map<Integer, String> cached = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.<Integer, String> build()
.<Integer, String>build()
.asMap();
public synchronized int next(String id) {
public int next(String id) {
if (id == null) {
return -1;
}
@ -98,6 +99,15 @@ public enum BinaryMessageType {
return write(message, msgId, data);
}
public static ByteBuf write(BinaryMessageType type, ByteBuf data) {
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 0-4字节 时间戳
data.writeLong(System.currentTimeMillis());
return data;
}
public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) {
BinaryMessageType type = lookup(message);
// 第0个字节是消息类型

View File

@ -6,10 +6,13 @@ import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.SneakyThrows;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import reactor.core.publisher.Flux;
@ -17,6 +20,8 @@ import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class TcpDevice {
@SneakyThrows
@ -28,49 +33,59 @@ public class TcpDevice {
Flux.range(start, count)
.flatMap(i -> Mono
.create(sink -> {
NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true);
conf.setLocalAddress(hosts[i % hosts.length]);
vertx
.createNetClient(conf)
.connect(8802, "localhost")
.onFailure(err -> {
System.out.println(err.getMessage());
sink.success();
})
.onSuccess(socket -> {
socket
.closeHandler((s) -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed");
sink.success();
})
.exceptionHandler(er -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage());
sink.success();
})
.handler(buffer -> {
sink.success("tcp-off-" + i + ":" + socket.localAddress());
.create(sink -> {
NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true);
conf.setLocalAddress(hosts[i % hosts.length]);
vertx
.createNetClient(conf)
.connect(8802, "localhost")
.onFailure(err -> {
System.out.println(err.getMessage());
sink.success();
})
.onSuccess(socket -> {
RecordParser parser = RecordParser.newFixed(4);
AtomicReference<Buffer> buffer = new AtomicReference<>();
parser.handler(buf -> {
buffer.accumulateAndGet(buf, (a, b) -> {
if (a == null) {
parser.fixedSizeMode(buf.getInt(0));
return b;
}
parser.fixedSizeMode(4);
ByteBuf buf = buffer.getByteBuf();
buf.readInt();
BinaryMessageType
.read(buf,
null,
(downstream, seq) -> {
handleDownStream(downstream, seq, socket);
return null;
});
sink.success("tcp-off-" + i + ":" + socket.localAddress());
});
BinaryMessageType
.read(b.getByteBuf(),
null,
(downstream, seq) -> {
handleDownStream(downstream, seq, socket);
return null;
});
return null;
});
});
socket
.closeHandler((s) -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed");
sink.success();
})
.exceptionHandler(er -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage());
sink.success();
})
.handler(parser);
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test");
message.setDeviceId("tcp-off-" + i);
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test");
message.setDeviceId("tcp-off-" + i);
socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))));
socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))));
});
})
});
}),
1024
)
.count()
.subscribe(System.out::println);
@ -79,21 +94,32 @@ public class TcpDevice {
}
protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) {
System.out.println(downstream);
if (!(downstream instanceof AcknowledgeDeviceMessage)) {
// System.out.println(downstream);
}
DeviceMessage reply = null;
if (downstream instanceof ReadPropertyMessage) {
reply = ((ReadPropertyMessage) downstream)
.newReply()
.success(Collections.singletonMap(
"temp0",
ThreadLocalRandom
.current()
.nextFloat() * 100
));
} else if (downstream instanceof WritePropertyMessage) {
reply = ((WritePropertyMessage) downstream)
.newReply()
.success(((WritePropertyMessage) downstream).getProperties());
}
if (reply != null) {
socket.write(
Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(
((ReadPropertyMessage) downstream)
.newReply()
.success(Collections.singletonMap(
((ReadPropertyMessage) downstream)
.getProperties()
.get(0),
ThreadLocalRandom
.current()
.nextFloat() * 100
))
reply
, seq, Unpooled.buffer())))
);
}