增加child-reply

This commit is contained in:
zhou-hao 2021-04-25 17:13:55 +08:00
parent 73dd691413
commit 4d194d2cc2
2 changed files with 42 additions and 1 deletions

View File

@ -158,7 +158,7 @@
<dependency> <dependency>
<groupId>org.jetlinks</groupId> <groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId> <artifactId>jetlinks-supports</artifactId>
<version>1.1.5</version> <version>1.1.6-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -9,6 +9,8 @@ import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply; import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*; import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.utils.TopicUtils; import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -70,6 +72,42 @@ public enum TopicMessageCodec {
payload.setTopic(String.join("/", topic)); payload.setTopic(String.join("/", topic));
return payload; return payload;
}
}, //子设备消息回复
childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
_topic[0] = "";// topic以/开头所有第一位是空白
return TopicMessageCodec
.decode(mapper, _topic, payload)
.map(childMsg -> {
ChildDeviceMessageReply msg = new ChildDeviceMessageReply();
msg.setDeviceId(topic[1]);
msg.setChildDeviceMessage(childMsg);
msg.setTimestamp(childMsg.getTimestamp());
msg.setMessageId(childMsg.getMessageId());
return msg;
});
}
@Override
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message);
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
String[] childTopic = payload.getTopic().split("/");
String[] topic = new String[topics.length + childTopic.length - 3];
//合并topic
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
refactorTopic(topic, message);
payload.setTopic(String.join("/", topic));
return payload;
} }
}, },
//更新标签 //更新标签
@ -110,6 +148,9 @@ public enum TopicMessageCodec {
offline("/*/offline", DeviceOfflineMessage.class), offline("/*/offline", DeviceOfflineMessage.class),
//日志 //日志
log("/*/log", DeviceLogMessage.class), log("/*/log", DeviceLogMessage.class),
//状态检查
stateCheck("/*/state-check", DeviceStateCheckMessage.class),
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
; ;
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) { TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) {