增加上下线消息
This commit is contained in:
parent
08d90485be
commit
c160f00fea
|
@ -5,8 +5,10 @@ import io.netty.buffer.Unpooled;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jetlinks.core.device.DeviceConfigKey;
|
import org.jetlinks.core.device.DeviceConfigKey;
|
||||||
import org.jetlinks.core.message.DeviceMessage;
|
import org.jetlinks.core.message.DeviceMessage;
|
||||||
|
import org.jetlinks.core.message.DisconnectDeviceMessage;
|
||||||
import org.jetlinks.core.message.Message;
|
import org.jetlinks.core.message.Message;
|
||||||
import org.jetlinks.core.message.codec.*;
|
import org.jetlinks.core.message.codec.*;
|
||||||
|
import org.jetlinks.core.server.session.DeviceSession;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
@ -69,6 +71,13 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
|
||||||
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
|
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
|
||||||
return Mono.defer(() -> {
|
return Mono.defer(() -> {
|
||||||
Message message = context.getMessage();
|
Message message = context.getMessage();
|
||||||
|
|
||||||
|
if (message instanceof DisconnectDeviceMessage) {
|
||||||
|
return ((ToDeviceMessageContext) context)
|
||||||
|
.disconnect()
|
||||||
|
.then(Mono.empty());
|
||||||
|
}
|
||||||
|
|
||||||
if (message instanceof DeviceMessage) {
|
if (message instanceof DeviceMessage) {
|
||||||
DeviceMessage deviceMessage = ((DeviceMessage) message);
|
DeviceMessage deviceMessage = ((DeviceMessage) message);
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,12 @@ public enum TopicMessageCodec {
|
||||||
return Mono.just(message);
|
return Mono.just(message);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
//断开连接消息
|
||||||
|
disconnect("/*/disconnect", DisconnectDeviceMessage.class),
|
||||||
|
//上线
|
||||||
|
connect("/*/online", DeviceOnlineMessage.class),
|
||||||
|
//离线
|
||||||
|
offline("/*/offline", DeviceOfflineMessage.class),
|
||||||
;
|
;
|
||||||
|
|
||||||
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) {
|
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) {
|
||||||
|
|
Loading…
Reference in New Issue