From 3cbbaa6643740b0baea6b8fe31d858f2985db78c Mon Sep 17 00:00:00 2001 From: zhouhao Date: Thu, 11 Aug 2022 18:07:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../official/binary/BinaryMessageType.java | 14 +- .../protocol/official/tcp/TcpDevice.java | 124 +++++++++++------- 2 files changed, 87 insertions(+), 51 deletions(-) diff --git a/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java index 1dead1e..031346f 100644 --- a/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java +++ b/src/main/java/org/jetlinks/protocol/official/binary/BinaryMessageType.java @@ -20,6 +20,7 @@ import java.util.function.BiFunction; import java.util.function.Supplier; public enum BinaryMessageType { + keepalive(null, null), online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new), @@ -63,10 +64,10 @@ public enum BinaryMessageType { private final Map cached = CacheBuilder .newBuilder() .expireAfterWrite(Duration.ofSeconds(30)) - . build() + .build() .asMap(); - public synchronized int next(String id) { + public int next(String id) { if (id == null) { return -1; } @@ -98,6 +99,15 @@ public enum BinaryMessageType { return write(message, msgId, data); } + public static ByteBuf write(BinaryMessageType type, ByteBuf data) { + // 第0个字节是消息类型 + data.writeByte(type.ordinal()); + // 0-4字节 时间戳 + data.writeLong(System.currentTimeMillis()); + + return data; + } + public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) { BinaryMessageType type = lookup(message); // 第0个字节是消息类型 diff --git a/src/main/java/org/jetlinks/protocol/official/tcp/TcpDevice.java b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDevice.java index 11cb9af..7487bf1 100644 --- a/src/main/java/org/jetlinks/protocol/official/tcp/TcpDevice.java +++ b/src/main/java/org/jetlinks/protocol/official/tcp/TcpDevice.java @@ -6,10 +6,13 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.NetSocket; +import io.vertx.core.parsetools.RecordParser; import lombok.SneakyThrows; +import org.jetlinks.core.message.AcknowledgeDeviceMessage; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceOnlineMessage; import org.jetlinks.core.message.property.ReadPropertyMessage; +import org.jetlinks.core.message.property.WritePropertyMessage; import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage; import org.jetlinks.protocol.official.binary.BinaryMessageType; import reactor.core.publisher.Flux; @@ -17,6 +20,8 @@ import reactor.core.publisher.Mono; import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class TcpDevice { @SneakyThrows @@ -28,49 +33,59 @@ public class TcpDevice { Flux.range(start, count) .flatMap(i -> Mono - .create(sink -> { - NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true); - conf.setLocalAddress(hosts[i % hosts.length]); - vertx - .createNetClient(conf) - .connect(8802, "localhost") - .onFailure(err -> { - System.out.println(err.getMessage()); - sink.success(); - }) - .onSuccess(socket -> { - socket - .closeHandler((s) -> { - System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed"); - sink.success(); - }) - .exceptionHandler(er -> { - System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage()); - sink.success(); - }) - .handler(buffer -> { - sink.success("tcp-off-" + i + ":" + socket.localAddress()); + .create(sink -> { + NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true); + conf.setLocalAddress(hosts[i % hosts.length]); + vertx + .createNetClient(conf) + .connect(8802, "localhost") + .onFailure(err -> { + System.out.println(err.getMessage()); + sink.success(); + }) + .onSuccess(socket -> { + RecordParser parser = RecordParser.newFixed(4); + AtomicReference buffer = new AtomicReference<>(); + parser.handler(buf -> { + buffer.accumulateAndGet(buf, (a, b) -> { + if (a == null) { + parser.fixedSizeMode(buf.getInt(0)); + return b; + } + parser.fixedSizeMode(4); - ByteBuf buf = buffer.getByteBuf(); - buf.readInt(); - BinaryMessageType - .read(buf, - null, - (downstream, seq) -> { - handleDownStream(downstream, seq, socket); - return null; - }); + sink.success("tcp-off-" + i + ":" + socket.localAddress()); - }); + BinaryMessageType + .read(b.getByteBuf(), + null, + (downstream, seq) -> { + handleDownStream(downstream, seq, socket); + return null; + }); + return null; + }); + }); + socket + .closeHandler((s) -> { + System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed"); + sink.success(); + }) + .exceptionHandler(er -> { + System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage()); + sink.success(); + }) + .handler(parser); - DeviceOnlineMessage message = new DeviceOnlineMessage(); - message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test"); - message.setDeviceId("tcp-off-" + i); + DeviceOnlineMessage message = new DeviceOnlineMessage(); + message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test"); + message.setDeviceId("tcp-off-" + i); - socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer())))); + socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer())))); - }); - }) + }); + }), + 1024 ) .count() .subscribe(System.out::println); @@ -79,21 +94,32 @@ public class TcpDevice { } protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) { - System.out.println(downstream); + if (!(downstream instanceof AcknowledgeDeviceMessage)) { + // System.out.println(downstream); + } + + DeviceMessage reply = null; if (downstream instanceof ReadPropertyMessage) { + reply = ((ReadPropertyMessage) downstream) + .newReply() + .success(Collections.singletonMap( + "temp0", + ThreadLocalRandom + .current() + .nextFloat() * 100 + )); + + } else if (downstream instanceof WritePropertyMessage) { + reply = ((WritePropertyMessage) downstream) + .newReply() + .success(((WritePropertyMessage) downstream).getProperties()); + + } + if (reply != null) { socket.write( Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write( - ((ReadPropertyMessage) downstream) - .newReply() - .success(Collections.singletonMap( - ((ReadPropertyMessage) downstream) - .getProperties() - .get(0), - ThreadLocalRandom - .current() - .nextFloat() * 100 - )) + reply , seq, Unpooled.buffer()))) ); }