From c160f00fea1e2975873d99feaef0c1dfaed2ceb0 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Fri, 5 Mar 2021 18:37:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=8A=E4=B8=8B=E7=BA=BF?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../official/JetLinksMqttDeviceMessageCodec.java | 9 +++++++++ .../jetlinks/protocol/official/TopicMessageCodec.java | 7 ++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java index 7be7abe..7441d52 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java @@ -5,8 +5,10 @@ import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DisconnectDeviceMessage; import org.jetlinks.core.message.Message; import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.server.session.DeviceSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -69,6 +71,13 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { public Mono encode(@Nonnull MessageEncodeContext context) { return Mono.defer(() -> { Message message = context.getMessage(); + + if (message instanceof DisconnectDeviceMessage) { + return ((ToDeviceMessageContext) context) + .disconnect() + .then(Mono.empty()); + } + if (message instanceof DeviceMessage) { DeviceMessage deviceMessage = ((DeviceMessage) message); diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java index d3a9f6a..09ca53f 100644 --- a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java @@ -100,7 +100,12 @@ public enum TopicMessageCodec { return Mono.just(message); } }, - + //断开连接消息 + disconnect("/*/disconnect", DisconnectDeviceMessage.class), + //上线 + connect("/*/online", DeviceOnlineMessage.class), + //离线 + offline("/*/offline", DeviceOfflineMessage.class), ; TopicMessageCodec(String topic, Class type) {