diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java index 7be7abe..7441d52 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java @@ -5,8 +5,10 @@ import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DisconnectDeviceMessage; import org.jetlinks.core.message.Message; import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.server.session.DeviceSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -69,6 +71,13 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { public Mono encode(@Nonnull MessageEncodeContext context) { return Mono.defer(() -> { Message message = context.getMessage(); + + if (message instanceof DisconnectDeviceMessage) { + return ((ToDeviceMessageContext) context) + .disconnect() + .then(Mono.empty()); + } + if (message instanceof DeviceMessage) { DeviceMessage deviceMessage = ((DeviceMessage) message); diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java index d3a9f6a..09ca53f 100644 --- a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java @@ -100,7 +100,12 @@ public enum TopicMessageCodec { return Mono.just(message); } }, - + //断开连接消息 + disconnect("/*/disconnect", DisconnectDeviceMessage.class), + //上线 + connect("/*/online", DeviceOnlineMessage.class), + //离线 + offline("/*/offline", DeviceOfflineMessage.class), ; TopicMessageCodec(String topic, Class type) {