decode(@Nonnull MessageDecodeContext context) {
+ return Flux.defer(() -> {
log.debug("handle coap message:\n{}", context.getMessage());
if (context.getMessage() instanceof CoapExchangeMessage) {
return decode(((CoapExchangeMessage) context.getMessage()), context);
@@ -73,7 +83,7 @@ public class JetLinksCoapDeviceMessageCodec extends JetlinksTopicMessageCodec im
return decode(((CoapMessage) context.getMessage()), context);
}
- return Mono.empty();
+ return Flux.empty();
});
}
diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java
index 68eac18..b6594f0 100644
--- a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java
+++ b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java
@@ -1,17 +1,18 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
+import org.jetlinks.core.server.session.DeviceSession;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
-import java.nio.charset.StandardCharsets;
/**
*
@@ -49,10 +50,9 @@ import java.nio.charset.StandardCharsets;
* @since 1.0.0
*/
@Slf4j
-public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
-
- private Transport transport;
+public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
+ private final Transport transport;
public JetLinksMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
@@ -71,21 +71,33 @@ public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec im
public Mono encode(@Nonnull MessageEncodeContext context) {
return Mono.defer(() -> {
Message message = context.getMessage();
+
+ if (message instanceof DisconnectDeviceMessage) {
+ return ((ToDeviceMessageContext) context)
+ .disconnect()
+ .then(Mono.empty());
+ }
+
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
- EncodedTopic convertResult = encode(deviceMessage.getDeviceId(), deviceMessage);
+ TopicPayload convertResult = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, deviceMessage);
if (convertResult == null) {
return Mono.empty();
}
- return context.getDevice()
- .getConfig(DeviceConfigKey.productId)
+ return Mono
+ .justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
+ .switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
+ .flatMap(device -> device
+ .getConfig(DeviceConfigKey.productId))
+ )
.defaultIfEmpty("null")
- .map(productId -> SimpleMqttMessage.builder()
+ .map(productId -> SimpleMqttMessage
+ .builder()
.clientId(deviceMessage.getDeviceId())
- .topic("/" .concat(productId).concat(convertResult.topic))
+ .topic("/".concat(productId).concat(convertResult.getTopic()))
.payloadType(MessagePayloadType.JSON)
- .payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(convertResult.payload)))
+ .payload(Unpooled.wrappedBuffer(convertResult.getPayload()))
.build());
} else {
return Mono.empty();
@@ -95,18 +107,14 @@ public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec im
@Nonnull
@Override
- public Mono decode(@Nonnull MessageDecodeContext context) {
- return Mono.fromSupplier(() -> {
- MqttMessage message = (MqttMessage) context.getMessage();
- String topic = message.getTopic();
- String jsonData = message.getPayload().toString(StandardCharsets.UTF_8);
+ public Flux decode(@Nonnull MessageDecodeContext context) {
+ MqttMessage message = (MqttMessage) context.getMessage();
+
+ byte[] payload = message.payloadAsBytes();
+
+ return TopicMessageCodec
+ .decode(ObjectMappers.JSON_MAPPER, TopicMessageCodec.removeProductPath(message.getTopic()), payload);
- JSONObject object = JSON.parseObject(jsonData, JSONObject.class);
- if (object == null) {
- throw new UnsupportedOperationException("cannot parse payload:{}" + jsonData);
- }
- return decode(topic, object).getMessage();
- });
}
}
diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java
index a885f52..8769b0b 100644
--- a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java
+++ b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java
@@ -46,9 +46,9 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport();
- support.setId("jetlinks.v1.0");
- support.setName("JetLinks V1.0");
- support.setDescription("JetLinks Protocol Version 1.0");
+ support.setId("jetlinks.v2.0");
+ support.setName("JetLinks V2.0");
+ support.setDescription("JetLinks Protocol Version 2.0");
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
diff --git a/src/main/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodec.java
deleted file mode 100644
index 762259b..0000000
--- a/src/main/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodec.java
+++ /dev/null
@@ -1,290 +0,0 @@
-package org.jetlinks.protocol.official;
-
-import com.alibaba.fastjson.JSONObject;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
-import org.jetlinks.core.message.*;
-import org.jetlinks.core.message.event.EventMessage;
-import org.jetlinks.core.message.firmware.*;
-import org.jetlinks.core.message.function.FunctionInvokeMessage;
-import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
-import org.jetlinks.core.message.property.*;
-import org.jetlinks.core.utils.TopicUtils;
-import org.jetlinks.supports.utils.MqttTopicUtils;
-import org.springframework.util.Assert;
-
-import java.util.Map;
-import java.util.Optional;
-
-class JetlinksTopicMessageCodec {
-
- @Getter
- protected class DecodeResult {
- private Map args;
-
- private boolean child;
-
- private boolean event;
- private boolean readPropertyReply;
- private boolean writePropertyReply;
- private boolean functionInvokeReply;
- private boolean reportProperties;
- private boolean derivedMetadata;
- private boolean register;
- private boolean unregister;
-
- private boolean requestFirmware;
- private boolean reportFirmware;
- private boolean upgradeFirmwareProgress;
- private boolean readFirmwareReply;
-
-
- public DecodeResult(String topic) {
- this.topic = topic;
- args = TopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic);
- if (topic.contains("child")) {
- child = true;
- args.putAll(TopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic));
- }
- if (topic.contains("event")) {
- event = true;
- args.putAll(TopicUtils.getPathVariables("/**/event/{eventId}", topic));
- }
- derivedMetadata = topic.endsWith("metadata/derived");
- if (event) {
- } else if (reportProperties = topic.endsWith("properties/report")) {
- } else if (unregister = topic.endsWith("unregister")) {
- } else if (register = topic.endsWith("register")) {
- } else if (readPropertyReply = topic.endsWith("properties/read/reply")) {
- } else if (writePropertyReply = topic.endsWith("properties/write/reply")) {
- } else if (functionInvokeReply = topic.endsWith("function/invoke/reply")) {
- } else if (upgradeFirmwareProgress = topic.endsWith("firmware/upgrade/progress")) {
- } else if (requestFirmware = topic.endsWith("firmware/pull")) {
- } else if (reportFirmware = topic.endsWith("firmware/report")) {
- } else if (readFirmwareReply = topic.endsWith("firmware/read/reply")) {
- } else if (derivedMetadata = topic.endsWith("metadata/derived")) {
- }
- }
-
- private final String topic;
-
- public String getDeviceId() {
- return args.get("deviceId");
- }
-
- public String getChildDeviceId() {
- return args.get("childDeviceId");
- }
-
- protected Message message;
- }
-
- protected EncodedTopic encode(String deviceId, Message message) {
-
- Assert.hasText(deviceId, "deviceId can not be null");
- Assert.notNull(message, "message can not be null");
-
- if (message instanceof ReadPropertyMessage) {
- String topic = "/" .concat(deviceId).concat("/properties/read");
- JSONObject mqttData = new JSONObject();
- mqttData.put("messageId", message.getMessageId());
- mqttData.put("properties", ((ReadPropertyMessage) message).getProperties());
- mqttData.put("deviceId", deviceId);
-
- return new EncodedTopic(topic, mqttData);
- } else if (message instanceof WritePropertyMessage) {
- String topic = "/" .concat(deviceId).concat("/properties/write");
- JSONObject mqttData = new JSONObject();
- mqttData.put("messageId", message.getMessageId());
- mqttData.put("properties", ((WritePropertyMessage) message).getProperties());
- mqttData.put("deviceId", deviceId);
-
- return new EncodedTopic(topic, mqttData);
- } else if (message instanceof FunctionInvokeMessage) {
- String topic = "/" .concat(deviceId).concat("/function/invoke");
- FunctionInvokeMessage invokeMessage = ((FunctionInvokeMessage) message);
- JSONObject mqttData = new JSONObject();
- mqttData.put("messageId", message.getMessageId());
- mqttData.put("function", invokeMessage.getFunctionId());
- mqttData.put("inputs", invokeMessage.getInputs());
- mqttData.put("deviceId", deviceId);
-
- return new EncodedTopic(topic, mqttData);
- } else if (message instanceof UpgradeFirmwareMessage) {
- String topic = "/" .concat(deviceId).concat("/firmware/upgrade");
- UpgradeFirmwareMessage firmwareMessage = ((UpgradeFirmwareMessage) message);
- JSONObject mqttData = new JSONObject();
- mqttData.put("messageId", message.getMessageId());
- mqttData.put("url", firmwareMessage.getUrl());
- mqttData.put("sign", firmwareMessage.getSign());
- mqttData.put("version", firmwareMessage.getVersion());
- mqttData.put("signMethod", firmwareMessage.getSignMethod());
- mqttData.put("parameters", firmwareMessage.getParameters());
- mqttData.put("deviceId", deviceId);
-
- return new EncodedTopic(topic, mqttData);
- } else if (message instanceof ReadFirmwareMessage) {
- String topic = "/" .concat(deviceId).concat("/firmware/read");
- JSONObject mqttData = new JSONObject();
- mqttData.put("messageId", message.getMessageId());
- mqttData.put("deviceId", deviceId);
- return new EncodedTopic(topic, mqttData);
- } else if (message instanceof RequestFirmwareMessageReply) {
- String topic = "/" .concat(deviceId).concat("/firmware/pull/reply");
- RequestFirmwareMessageReply firmwareMessage = ((RequestFirmwareMessageReply) message);
- JSONObject mqttData = new JSONObject();
- mqttData.put("messageId", message.getMessageId());
- mqttData.put("url", firmwareMessage.getUrl());
- mqttData.put("sign", firmwareMessage.getSign());
- mqttData.put("version", firmwareMessage.getVersion());
- mqttData.put("signMethod", firmwareMessage.getSignMethod());
- mqttData.put("parameters", firmwareMessage.getParameters());
- mqttData.put("deviceId", deviceId);
- return new EncodedTopic(topic, mqttData);
- } else if (message instanceof ChildDeviceMessage) {
- ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message);
- EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage());
- String topic = "/" .concat(deviceId).concat("/child").concat(result.topic);
- result.payload.put("deviceId", childDeviceMessage.getChildDeviceId());
-
- return new EncodedTopic(topic, result.payload);
- }
- return null;
- }
-
- protected DecodeResult decode(String topic, JSONObject object) {
- DecodeResult result = new DecodeResult(topic);
- Message message = null;
- if (result.isEvent()) {
- message = decodeEvent(result, object);
- } else if (result.isReportProperties()) {
- message = decodeReportPropertyReply(result, object);
- } else if (result.isReadPropertyReply()) {
- message = decodeReadPropertyReply(result, object);
- } else if (result.isWritePropertyReply()) {
- message = decodeWritePropertyReply(result, object);
- } else if (result.isFunctionInvokeReply()) {
- message = decodeInvokeReply(result, object);
- } else if (result.isRegister()) {
- message = decodeRegister(result, object);
- } else if (result.isUnregister()) {
- message = decodeUnregister(result, object);
- } else if (result.isDerivedMetadata()) {
- message = decodeDerivedMetadata(result, object);
- } else if (result.isReadFirmwareReply()) {
- message = object.toJavaObject(ReadFirmwareMessageReply.class);
- } else if (result.isRequestFirmware()) {
- message = object.toJavaObject(RequestFirmwareMessage.class);
- } else if (result.isReportFirmware()) {
- message = object.toJavaObject(ReportFirmwareMessage.class);
- } else if (result.isUpgradeFirmwareProgress()) {
- message = object.toJavaObject(UpgradeFirmwareProgressMessage.class);
- }else if (topic.endsWith("connected")) {
- message = object.toJavaObject(DeviceOnlineMessage.class);
- } else if (topic.endsWith("disconnect")) {
- message = object.toJavaObject(DeviceOfflineMessage.class);
- }
-
- if (result.isChild()) {
- if (message == null) {
- throw new UnsupportedOperationException("unsupported topic:" + topic);
- }
- applyCommons(message, result, object);
- ChildDeviceMessage children = new ChildDeviceMessage();
- children.setChildDeviceId(result.getChildDeviceId());
- children.setDeviceId(result.getDeviceId());
- children.setChildDeviceMessage(message);
- children.setTimestamp(Optional.ofNullable(object.getLong("timestamp")).orElse(System.currentTimeMillis()));
- Optional.ofNullable(object.getString("messageId")).ifPresent(children::setMessageId);
- result.message = children;
- } else {
- if (message == null) {
- throw new UnsupportedOperationException("unsupported topic:" + topic);
- }
- applyCommons(message, result, object);
- result.message = message;
- }
- return result;
- }
-
-
- private Message decodeEvent(DecodeResult result, JSONObject event) {
- EventMessage message = event.toJavaObject(EventMessage.class);
- message.setData(event.get("data"));
- message.setEvent(result.args.get("eventId"));
- return message;
- }
-
- private Message decodeReadPropertyReply(DecodeResult result, JSONObject data) {
-
- return data.toJavaObject(ReadPropertyMessageReply.class);
- }
-
-
- private Message decodeReportPropertyReply(DecodeResult result, JSONObject data) {
-
- return data.toJavaObject(ReportPropertyMessage.class);
- }
-
-
- private Message decodeWritePropertyReply(DecodeResult result, JSONObject data) {
-
- return data.toJavaObject(WritePropertyMessageReply.class);
- }
-
- private Message decodeInvokeReply(DecodeResult result, JSONObject data) {
- return data.toJavaObject(FunctionInvokeMessageReply.class);
- }
-
- private Message decodeRegister(DecodeResult result, JSONObject data) {
- return data.toJavaObject(DeviceRegisterMessage.class);
- }
-
- private Message decodeUnregister(DecodeResult result, JSONObject data) {
- return data.toJavaObject(DeviceUnRegisterMessage.class);
- }
-
- private Message decodeDerivedMetadata(DecodeResult result, JSONObject data) {
- return data.toJavaObject(DerivedMetadataMessage.class);
- }
-
- private void applyCommons(Message message, DecodeResult result, JSONObject data) {
- if (message instanceof CommonDeviceMessageReply) {
- CommonDeviceMessageReply reply = ((CommonDeviceMessageReply) message);
- reply.setSuccess(Optional.ofNullable(data.getBoolean("success")).orElse(true));
- reply.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
- if (result.isChild()) {
- reply.setDeviceId(result.getChildDeviceId());
- } else {
- reply.setDeviceId(result.getDeviceId());
- }
- }
- if (message instanceof CommonDeviceMessage) {
- CommonDeviceMessage msg = ((CommonDeviceMessage) message);
- msg.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
- if (result.isChild()) {
- msg.setDeviceId(result.getChildDeviceId());
- } else {
- msg.setDeviceId(result.getDeviceId());
- }
- }
- }
-
- @Getter
- @Setter
- @AllArgsConstructor
- protected class EncodedTopic {
- String topic;
-
- JSONObject payload;
- }
-
- @Getter
- @Setter
- protected class Decoded {
- Message message;
-
- }
-
-}
diff --git a/src/main/java/org/jetlinks/protocol/official/ObjectMappers.java b/src/main/java/org/jetlinks/protocol/official/ObjectMappers.java
new file mode 100644
index 0000000..d8240af
--- /dev/null
+++ b/src/main/java/org/jetlinks/protocol/official/ObjectMappers.java
@@ -0,0 +1,28 @@
+package org.jetlinks.protocol.official;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
+
+public class ObjectMappers {
+
+ public static final ObjectMapper JSON_MAPPER;
+ public static final ObjectMapper CBOR_MAPPER;
+
+ static {
+ JSON_MAPPER = Jackson2ObjectMapperBuilder
+ .json()
+ .build()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL)
+ ;
+ CBOR_MAPPER = Jackson2ObjectMapperBuilder
+ .cbor()
+ .build()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ }
+
+}
diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java
new file mode 100644
index 0000000..084541b
--- /dev/null
+++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java
@@ -0,0 +1,243 @@
+package org.jetlinks.protocol.official;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.SneakyThrows;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.core.message.*;
+import org.jetlinks.core.message.event.EventMessage;
+import org.jetlinks.core.message.firmware.*;
+import org.jetlinks.core.message.function.FunctionInvokeMessage;
+import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
+import org.jetlinks.core.message.property.*;
+import org.jetlinks.core.message.state.DeviceStateCheckMessage;
+import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
+import org.jetlinks.core.utils.TopicUtils;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+public enum TopicMessageCodec {
+ //上报属性数据
+ reportProperty("/*/properties/report", ReportPropertyMessage.class),
+ //事件上报
+ event("/*/event/*", EventMessage.class),
+ //读取属性
+ readProperty("/*/properties/read", ReadPropertyMessage.class),
+ //读取属性回复
+ readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class),
+ //修改属性
+ writeProperty("/*/properties/write", WritePropertyMessage.class),
+ //修改属性回复
+ writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class),
+ //调用功能
+ functionInvoke("/*/function/invoke", FunctionInvokeMessage.class),
+ //调用功能回复
+ functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class),
+ //子设备消息
+ child("/*/child/*/**", ChildDeviceMessage.class) {
+ @Override
+ public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
+ String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
+ _topic[0] = "";// topic以/开头所有第一位是空白
+ return TopicMessageCodec
+ .decode(mapper, _topic, payload)
+ .map(childMsg -> {
+ ChildDeviceMessage msg = new ChildDeviceMessage();
+ msg.setDeviceId(topic[1]);
+ msg.setChildDeviceMessage(childMsg);
+ msg.setTimestamp(childMsg.getTimestamp());
+ msg.setMessageId(childMsg.getMessageId());
+ return msg;
+ });
+ }
+
+ @Override
+ protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
+ ChildDeviceMessage deviceMessage = ((ChildDeviceMessage) message);
+
+ DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
+
+ TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
+ String[] childTopic = payload.getTopic().split("/");
+ String[] topic = new String[topics.length + childTopic.length - 3];
+ //合并topic
+ System.arraycopy(topics, 0, topic, 0, topics.length - 1);
+ System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
+
+ refactorTopic(topic, message);
+ payload.setTopic(String.join("/", topic));
+ return payload;
+
+ }
+ }, //子设备消息回复
+ childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) {
+ @Override
+ public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
+ String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
+ _topic[0] = "";// topic以/开头所有第一位是空白
+ return TopicMessageCodec
+ .decode(mapper, _topic, payload)
+ .map(childMsg -> {
+ ChildDeviceMessageReply msg = new ChildDeviceMessageReply();
+ msg.setDeviceId(topic[1]);
+ msg.setChildDeviceMessage(childMsg);
+ msg.setTimestamp(childMsg.getTimestamp());
+ msg.setMessageId(childMsg.getMessageId());
+ return msg;
+ });
+ }
+
+ @Override
+ protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
+ ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message);
+
+ DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
+
+ TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
+ String[] childTopic = payload.getTopic().split("/");
+ String[] topic = new String[topics.length + childTopic.length - 3];
+ //合并topic
+ System.arraycopy(topics, 0, topic, 0, topics.length - 1);
+ System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
+
+ refactorTopic(topic, message);
+ payload.setTopic(String.join("/", topic));
+ return payload;
+
+ }
+ },
+ //更新标签
+ updateTag("/*/tags", UpdateTagMessage.class),
+ //注册
+ register("/*/register", DeviceRegisterMessage.class),
+ //注销
+ unregister("/*/unregister", DeviceUnRegisterMessage.class),
+ //更新固件消息
+ upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
+ //更新固件升级进度消息
+ upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class),
+ //拉取固件
+ requestFirmware("/*/firmware/pull", RequestFirmwareMessage.class),
+ //拉取固件更新回复
+ requestFirmwareReply("/*/firmware/pull/reply", RequestFirmwareMessageReply.class),
+ //上报固件版本
+ reportFirmware("/*/firmware/report", ReportFirmwareMessage.class),
+ //读取固件回复
+ readFirmwareReply("/*/firmware/read/reply", ReadFirmwareMessageReply.class),
+ //派生物模型上报
+ derivedMetadata("/*/metadata/derived", DerivedMetadataMessage.class),
+ //透传设备消息
+ direct("/*/direct", DirectDeviceMessage.class) {
+ @Override
+ public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
+ DirectDeviceMessage message = new DirectDeviceMessage();
+ message.setDeviceId(topic[1]);
+ message.setPayload(payload);
+ return Mono.just(message);
+ }
+ },
+ //断开连接消息
+ disconnect("/*/disconnect", DisconnectDeviceMessage.class),
+ //断开连接回复
+ disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
+ //上线
+ connect("/*/online", DeviceOnlineMessage.class),
+ //离线
+ offline("/*/offline", DeviceOfflineMessage.class),
+ //日志
+ log("/*/log", DeviceLogMessage.class),
+ //状态检查
+ stateCheck("/*/state-check", DeviceStateCheckMessage.class),
+ stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
+ ;
+
+ TopicMessageCodec(String topic, Class extends DeviceMessage> type) {
+ this.pattern = topic.split("/");
+ this.type = type;
+ }
+
+ private final String[] pattern;
+ private final Class extends DeviceMessage> type;
+
+ public static Flux decode(ObjectMapper mapper, String[] topics, byte[] payload) {
+ return Mono
+ .justOrEmpty(fromTopic(topics))
+ .flatMapMany(topicMessageCodec -> topicMessageCodec.doDecode(mapper, topics, payload));
+ }
+
+ public static Flux decode(ObjectMapper mapper, String topic, byte[] payload) {
+ return decode(mapper, topic.split("/"), payload);
+ }
+
+ public static TopicPayload encode(ObjectMapper mapper, DeviceMessage message) {
+
+ return fromMessage(message)
+ .orElseThrow(() -> new UnsupportedOperationException("unsupported message:" + message.getMessageType()))
+ .doEncode(mapper, message);
+ }
+
+ static Optional fromTopic(String[] topic) {
+ for (TopicMessageCodec value : values()) {
+ if (TopicUtils.match(value.pattern, topic)) {
+ return Optional.of(value);
+ }
+ }
+ return Optional.empty();
+ }
+
+ static Optional fromMessage(DeviceMessage message) {
+ for (TopicMessageCodec value : values()) {
+ if (value.type == message.getClass()) {
+ return Optional.of(value);
+ }
+ }
+ return Optional.empty();
+ }
+
+ Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
+ return Mono
+ .fromCallable(() -> {
+ DeviceMessage message = mapper.readValue(payload, type);
+ FastBeanCopier.copy(Collections.singletonMap("deviceId", topic[1]), message);
+
+ return message;
+ });
+ }
+
+ @SneakyThrows
+ TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
+ refactorTopic(topics, message);
+ return TopicPayload.of(String.join("/", topics), mapper.writeValueAsBytes(message));
+ }
+
+ @SneakyThrows
+ TopicPayload doEncode(ObjectMapper mapper, DeviceMessage message) {
+ String[] topics = Arrays.copyOf(pattern, pattern.length);
+ return doEncode(mapper, topics, message);
+ }
+
+ void refactorTopic(String[] topics, DeviceMessage message) {
+ topics[1] = message.getDeviceId();
+ }
+
+ /**
+ * 移除topic中的产品信息,topic第一个层为产品ID,在解码时,不需要此信息,所以需要移除之.
+ *
+ * @param topic topic
+ * @return 移除后的topic
+ */
+ public static String[] removeProductPath(String topic) {
+ if (!topic.startsWith("/")) {
+ topic = "/" + topic;
+ }
+ String[] topicArr = topic.split("/");
+ String[] topics = Arrays.copyOfRange(topicArr, 1, topicArr.length);
+ topics[0] = "";
+ return topics;
+ }
+
+}
diff --git a/src/main/java/org/jetlinks/protocol/official/TopicPayload.java b/src/main/java/org/jetlinks/protocol/official/TopicPayload.java
new file mode 100644
index 0000000..7cc4956
--- /dev/null
+++ b/src/main/java/org/jetlinks/protocol/official/TopicPayload.java
@@ -0,0 +1,15 @@
+package org.jetlinks.protocol.official;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor(staticName = "of") // FIXME 使用对象池,节省一丢丢内存?
+public class TopicPayload {
+
+ private String topic;
+
+ private byte[] payload;
+}
diff --git a/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider b/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider
deleted file mode 100644
index 39e2abe..0000000
--- a/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider
+++ /dev/null
@@ -1 +0,0 @@
-org.jetlinks.protocol.official.JetLinksProtocolSupportProvider
\ No newline at end of file
diff --git a/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java
index 00ec80e..815e115 100644
--- a/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java
+++ b/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class JetLinksCoapDeviceMessageCodecTest {
- JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
+ JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
DeviceOperator device;
@@ -39,10 +39,11 @@ public class JetLinksCoapDeviceMessageCodecTest {
@Before
public void init() {
TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
- device = registry.register(DeviceInfo.builder()
- .id("test")
- .protocol("jetlinks")
- .build())
+ device = registry
+ .register(DeviceInfo.builder()
+ .id("test")
+ .protocol("jetlinks")
+ .build())
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
.block();
}
@@ -59,19 +60,20 @@ public class JetLinksCoapDeviceMessageCodecTest {
@Override
public void handlePOST(CoapExchange exchange) {
- codec.decode(new MessageDecodeContext() {
- @Nonnull
- @Override
- public EncodedMessage getMessage() {
- return new CoapExchangeMessage(exchange);
- }
+ codec
+ .decode(new MessageDecodeContext() {
+ @Nonnull
+ @Override
+ public EncodedMessage getMessage() {
+ return new CoapExchangeMessage(exchange);
+ }
- @Override
- public DeviceOperator getDevice() {
- return device;
- }
- })
- .doOnSuccess(messageRef::set)
+ @Override
+ public DeviceOperator getDevice() {
+ return device;
+ }
+ })
+ .doOnNext(messageRef::set)
.doOnError(Throwable::printStackTrace)
.subscribe();
}
@@ -85,7 +87,7 @@ public class JetLinksCoapDeviceMessageCodecTest {
};
Endpoint endpoint = new CoapEndpoint.Builder()
- .setPort(12345).build();
+ .setPort(12341).build();
server.addEndpoint(endpoint);
server.start();
@@ -96,8 +98,8 @@ public class JetLinksCoapDeviceMessageCodecTest {
Request request = Request.newPost();
String payload = "{\"data\":1}";
- request.setURI("coap://localhost:12345/test/test/event/event1");
- request.setPayload(Ciphers.AES.encrypt(payload.getBytes(),key));
+ request.setURI("coap://localhost:12341/test/test/event/event1");
+ request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
diff --git a/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java
index 2079c84..44d60fd 100644
--- a/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java
+++ b/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java
@@ -7,7 +7,6 @@ import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.ProductInfo;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.message.ChildDeviceMessage;
-import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DerivedMetadataMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
@@ -16,7 +15,6 @@ import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
-import org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -28,7 +26,7 @@ import java.util.Collections;
public class JetLinksMqttDeviceMessageCodecTest {
- org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
+ JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
TestDeviceRegistry registry;
@@ -87,7 +85,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
@@ -103,12 +101,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
- Assert.assertTrue(message instanceof ChildDeviceMessageReply);
- ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
+ Assert.assertTrue(message instanceof ChildDeviceMessage);
+ ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
- Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
@@ -151,7 +148,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
Assert.assertNotNull(encodedMessage);
- Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/write");
+ Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/device1/properties/write");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@@ -160,7 +157,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
Assert.assertTrue(message instanceof WritePropertyMessageReply);
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
@@ -178,12 +175,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
- Assert.assertTrue(message instanceof ChildDeviceMessageReply);
- ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
+ Assert.assertTrue(message instanceof ChildDeviceMessage);
+ ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
- Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
@@ -233,7 +229,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
@@ -249,12 +245,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
- Assert.assertTrue(message instanceof ChildDeviceMessageReply);
- ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
+ Assert.assertTrue(message instanceof ChildDeviceMessage);
+ ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
- Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
@@ -271,7 +266,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
Assert.assertTrue(message instanceof EventMessage);
EventMessage reply = ((EventMessage) message);
@@ -286,11 +281,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
- Assert.assertTrue(message instanceof ChildDeviceMessageReply);
+ Assert.assertTrue(message instanceof ChildDeviceMessage);
- EventMessage reply = ((EventMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
+ EventMessage reply = ((EventMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getData(), 100);
@@ -302,7 +297,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
Assert.assertTrue(message instanceof ReportPropertyMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
@@ -318,11 +313,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
- Assert.assertTrue(message instanceof ChildDeviceMessageReply);
+ Assert.assertTrue(message instanceof ChildDeviceMessage);
- ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
+ ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
@@ -335,7 +330,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
Assert.assertTrue(message instanceof DerivedMetadataMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
@@ -350,11 +345,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
- .build())).block();
+ .build())).blockFirst();
- Assert.assertTrue(message instanceof ChildDeviceMessageReply);
+ Assert.assertTrue(message instanceof ChildDeviceMessage);
- DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
+ DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getMetadata(), "1");
diff --git a/src/test/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodecTest.java
deleted file mode 100644
index e34950c..0000000
--- a/src/test/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodecTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.jetlinks.protocol.official;
-
-import org.jetlinks.core.message.property.ReadPropertyMessage;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class JetlinksTopicMessageCodecTest {
-
- JetlinksTopicMessageCodec codec = new JetlinksTopicMessageCodec();
-
- @Test
- public void testReadProperty() {
-
- ReadPropertyMessage readProperty = new ReadPropertyMessage();
- readProperty.setProperties(Arrays.asList("name"));
- readProperty.setMessageId("test");
- JetlinksTopicMessageCodec.EncodedTopic topic = codec.encode("test", readProperty);
- Assert.assertEquals(topic.getTopic(),"/test/properties/read");
-
- }
-
-}
\ No newline at end of file
diff --git a/src/test/java/org/jetlinks/protocol/official/TopicMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/TopicMessageCodecTest.java
new file mode 100644
index 0000000..3d34fe0
--- /dev/null
+++ b/src/test/java/org/jetlinks/protocol/official/TopicMessageCodecTest.java
@@ -0,0 +1,44 @@
+package org.jetlinks.protocol.official;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.jetlinks.core.message.ChildDeviceMessage;
+import org.jetlinks.core.message.property.ReportPropertyMessage;
+import org.junit.Test;
+import reactor.test.StepVerifier;
+
+import static org.junit.Assert.*;
+
+public class TopicMessageCodecTest {
+
+
+ public void testChild(ObjectMapper objectMapper) {
+ ChildDeviceMessage message = new ChildDeviceMessage();
+ message.setDeviceId("test");
+ ReportPropertyMessage msg = new ReportPropertyMessage();
+ msg.setDeviceId("childId");
+ message.setChildDeviceMessage(msg);
+ message.setTimestamp(msg.getTimestamp());
+
+
+ TopicPayload payload = TopicMessageCodec.child.doEncode(objectMapper, message);
+ System.out.println(payload.getPayload().length);
+ assertEquals("/test/child/childId/properties/report", payload.getTopic());
+
+ TopicMessageCodec
+ .decode(objectMapper, payload.getTopic(), payload.getPayload())
+ .as(StepVerifier::create)
+ .expectNextMatches(deviceMessage -> {
+ System.out.println(message);
+ System.out.println(deviceMessage);
+ return deviceMessage.toJson().equals(message.toJson());
+ })
+ .verifyComplete();
+
+ }
+
+ @Test
+ public void doTest() {
+ testChild(ObjectMappers.JSON_MAPPER);
+ testChild(ObjectMappers.CBOR_MAPPER);
+ }
+}
\ No newline at end of file
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
new file mode 100644
index 0000000..74df945
--- /dev/null
+++ b/src/test/resources/logback.xml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+ %-4relative [%thread] %-5level %logger{35} - %msg %n
+
+
+
+
+
+
+
\ No newline at end of file