From 4d194d2cc24cdd2bd28eaa82b0a7539dd930a33f Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Sun, 25 Apr 2021 17:13:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0child-reply?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../protocol/official/TopicMessageCodec.java | 41 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 80f9bb3..f2ae6d3 100644 --- a/pom.xml +++ b/pom.xml @@ -158,7 +158,7 @@ org.jetlinks jetlinks-supports - 1.1.5 + 1.1.6-SNAPSHOT diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java index d26d40d..343a4e1 100644 --- a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java @@ -9,6 +9,8 @@ import org.jetlinks.core.message.firmware.*; import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionInvokeMessageReply; import org.jetlinks.core.message.property.*; +import org.jetlinks.core.message.state.DeviceStateCheckMessage; +import org.jetlinks.core.message.state.DeviceStateCheckMessageReply; import org.jetlinks.core.utils.TopicUtils; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -70,6 +72,42 @@ public enum TopicMessageCodec { payload.setTopic(String.join("/", topic)); return payload; + } + }, //子设备消息回复 + childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) { + @Override + public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { + String[] _topic = Arrays.copyOfRange(topic, 2, topic.length); + _topic[0] = "";// topic以/开头所有第一位是空白 + return TopicMessageCodec + .decode(mapper, _topic, payload) + .map(childMsg -> { + ChildDeviceMessageReply msg = new ChildDeviceMessageReply(); + msg.setDeviceId(topic[1]); + msg.setChildDeviceMessage(childMsg); + msg.setTimestamp(childMsg.getTimestamp()); + msg.setMessageId(childMsg.getMessageId()); + return msg; + }); + } + + @Override + protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) { + ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message); + + DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage()); + + TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage); + String[] childTopic = payload.getTopic().split("/"); + String[] topic = new String[topics.length + childTopic.length - 3]; + //合并topic + System.arraycopy(topics, 0, topic, 0, topics.length - 1); + System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1); + + refactorTopic(topic, message); + payload.setTopic(String.join("/", topic)); + return payload; + } }, //更新标签 @@ -110,6 +148,9 @@ public enum TopicMessageCodec { offline("/*/offline", DeviceOfflineMessage.class), //日志 log("/*/log", DeviceLogMessage.class), + //状态检查 + stateCheck("/*/state-check", DeviceStateCheckMessage.class), + stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class), ; TopicMessageCodec(String topic, Class type) {