优化协议

This commit is contained in:
zhou-hao 2021-01-21 11:21:28 +08:00
parent 40c3f5c535
commit 59c981bf95
2 changed files with 18 additions and 21 deletions

View File

@ -11,7 +11,6 @@ import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply; import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*; import org.jetlinks.core.message.property.*;
import org.jetlinks.core.utils.TopicUtils; import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.supports.utils.MqttTopicUtils;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.util.Map; import java.util.Map;
@ -86,7 +85,7 @@ class JetlinksTopicMessageCodec {
Assert.notNull(message, "message can not be null"); Assert.notNull(message, "message can not be null");
if (message instanceof ReadPropertyMessage) { if (message instanceof ReadPropertyMessage) {
String topic = "/" .concat(deviceId).concat("/properties/read"); String topic = "/".concat(deviceId).concat("/properties/read");
JSONObject mqttData = new JSONObject(); JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId()); mqttData.put("messageId", message.getMessageId());
mqttData.put("properties", ((ReadPropertyMessage) message).getProperties()); mqttData.put("properties", ((ReadPropertyMessage) message).getProperties());
@ -94,7 +93,7 @@ class JetlinksTopicMessageCodec {
return new EncodedTopic(topic, mqttData); return new EncodedTopic(topic, mqttData);
} else if (message instanceof WritePropertyMessage) { } else if (message instanceof WritePropertyMessage) {
String topic = "/" .concat(deviceId).concat("/properties/write"); String topic = "/".concat(deviceId).concat("/properties/write");
JSONObject mqttData = new JSONObject(); JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId()); mqttData.put("messageId", message.getMessageId());
mqttData.put("properties", ((WritePropertyMessage) message).getProperties()); mqttData.put("properties", ((WritePropertyMessage) message).getProperties());
@ -102,7 +101,7 @@ class JetlinksTopicMessageCodec {
return new EncodedTopic(topic, mqttData); return new EncodedTopic(topic, mqttData);
} else if (message instanceof FunctionInvokeMessage) { } else if (message instanceof FunctionInvokeMessage) {
String topic = "/" .concat(deviceId).concat("/function/invoke"); String topic = "/".concat(deviceId).concat("/function/invoke");
FunctionInvokeMessage invokeMessage = ((FunctionInvokeMessage) message); FunctionInvokeMessage invokeMessage = ((FunctionInvokeMessage) message);
JSONObject mqttData = new JSONObject(); JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId()); mqttData.put("messageId", message.getMessageId());
@ -112,7 +111,7 @@ class JetlinksTopicMessageCodec {
return new EncodedTopic(topic, mqttData); return new EncodedTopic(topic, mqttData);
} else if (message instanceof UpgradeFirmwareMessage) { } else if (message instanceof UpgradeFirmwareMessage) {
String topic = "/" .concat(deviceId).concat("/firmware/upgrade"); String topic = "/".concat(deviceId).concat("/firmware/upgrade");
UpgradeFirmwareMessage firmwareMessage = ((UpgradeFirmwareMessage) message); UpgradeFirmwareMessage firmwareMessage = ((UpgradeFirmwareMessage) message);
JSONObject mqttData = new JSONObject(); JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId()); mqttData.put("messageId", message.getMessageId());
@ -125,13 +124,13 @@ class JetlinksTopicMessageCodec {
return new EncodedTopic(topic, mqttData); return new EncodedTopic(topic, mqttData);
} else if (message instanceof ReadFirmwareMessage) { } else if (message instanceof ReadFirmwareMessage) {
String topic = "/" .concat(deviceId).concat("/firmware/read"); String topic = "/".concat(deviceId).concat("/firmware/read");
JSONObject mqttData = new JSONObject(); JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId()); mqttData.put("messageId", message.getMessageId());
mqttData.put("deviceId", deviceId); mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData); return new EncodedTopic(topic, mqttData);
} else if (message instanceof RequestFirmwareMessageReply) { } else if (message instanceof RequestFirmwareMessageReply) {
String topic = "/" .concat(deviceId).concat("/firmware/pull/reply"); String topic = "/".concat(deviceId).concat("/firmware/pull/reply");
RequestFirmwareMessageReply firmwareMessage = ((RequestFirmwareMessageReply) message); RequestFirmwareMessageReply firmwareMessage = ((RequestFirmwareMessageReply) message);
JSONObject mqttData = new JSONObject(); JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId()); mqttData.put("messageId", message.getMessageId());
@ -145,7 +144,7 @@ class JetlinksTopicMessageCodec {
} else if (message instanceof ChildDeviceMessage) { } else if (message instanceof ChildDeviceMessage) {
ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message); ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message);
EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage()); EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage());
String topic = "/" .concat(deviceId).concat("/child").concat(result.topic); String topic = "/".concat(deviceId).concat("/child").concat(result.topic);
result.payload.put("deviceId", childDeviceMessage.getChildDeviceId()); result.payload.put("deviceId", childDeviceMessage.getChildDeviceId());
return new EncodedTopic(topic, result.payload); return new EncodedTopic(topic, result.payload);
@ -180,7 +179,7 @@ class JetlinksTopicMessageCodec {
message = object.toJavaObject(ReportFirmwareMessage.class); message = object.toJavaObject(ReportFirmwareMessage.class);
} else if (result.isUpgradeFirmwareProgress()) { } else if (result.isUpgradeFirmwareProgress()) {
message = object.toJavaObject(UpgradeFirmwareProgressMessage.class); message = object.toJavaObject(UpgradeFirmwareProgressMessage.class);
}else if (topic.endsWith("connected")) { } else if (topic.endsWith("connected")) {
message = object.toJavaObject(DeviceOnlineMessage.class); message = object.toJavaObject(DeviceOnlineMessage.class);
} else if (topic.endsWith("disconnect")) { } else if (topic.endsWith("disconnect")) {
message = object.toJavaObject(DeviceOfflineMessage.class); message = object.toJavaObject(DeviceOfflineMessage.class);

View File

@ -180,10 +180,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block(); .build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply); Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message); ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1"); Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test"); Assert.assertEquals(childReply.getMessageId(),"test");
@ -251,10 +250,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes())) .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).block(); .build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply); Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message); ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1"); Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test"); Assert.assertEquals(childReply.getMessageId(),"test");
@ -288,9 +286,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes())) .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).block(); .build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply); Assert.assertTrue(message instanceof ChildDeviceMessage);
EventMessage reply = ((EventMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage()); EventMessage reply = ((EventMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test"); Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test"); Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getData(), 100); Assert.assertEquals(reply.getData(), 100);
@ -320,9 +318,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block(); .build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply); Assert.assertTrue(message instanceof ChildDeviceMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage()); ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test"); Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test"); Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test")); Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
@ -352,9 +350,9 @@ public class JetLinksMqttDeviceMessageCodecTest {
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes())) .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).block(); .build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply); Assert.assertTrue(message instanceof ChildDeviceMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage()); DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test"); Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test"); Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getMetadata(), "1"); Assert.assertEquals(reply.getMetadata(), "1");