From 5e326544f7cd6129b0d9eac585512b8b4f32c2d2 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 28 Jul 2021 15:05:19 +0800 Subject: [PATCH] =?UTF-8?q?event=E6=A0=87=E8=AF=86=E4=BB=A5topic=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E4=B8=BA=E5=87=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/official/TopicMessageCodec.java | 19 ++++++++++++++++++- .../official/TopicMessageCodecTest.java | 19 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java index e56ef1f..21a90a2 100644 --- a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java @@ -24,7 +24,24 @@ public enum TopicMessageCodec { //上报属性数据 reportProperty("/*/properties/report", ReportPropertyMessage.class), //事件上报 - event("/*/event/*", EventMessage.class), + event("/*/event/*", EventMessage.class) { + @Override + Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { + String event = topic[topic.length - 1]; + + return Mono.from(super.doDecode(mapper, topic, payload)) + .cast(EventMessage.class) + .doOnNext(e -> e.setEvent(event)) + .cast(DeviceMessage.class); + } + + @Override + void refactorTopic(String[] topics, DeviceMessage message) { + super.refactorTopic(topics, message); + EventMessage event = ((EventMessage) message); + topics[topics.length - 1] = String.valueOf(event.getEvent()); + } + }, //读取属性 readProperty("/*/properties/read", ReadPropertyMessage.class), //读取属性回复 diff --git a/src/test/java/org/jetlinks/protocol/official/TopicMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/TopicMessageCodecTest.java index 3d34fe0..3da78ef 100644 --- a/src/test/java/org/jetlinks/protocol/official/TopicMessageCodecTest.java +++ b/src/test/java/org/jetlinks/protocol/official/TopicMessageCodecTest.java @@ -2,6 +2,8 @@ package org.jetlinks.protocol.official; import com.fasterxml.jackson.databind.ObjectMapper; import org.jetlinks.core.message.ChildDeviceMessage; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.event.EventMessage; import org.jetlinks.core.message.property.ReportPropertyMessage; import org.junit.Test; import reactor.test.StepVerifier; @@ -41,4 +43,21 @@ public class TopicMessageCodecTest { testChild(ObjectMappers.JSON_MAPPER); testChild(ObjectMappers.CBOR_MAPPER); } + + @Test + public void testEvent() { + EventMessage eventMessage = new EventMessage(); + eventMessage.setEvent("test"); + eventMessage.setDeviceId("test-device"); + eventMessage.setData("123"); + + TopicPayload payload = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, eventMessage); + assertEquals(payload.getTopic(), "/test-device/event/test"); + + + DeviceMessage msg = TopicMessageCodec + .decode(ObjectMappers.JSON_MAPPER, payload.getTopic(), payload.getPayload()) + .blockLast(); + assertEquals(msg.toJson(),eventMessage.toJson()); + } } \ No newline at end of file