commit
3c0e3aa046
Binary file not shown.
Binary file not shown.
142
pom.xml
142
pom.xml
|
@ -6,7 +6,7 @@
|
||||||
|
|
||||||
<groupId>org.jetlinks</groupId>
|
<groupId>org.jetlinks</groupId>
|
||||||
<artifactId>jetlinks-official-protocol</artifactId>
|
<artifactId>jetlinks-official-protocol</artifactId>
|
||||||
<version>1.0-SNAPSHOT</version>
|
<version>2.0-SNAPSHOT</version>
|
||||||
|
|
||||||
<name>JetLinks</name>
|
<name>JetLinks</name>
|
||||||
<url>http://jetlinks.org</url>
|
<url>http://jetlinks.org</url>
|
||||||
|
@ -50,6 +50,80 @@
|
||||||
<reactor.version>Dysprosium-RELEASE</reactor.version>
|
<reactor.version>Dysprosium-RELEASE</reactor.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
<profiles>
|
||||||
|
<profile>
|
||||||
|
<id>release</id>
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.sonatype.plugins</groupId>
|
||||||
|
<artifactId>nexus-staging-maven-plugin</artifactId>
|
||||||
|
<version>1.6.3</version>
|
||||||
|
<extensions>true</extensions>
|
||||||
|
<configuration>
|
||||||
|
<serverId>sonatype-releases</serverId>
|
||||||
|
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
|
||||||
|
<autoReleaseAfterClose>true</autoReleaseAfterClose>
|
||||||
|
<stagingProgressTimeoutMinutes>120</stagingProgressTimeoutMinutes>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-release-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<autoVersionSubmodules>true</autoVersionSubmodules>
|
||||||
|
<useReleaseProfile>false</useReleaseProfile>
|
||||||
|
<releaseProfiles>release</releaseProfiles>
|
||||||
|
<goals>deploy</goals>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-gpg-plugin</artifactId>
|
||||||
|
<version>1.5</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>sign-artifacts</id>
|
||||||
|
<phase>verify</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>sign</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-javadoc-plugin</artifactId>
|
||||||
|
<version>2.9.1</version>
|
||||||
|
<configuration>
|
||||||
|
<additionalparam>-Xdoclint:none</additionalparam>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>attach-javadocs</id>
|
||||||
|
<goals>
|
||||||
|
<goal>jar</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
<distributionManagement>
|
||||||
|
<repository>
|
||||||
|
<id>sonatype-releases</id>
|
||||||
|
<name>sonatype repository</name>
|
||||||
|
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
|
||||||
|
</repository>
|
||||||
|
<snapshotRepository>
|
||||||
|
<id>sonatype-snapshots</id>
|
||||||
|
<name>Nexus Snapshot Repository</name>
|
||||||
|
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
|
||||||
|
</snapshotRepository>
|
||||||
|
</distributionManagement>
|
||||||
|
</profile>
|
||||||
|
</profiles>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<plugin>
|
||||||
|
@ -61,6 +135,21 @@
|
||||||
<target>8</target>
|
<target>8</target>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|
||||||
|
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-source-plugin</artifactId>
|
||||||
|
<version>2.4</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>attach-sources</id>
|
||||||
|
<goals>
|
||||||
|
<goal>jar-no-fork</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
|
|
||||||
|
@ -69,7 +158,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.jetlinks</groupId>
|
<groupId>org.jetlinks</groupId>
|
||||||
<artifactId>jetlinks-supports</artifactId>
|
<artifactId>jetlinks-supports</artifactId>
|
||||||
<version>1.1.4-SNAPSHOT</version>
|
<version>1.1.6-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -86,6 +175,11 @@
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||||
|
<artifactId>jackson-dataformat-cbor</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>junit</groupId>
|
<groupId>junit</groupId>
|
||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
|
@ -93,13 +187,53 @@
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.projectreactor</groupId>
|
||||||
|
<artifactId>reactor-test</artifactId>
|
||||||
|
<version>3.3.12.RELEASE</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
<artifactId>logback-classic</artifactId>
|
||||||
|
<version>1.2.3</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
<dependencyManagement>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson</groupId>
|
||||||
|
<artifactId>jackson-bom</artifactId>
|
||||||
|
<version>2.11.4</version>
|
||||||
|
<type>pom</type>
|
||||||
|
<scope>import</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</dependencyManagement>
|
||||||
|
|
||||||
|
<distributionManagement>
|
||||||
|
<repository>
|
||||||
|
<id>releases</id>
|
||||||
|
<name>Nexus Release Repository</name>
|
||||||
|
<url>http://nexus.hsweb.me/content/repositories/releases/</url>
|
||||||
|
</repository>
|
||||||
|
<snapshotRepository>
|
||||||
|
<id>snapshots</id>
|
||||||
|
<name>Nexus Snapshot Repository</name>
|
||||||
|
<url>http://nexus.hsweb.me/content/repositories/snapshots/</url>
|
||||||
|
</snapshotRepository>
|
||||||
|
</distributionManagement>
|
||||||
|
|
||||||
|
|
||||||
<repositories>
|
<repositories>
|
||||||
<repository>
|
<repository>
|
||||||
<id>hsweb-nexus</id>
|
<id>hsweb-nexus</id>
|
||||||
<name>Nexus Release Repository</name>
|
<name>Nexus Release Repository</name>
|
||||||
<url>http://nexus.hsweb.me/content/groups/public/</url>
|
<url>https://nexus.hsweb.me/content/groups/public/</url>
|
||||||
<snapshots>
|
<snapshots>
|
||||||
<enabled>true</enabled>
|
<enabled>true</enabled>
|
||||||
<updatePolicy>always</updatePolicy>
|
<updatePolicy>always</updatePolicy>
|
||||||
|
@ -108,7 +242,7 @@
|
||||||
<repository>
|
<repository>
|
||||||
<id>aliyun-nexus</id>
|
<id>aliyun-nexus</id>
|
||||||
<name>aliyun</name>
|
<name>aliyun</name>
|
||||||
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
|
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
|
||||||
</repository>
|
</repository>
|
||||||
</repositories>
|
</repositories>
|
||||||
|
|
||||||
|
|
|
@ -1,22 +1,25 @@
|
||||||
package org.jetlinks.protocol.official;
|
package org.jetlinks.protocol.official;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.eclipse.californium.core.coap.CoAP;
|
import org.eclipse.californium.core.coap.CoAP;
|
||||||
|
import org.eclipse.californium.core.coap.OptionNumberRegistry;
|
||||||
import org.hswebframework.web.id.IDGenerator;
|
import org.hswebframework.web.id.IDGenerator;
|
||||||
import org.jetlinks.core.message.Message;
|
import org.jetlinks.core.message.DeviceMessage;
|
||||||
import org.jetlinks.core.message.codec.*;
|
import org.jetlinks.core.message.codec.*;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.util.Arrays;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
|
public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Transport getSupportTransport() {
|
public Transport getSupportTransport() {
|
||||||
|
@ -24,15 +27,22 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
|
public Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
|
||||||
return Mono.defer(() -> {
|
return Flux.defer(() -> {
|
||||||
String path = message.getPath();
|
String path = message.getPath();
|
||||||
String sign = message.getStringOption(2110).orElse(null);
|
String sign = message.getStringOption(2110).orElse(null);
|
||||||
String token = message.getStringOption(2111).orElse(null);
|
String token = message.getStringOption(2111).orElse(null);
|
||||||
String payload = message.getPayload().toString(StandardCharsets.UTF_8);
|
byte[] payload = message.payloadAsBytes();
|
||||||
|
boolean cbor = message
|
||||||
|
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
|
||||||
|
.map(MediaType::valueOf)
|
||||||
|
.map(MediaType.APPLICATION_CBOR::includes)
|
||||||
|
.orElse(false);
|
||||||
|
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
|
||||||
if ("/auth".equals(path)) {
|
if ("/auth".equals(path)) {
|
||||||
//认证
|
//认证
|
||||||
return context.getDevice()
|
return context
|
||||||
|
.getDevice()
|
||||||
.getConfig("secureKey")
|
.getConfig("secureKey")
|
||||||
.flatMap(sk -> {
|
.flatMap(sk -> {
|
||||||
String secureKey = sk.asString();
|
String secureKey = sk.asString();
|
||||||
|
@ -42,12 +52,12 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCode
|
||||||
}
|
}
|
||||||
String newToken = IDGenerator.MD5.generate();
|
String newToken = IDGenerator.MD5.generate();
|
||||||
return context.getDevice()
|
return context.getDevice()
|
||||||
.setConfig("coap-token", newToken)
|
.setConfig("coap-token", newToken)
|
||||||
.doOnSuccess(success -> {
|
.doOnSuccess(success -> {
|
||||||
JSONObject json = new JSONObject();
|
JSONObject json = new JSONObject();
|
||||||
json.put("token", newToken);
|
json.put("token", newToken);
|
||||||
response.accept(json.toJSONString());
|
response.accept(json.toJSONString());
|
||||||
});
|
});
|
||||||
})
|
})
|
||||||
.then(Mono.empty());
|
.then(Mono.empty());
|
||||||
}
|
}
|
||||||
|
@ -55,22 +65,21 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCode
|
||||||
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
|
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
return context.getDevice()
|
return context
|
||||||
|
.getDevice()
|
||||||
.getConfig("coap-token")
|
.getConfig("coap-token")
|
||||||
.switchIfEmpty(Mono.fromRunnable(() -> {
|
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
|
||||||
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
|
.flatMapMany(value -> {
|
||||||
}))
|
|
||||||
.flatMap(value -> {
|
|
||||||
String tk = value.asString();
|
String tk = value.asString();
|
||||||
if (!token.equals(tk)) {
|
if (!token.equals(tk)) {
|
||||||
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
|
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
return Mono
|
return TopicMessageCodec
|
||||||
.just(decode(path, JSON.parseObject(payload)).getMessage())
|
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
|
||||||
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST)));
|
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST)));
|
||||||
})
|
})
|
||||||
.doOnSuccess(msg -> {
|
.doOnComplete(() -> {
|
||||||
response.accept(CoAP.ResponseCode.CREATED);
|
response.accept(CoAP.ResponseCode.CREATED);
|
||||||
})
|
})
|
||||||
.doOnError(error -> {
|
.doOnError(error -> {
|
||||||
|
@ -83,7 +92,7 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCode
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
|
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
|
||||||
if (context.getMessage() instanceof CoapExchangeMessage) {
|
if (context.getMessage() instanceof CoapExchangeMessage) {
|
||||||
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
|
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
|
||||||
return decode(exchangeMessage, context, resp -> {
|
return decode(exchangeMessage, context, resp -> {
|
||||||
|
@ -101,12 +110,15 @@ public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCode
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return Mono.empty();
|
return Flux.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean verifySign(String secureKey, String deviceId, String payload, String sign) {
|
protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) {
|
||||||
//验证签名
|
//验证签名
|
||||||
if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(payload.concat(secureKey)).equalsIgnoreCase(sign)) {
|
byte[] secureKeyBytes = secureKey.getBytes();
|
||||||
|
byte[] signPayload = Arrays.copyOf(payload, payload.length + secureKeyBytes.length);
|
||||||
|
System.arraycopy(secureKeyBytes, 0, signPayload, 0, secureKeyBytes.length);
|
||||||
|
if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(signPayload).equalsIgnoreCase(sign)) {
|
||||||
log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload);
|
log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,53 +2,63 @@ package org.jetlinks.protocol.official;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.eclipse.californium.core.coap.CoAP;
|
import org.eclipse.californium.core.coap.CoAP;
|
||||||
|
import org.eclipse.californium.core.coap.OptionNumberRegistry;
|
||||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||||
import org.jetlinks.core.Value;
|
import org.jetlinks.core.Value;
|
||||||
import org.jetlinks.core.message.Message;
|
import org.jetlinks.core.message.DeviceMessage;
|
||||||
import org.jetlinks.core.message.codec.*;
|
import org.jetlinks.core.message.codec.*;
|
||||||
import org.jetlinks.protocol.official.cipher.Ciphers;
|
import org.jetlinks.protocol.official.cipher.Ciphers;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class JetLinksCoapDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
|
public class JetLinksCoapDeviceMessageCodec implements DeviceMessageCodec {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Transport getSupportTransport() {
|
public Transport getSupportTransport() {
|
||||||
return DefaultTransport.CoAP;
|
return DefaultTransport.CoAP;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected JSONObject decode(String text) {
|
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context) {
|
||||||
return JSON.parseObject(text);
|
String path = message.getPath();
|
||||||
}
|
|
||||||
|
|
||||||
protected Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context) {
|
boolean cbor = message
|
||||||
String path = message.getPath();
|
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
|
||||||
|
.map(MediaType::valueOf)
|
||||||
|
.map(MediaType.APPLICATION_CBOR::includes)
|
||||||
|
.orElse(false);
|
||||||
|
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
|
||||||
return context
|
return context
|
||||||
.getDevice()
|
.getDevice()
|
||||||
.getConfigs("encAlg", "secureKey")
|
.getConfigs("encAlg", "secureKey")
|
||||||
.flatMap(configs -> {
|
.flatMapMany(configs -> {
|
||||||
Ciphers ciphers = configs.getValue("encAlg").map(Value::asString).flatMap(Ciphers::of).orElse(Ciphers.AES);
|
Ciphers ciphers = configs
|
||||||
|
.getValue("encAlg")
|
||||||
|
.map(Value::asString)
|
||||||
|
.flatMap(Ciphers::of)
|
||||||
|
.orElse(Ciphers.AES);
|
||||||
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
|
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
|
||||||
ByteBuf byteBuf = message.getPayload();
|
ByteBuf byteBuf = message.getPayload();
|
||||||
byte[] req = new byte[byteBuf.readableBytes()];
|
byte[] req = new byte[byteBuf.readableBytes()];
|
||||||
byteBuf.readBytes(req);
|
byteBuf.readBytes(req);
|
||||||
byteBuf.resetReaderIndex();
|
byteBuf.resetReaderIndex();
|
||||||
String payload = new String(ciphers.decrypt(req, secureKey));
|
byte[] payload = ciphers.decrypt(req, secureKey);
|
||||||
//解码
|
//解码
|
||||||
return Mono.just(decode(path, decode(payload)).getMessage());
|
return TopicMessageCodec.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Mono<? extends Message> decode(CoapExchangeMessage message, MessageDecodeContext context) {
|
protected Flux<DeviceMessage> decode(CoapExchangeMessage message, MessageDecodeContext context) {
|
||||||
CoapExchange exchange = message.getExchange();
|
CoapExchange exchange = message.getExchange();
|
||||||
return decode((CoapMessage) message, context)
|
return decode(((CoapMessage) message), context)
|
||||||
.doOnSuccess(msg -> {
|
.doOnComplete(() -> {
|
||||||
exchange.respond(CoAP.ResponseCode.CREATED);
|
exchange.respond(CoAP.ResponseCode.CREATED);
|
||||||
exchange.accept();
|
exchange.accept();
|
||||||
})
|
})
|
||||||
|
@ -63,8 +73,8 @@ public class JetLinksCoapDeviceMessageCodec extends JetlinksTopicMessageCodec im
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
|
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
|
||||||
return Mono.defer(() -> {
|
return Flux.defer(() -> {
|
||||||
log.debug("handle coap message:\n{}", context.getMessage());
|
log.debug("handle coap message:\n{}", context.getMessage());
|
||||||
if (context.getMessage() instanceof CoapExchangeMessage) {
|
if (context.getMessage() instanceof CoapExchangeMessage) {
|
||||||
return decode(((CoapExchangeMessage) context.getMessage()), context);
|
return decode(((CoapExchangeMessage) context.getMessage()), context);
|
||||||
|
@ -73,7 +83,7 @@ public class JetLinksCoapDeviceMessageCodec extends JetlinksTopicMessageCodec im
|
||||||
return decode(((CoapMessage) context.getMessage()), context);
|
return decode(((CoapMessage) context.getMessage()), context);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Mono.empty();
|
return Flux.empty();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,17 +1,18 @@
|
||||||
package org.jetlinks.protocol.official;
|
package org.jetlinks.protocol.official;
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jetlinks.core.device.DeviceConfigKey;
|
import org.jetlinks.core.device.DeviceConfigKey;
|
||||||
import org.jetlinks.core.message.DeviceMessage;
|
import org.jetlinks.core.message.DeviceMessage;
|
||||||
|
import org.jetlinks.core.message.DisconnectDeviceMessage;
|
||||||
import org.jetlinks.core.message.Message;
|
import org.jetlinks.core.message.Message;
|
||||||
import org.jetlinks.core.message.codec.*;
|
import org.jetlinks.core.message.codec.*;
|
||||||
|
import org.jetlinks.core.server.session.DeviceSession;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <pre>
|
* <pre>
|
||||||
|
@ -49,10 +50,9 @@ import java.nio.charset.StandardCharsets;
|
||||||
* @since 1.0.0
|
* @since 1.0.0
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
|
public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
|
||||||
|
|
||||||
private Transport transport;
|
|
||||||
|
|
||||||
|
private final Transport transport;
|
||||||
|
|
||||||
public JetLinksMqttDeviceMessageCodec(Transport transport) {
|
public JetLinksMqttDeviceMessageCodec(Transport transport) {
|
||||||
this.transport = transport;
|
this.transport = transport;
|
||||||
|
@ -71,21 +71,33 @@ public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec im
|
||||||
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
|
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
|
||||||
return Mono.defer(() -> {
|
return Mono.defer(() -> {
|
||||||
Message message = context.getMessage();
|
Message message = context.getMessage();
|
||||||
|
|
||||||
|
if (message instanceof DisconnectDeviceMessage) {
|
||||||
|
return ((ToDeviceMessageContext) context)
|
||||||
|
.disconnect()
|
||||||
|
.then(Mono.empty());
|
||||||
|
}
|
||||||
|
|
||||||
if (message instanceof DeviceMessage) {
|
if (message instanceof DeviceMessage) {
|
||||||
DeviceMessage deviceMessage = ((DeviceMessage) message);
|
DeviceMessage deviceMessage = ((DeviceMessage) message);
|
||||||
|
|
||||||
EncodedTopic convertResult = encode(deviceMessage.getDeviceId(), deviceMessage);
|
TopicPayload convertResult = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, deviceMessage);
|
||||||
if (convertResult == null) {
|
if (convertResult == null) {
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
return context.getDevice()
|
return Mono
|
||||||
.getConfig(DeviceConfigKey.productId)
|
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
|
||||||
|
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
|
||||||
|
.flatMap(device -> device
|
||||||
|
.getConfig(DeviceConfigKey.productId))
|
||||||
|
)
|
||||||
.defaultIfEmpty("null")
|
.defaultIfEmpty("null")
|
||||||
.map(productId -> SimpleMqttMessage.builder()
|
.map(productId -> SimpleMqttMessage
|
||||||
|
.builder()
|
||||||
.clientId(deviceMessage.getDeviceId())
|
.clientId(deviceMessage.getDeviceId())
|
||||||
.topic("/" .concat(productId).concat(convertResult.topic))
|
.topic("/".concat(productId).concat(convertResult.getTopic()))
|
||||||
.payloadType(MessagePayloadType.JSON)
|
.payloadType(MessagePayloadType.JSON)
|
||||||
.payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(convertResult.payload)))
|
.payload(Unpooled.wrappedBuffer(convertResult.getPayload()))
|
||||||
.build());
|
.build());
|
||||||
} else {
|
} else {
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
|
@ -95,18 +107,14 @@ public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec im
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
@Override
|
@Override
|
||||||
public Mono<Message> decode(@Nonnull MessageDecodeContext context) {
|
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
|
||||||
return Mono.fromSupplier(() -> {
|
MqttMessage message = (MqttMessage) context.getMessage();
|
||||||
MqttMessage message = (MqttMessage) context.getMessage();
|
|
||||||
String topic = message.getTopic();
|
byte[] payload = message.payloadAsBytes();
|
||||||
String jsonData = message.getPayload().toString(StandardCharsets.UTF_8);
|
|
||||||
|
return TopicMessageCodec
|
||||||
|
.decode(ObjectMappers.JSON_MAPPER, TopicMessageCodec.removeProductPath(message.getTopic()), payload);
|
||||||
|
|
||||||
JSONObject object = JSON.parseObject(jsonData, JSONObject.class);
|
|
||||||
if (object == null) {
|
|
||||||
throw new UnsupportedOperationException("cannot parse payload:{}" + jsonData);
|
|
||||||
}
|
|
||||||
return decode(topic, object).getMessage();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,9 +46,9 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
|
||||||
return Mono.defer(() -> {
|
return Mono.defer(() -> {
|
||||||
CompositeProtocolSupport support = new CompositeProtocolSupport();
|
CompositeProtocolSupport support = new CompositeProtocolSupport();
|
||||||
|
|
||||||
support.setId("jetlinks.v1.0");
|
support.setId("jetlinks.v2.0");
|
||||||
support.setName("JetLinks V1.0");
|
support.setName("JetLinks V2.0");
|
||||||
support.setDescription("JetLinks Protocol Version 1.0");
|
support.setDescription("JetLinks Protocol Version 2.0");
|
||||||
|
|
||||||
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
|
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
|
||||||
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
|
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
|
||||||
|
|
|
@ -1,290 +0,0 @@
|
||||||
package org.jetlinks.protocol.official;
|
|
||||||
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
|
||||||
import lombok.AllArgsConstructor;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
import org.jetlinks.core.message.*;
|
|
||||||
import org.jetlinks.core.message.event.EventMessage;
|
|
||||||
import org.jetlinks.core.message.firmware.*;
|
|
||||||
import org.jetlinks.core.message.function.FunctionInvokeMessage;
|
|
||||||
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
|
|
||||||
import org.jetlinks.core.message.property.*;
|
|
||||||
import org.jetlinks.core.utils.TopicUtils;
|
|
||||||
import org.jetlinks.supports.utils.MqttTopicUtils;
|
|
||||||
import org.springframework.util.Assert;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
class JetlinksTopicMessageCodec {
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
protected class DecodeResult {
|
|
||||||
private Map<String, String> args;
|
|
||||||
|
|
||||||
private boolean child;
|
|
||||||
|
|
||||||
private boolean event;
|
|
||||||
private boolean readPropertyReply;
|
|
||||||
private boolean writePropertyReply;
|
|
||||||
private boolean functionInvokeReply;
|
|
||||||
private boolean reportProperties;
|
|
||||||
private boolean derivedMetadata;
|
|
||||||
private boolean register;
|
|
||||||
private boolean unregister;
|
|
||||||
|
|
||||||
private boolean requestFirmware;
|
|
||||||
private boolean reportFirmware;
|
|
||||||
private boolean upgradeFirmwareProgress;
|
|
||||||
private boolean readFirmwareReply;
|
|
||||||
|
|
||||||
|
|
||||||
public DecodeResult(String topic) {
|
|
||||||
this.topic = topic;
|
|
||||||
args = TopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic);
|
|
||||||
if (topic.contains("child")) {
|
|
||||||
child = true;
|
|
||||||
args.putAll(TopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic));
|
|
||||||
}
|
|
||||||
if (topic.contains("event")) {
|
|
||||||
event = true;
|
|
||||||
args.putAll(TopicUtils.getPathVariables("/**/event/{eventId}", topic));
|
|
||||||
}
|
|
||||||
derivedMetadata = topic.endsWith("metadata/derived");
|
|
||||||
if (event) {
|
|
||||||
} else if (reportProperties = topic.endsWith("properties/report")) {
|
|
||||||
} else if (unregister = topic.endsWith("unregister")) {
|
|
||||||
} else if (register = topic.endsWith("register")) {
|
|
||||||
} else if (readPropertyReply = topic.endsWith("properties/read/reply")) {
|
|
||||||
} else if (writePropertyReply = topic.endsWith("properties/write/reply")) {
|
|
||||||
} else if (functionInvokeReply = topic.endsWith("function/invoke/reply")) {
|
|
||||||
} else if (upgradeFirmwareProgress = topic.endsWith("firmware/upgrade/progress")) {
|
|
||||||
} else if (requestFirmware = topic.endsWith("firmware/pull")) {
|
|
||||||
} else if (reportFirmware = topic.endsWith("firmware/report")) {
|
|
||||||
} else if (readFirmwareReply = topic.endsWith("firmware/read/reply")) {
|
|
||||||
} else if (derivedMetadata = topic.endsWith("metadata/derived")) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private final String topic;
|
|
||||||
|
|
||||||
public String getDeviceId() {
|
|
||||||
return args.get("deviceId");
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getChildDeviceId() {
|
|
||||||
return args.get("childDeviceId");
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Message message;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected EncodedTopic encode(String deviceId, Message message) {
|
|
||||||
|
|
||||||
Assert.hasText(deviceId, "deviceId can not be null");
|
|
||||||
Assert.notNull(message, "message can not be null");
|
|
||||||
|
|
||||||
if (message instanceof ReadPropertyMessage) {
|
|
||||||
String topic = "/" .concat(deviceId).concat("/properties/read");
|
|
||||||
JSONObject mqttData = new JSONObject();
|
|
||||||
mqttData.put("messageId", message.getMessageId());
|
|
||||||
mqttData.put("properties", ((ReadPropertyMessage) message).getProperties());
|
|
||||||
mqttData.put("deviceId", deviceId);
|
|
||||||
|
|
||||||
return new EncodedTopic(topic, mqttData);
|
|
||||||
} else if (message instanceof WritePropertyMessage) {
|
|
||||||
String topic = "/" .concat(deviceId).concat("/properties/write");
|
|
||||||
JSONObject mqttData = new JSONObject();
|
|
||||||
mqttData.put("messageId", message.getMessageId());
|
|
||||||
mqttData.put("properties", ((WritePropertyMessage) message).getProperties());
|
|
||||||
mqttData.put("deviceId", deviceId);
|
|
||||||
|
|
||||||
return new EncodedTopic(topic, mqttData);
|
|
||||||
} else if (message instanceof FunctionInvokeMessage) {
|
|
||||||
String topic = "/" .concat(deviceId).concat("/function/invoke");
|
|
||||||
FunctionInvokeMessage invokeMessage = ((FunctionInvokeMessage) message);
|
|
||||||
JSONObject mqttData = new JSONObject();
|
|
||||||
mqttData.put("messageId", message.getMessageId());
|
|
||||||
mqttData.put("function", invokeMessage.getFunctionId());
|
|
||||||
mqttData.put("inputs", invokeMessage.getInputs());
|
|
||||||
mqttData.put("deviceId", deviceId);
|
|
||||||
|
|
||||||
return new EncodedTopic(topic, mqttData);
|
|
||||||
} else if (message instanceof UpgradeFirmwareMessage) {
|
|
||||||
String topic = "/" .concat(deviceId).concat("/firmware/upgrade");
|
|
||||||
UpgradeFirmwareMessage firmwareMessage = ((UpgradeFirmwareMessage) message);
|
|
||||||
JSONObject mqttData = new JSONObject();
|
|
||||||
mqttData.put("messageId", message.getMessageId());
|
|
||||||
mqttData.put("url", firmwareMessage.getUrl());
|
|
||||||
mqttData.put("sign", firmwareMessage.getSign());
|
|
||||||
mqttData.put("version", firmwareMessage.getVersion());
|
|
||||||
mqttData.put("signMethod", firmwareMessage.getSignMethod());
|
|
||||||
mqttData.put("parameters", firmwareMessage.getParameters());
|
|
||||||
mqttData.put("deviceId", deviceId);
|
|
||||||
|
|
||||||
return new EncodedTopic(topic, mqttData);
|
|
||||||
} else if (message instanceof ReadFirmwareMessage) {
|
|
||||||
String topic = "/" .concat(deviceId).concat("/firmware/read");
|
|
||||||
JSONObject mqttData = new JSONObject();
|
|
||||||
mqttData.put("messageId", message.getMessageId());
|
|
||||||
mqttData.put("deviceId", deviceId);
|
|
||||||
return new EncodedTopic(topic, mqttData);
|
|
||||||
} else if (message instanceof RequestFirmwareMessageReply) {
|
|
||||||
String topic = "/" .concat(deviceId).concat("/firmware/pull/reply");
|
|
||||||
RequestFirmwareMessageReply firmwareMessage = ((RequestFirmwareMessageReply) message);
|
|
||||||
JSONObject mqttData = new JSONObject();
|
|
||||||
mqttData.put("messageId", message.getMessageId());
|
|
||||||
mqttData.put("url", firmwareMessage.getUrl());
|
|
||||||
mqttData.put("sign", firmwareMessage.getSign());
|
|
||||||
mqttData.put("version", firmwareMessage.getVersion());
|
|
||||||
mqttData.put("signMethod", firmwareMessage.getSignMethod());
|
|
||||||
mqttData.put("parameters", firmwareMessage.getParameters());
|
|
||||||
mqttData.put("deviceId", deviceId);
|
|
||||||
return new EncodedTopic(topic, mqttData);
|
|
||||||
} else if (message instanceof ChildDeviceMessage) {
|
|
||||||
ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message);
|
|
||||||
EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage());
|
|
||||||
String topic = "/" .concat(deviceId).concat("/child").concat(result.topic);
|
|
||||||
result.payload.put("deviceId", childDeviceMessage.getChildDeviceId());
|
|
||||||
|
|
||||||
return new EncodedTopic(topic, result.payload);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected DecodeResult decode(String topic, JSONObject object) {
|
|
||||||
DecodeResult result = new DecodeResult(topic);
|
|
||||||
Message message = null;
|
|
||||||
if (result.isEvent()) {
|
|
||||||
message = decodeEvent(result, object);
|
|
||||||
} else if (result.isReportProperties()) {
|
|
||||||
message = decodeReportPropertyReply(result, object);
|
|
||||||
} else if (result.isReadPropertyReply()) {
|
|
||||||
message = decodeReadPropertyReply(result, object);
|
|
||||||
} else if (result.isWritePropertyReply()) {
|
|
||||||
message = decodeWritePropertyReply(result, object);
|
|
||||||
} else if (result.isFunctionInvokeReply()) {
|
|
||||||
message = decodeInvokeReply(result, object);
|
|
||||||
} else if (result.isRegister()) {
|
|
||||||
message = decodeRegister(result, object);
|
|
||||||
} else if (result.isUnregister()) {
|
|
||||||
message = decodeUnregister(result, object);
|
|
||||||
} else if (result.isDerivedMetadata()) {
|
|
||||||
message = decodeDerivedMetadata(result, object);
|
|
||||||
} else if (result.isReadFirmwareReply()) {
|
|
||||||
message = object.toJavaObject(ReadFirmwareMessageReply.class);
|
|
||||||
} else if (result.isRequestFirmware()) {
|
|
||||||
message = object.toJavaObject(RequestFirmwareMessage.class);
|
|
||||||
} else if (result.isReportFirmware()) {
|
|
||||||
message = object.toJavaObject(ReportFirmwareMessage.class);
|
|
||||||
} else if (result.isUpgradeFirmwareProgress()) {
|
|
||||||
message = object.toJavaObject(UpgradeFirmwareProgressMessage.class);
|
|
||||||
}else if (topic.endsWith("connected")) {
|
|
||||||
message = object.toJavaObject(DeviceOnlineMessage.class);
|
|
||||||
} else if (topic.endsWith("disconnect")) {
|
|
||||||
message = object.toJavaObject(DeviceOfflineMessage.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result.isChild()) {
|
|
||||||
if (message == null) {
|
|
||||||
throw new UnsupportedOperationException("unsupported topic:" + topic);
|
|
||||||
}
|
|
||||||
applyCommons(message, result, object);
|
|
||||||
ChildDeviceMessage children = new ChildDeviceMessage();
|
|
||||||
children.setChildDeviceId(result.getChildDeviceId());
|
|
||||||
children.setDeviceId(result.getDeviceId());
|
|
||||||
children.setChildDeviceMessage(message);
|
|
||||||
children.setTimestamp(Optional.ofNullable(object.getLong("timestamp")).orElse(System.currentTimeMillis()));
|
|
||||||
Optional.ofNullable(object.getString("messageId")).ifPresent(children::setMessageId);
|
|
||||||
result.message = children;
|
|
||||||
} else {
|
|
||||||
if (message == null) {
|
|
||||||
throw new UnsupportedOperationException("unsupported topic:" + topic);
|
|
||||||
}
|
|
||||||
applyCommons(message, result, object);
|
|
||||||
result.message = message;
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private Message decodeEvent(DecodeResult result, JSONObject event) {
|
|
||||||
EventMessage message = event.toJavaObject(EventMessage.class);
|
|
||||||
message.setData(event.get("data"));
|
|
||||||
message.setEvent(result.args.get("eventId"));
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Message decodeReadPropertyReply(DecodeResult result, JSONObject data) {
|
|
||||||
|
|
||||||
return data.toJavaObject(ReadPropertyMessageReply.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private Message decodeReportPropertyReply(DecodeResult result, JSONObject data) {
|
|
||||||
|
|
||||||
return data.toJavaObject(ReportPropertyMessage.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private Message decodeWritePropertyReply(DecodeResult result, JSONObject data) {
|
|
||||||
|
|
||||||
return data.toJavaObject(WritePropertyMessageReply.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Message decodeInvokeReply(DecodeResult result, JSONObject data) {
|
|
||||||
return data.toJavaObject(FunctionInvokeMessageReply.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Message decodeRegister(DecodeResult result, JSONObject data) {
|
|
||||||
return data.toJavaObject(DeviceRegisterMessage.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Message decodeUnregister(DecodeResult result, JSONObject data) {
|
|
||||||
return data.toJavaObject(DeviceUnRegisterMessage.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Message decodeDerivedMetadata(DecodeResult result, JSONObject data) {
|
|
||||||
return data.toJavaObject(DerivedMetadataMessage.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void applyCommons(Message message, DecodeResult result, JSONObject data) {
|
|
||||||
if (message instanceof CommonDeviceMessageReply) {
|
|
||||||
CommonDeviceMessageReply reply = ((CommonDeviceMessageReply) message);
|
|
||||||
reply.setSuccess(Optional.ofNullable(data.getBoolean("success")).orElse(true));
|
|
||||||
reply.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
|
|
||||||
if (result.isChild()) {
|
|
||||||
reply.setDeviceId(result.getChildDeviceId());
|
|
||||||
} else {
|
|
||||||
reply.setDeviceId(result.getDeviceId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (message instanceof CommonDeviceMessage) {
|
|
||||||
CommonDeviceMessage msg = ((CommonDeviceMessage) message);
|
|
||||||
msg.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
|
|
||||||
if (result.isChild()) {
|
|
||||||
msg.setDeviceId(result.getChildDeviceId());
|
|
||||||
} else {
|
|
||||||
msg.setDeviceId(result.getDeviceId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
@AllArgsConstructor
|
|
||||||
protected class EncodedTopic {
|
|
||||||
String topic;
|
|
||||||
|
|
||||||
JSONObject payload;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
protected class Decoded {
|
|
||||||
Message message;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package org.jetlinks.protocol.official;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||||
|
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
|
||||||
|
|
||||||
|
public class ObjectMappers {
|
||||||
|
|
||||||
|
public static final ObjectMapper JSON_MAPPER;
|
||||||
|
public static final ObjectMapper CBOR_MAPPER;
|
||||||
|
|
||||||
|
static {
|
||||||
|
JSON_MAPPER = Jackson2ObjectMapperBuilder
|
||||||
|
.json()
|
||||||
|
.build()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||||
|
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
|
||||||
|
;
|
||||||
|
CBOR_MAPPER = Jackson2ObjectMapperBuilder
|
||||||
|
.cbor()
|
||||||
|
.build()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||||
|
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,243 @@
|
||||||
|
package org.jetlinks.protocol.official;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import org.hswebframework.web.bean.FastBeanCopier;
|
||||||
|
import org.jetlinks.core.message.*;
|
||||||
|
import org.jetlinks.core.message.event.EventMessage;
|
||||||
|
import org.jetlinks.core.message.firmware.*;
|
||||||
|
import org.jetlinks.core.message.function.FunctionInvokeMessage;
|
||||||
|
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
|
||||||
|
import org.jetlinks.core.message.property.*;
|
||||||
|
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
|
||||||
|
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
|
||||||
|
import org.jetlinks.core.utils.TopicUtils;
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
public enum TopicMessageCodec {
|
||||||
|
//上报属性数据
|
||||||
|
reportProperty("/*/properties/report", ReportPropertyMessage.class),
|
||||||
|
//事件上报
|
||||||
|
event("/*/event/*", EventMessage.class),
|
||||||
|
//读取属性
|
||||||
|
readProperty("/*/properties/read", ReadPropertyMessage.class),
|
||||||
|
//读取属性回复
|
||||||
|
readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class),
|
||||||
|
//修改属性
|
||||||
|
writeProperty("/*/properties/write", WritePropertyMessage.class),
|
||||||
|
//修改属性回复
|
||||||
|
writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class),
|
||||||
|
//调用功能
|
||||||
|
functionInvoke("/*/function/invoke", FunctionInvokeMessage.class),
|
||||||
|
//调用功能回复
|
||||||
|
functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class),
|
||||||
|
//子设备消息
|
||||||
|
child("/*/child/*/**", ChildDeviceMessage.class) {
|
||||||
|
@Override
|
||||||
|
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
|
||||||
|
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
|
||||||
|
_topic[0] = "";// topic以/开头所有第一位是空白
|
||||||
|
return TopicMessageCodec
|
||||||
|
.decode(mapper, _topic, payload)
|
||||||
|
.map(childMsg -> {
|
||||||
|
ChildDeviceMessage msg = new ChildDeviceMessage();
|
||||||
|
msg.setDeviceId(topic[1]);
|
||||||
|
msg.setChildDeviceMessage(childMsg);
|
||||||
|
msg.setTimestamp(childMsg.getTimestamp());
|
||||||
|
msg.setMessageId(childMsg.getMessageId());
|
||||||
|
return msg;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
|
||||||
|
ChildDeviceMessage deviceMessage = ((ChildDeviceMessage) message);
|
||||||
|
|
||||||
|
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
|
||||||
|
|
||||||
|
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
|
||||||
|
String[] childTopic = payload.getTopic().split("/");
|
||||||
|
String[] topic = new String[topics.length + childTopic.length - 3];
|
||||||
|
//合并topic
|
||||||
|
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
|
||||||
|
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
|
||||||
|
|
||||||
|
refactorTopic(topic, message);
|
||||||
|
payload.setTopic(String.join("/", topic));
|
||||||
|
return payload;
|
||||||
|
|
||||||
|
}
|
||||||
|
}, //子设备消息回复
|
||||||
|
childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) {
|
||||||
|
@Override
|
||||||
|
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
|
||||||
|
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
|
||||||
|
_topic[0] = "";// topic以/开头所有第一位是空白
|
||||||
|
return TopicMessageCodec
|
||||||
|
.decode(mapper, _topic, payload)
|
||||||
|
.map(childMsg -> {
|
||||||
|
ChildDeviceMessageReply msg = new ChildDeviceMessageReply();
|
||||||
|
msg.setDeviceId(topic[1]);
|
||||||
|
msg.setChildDeviceMessage(childMsg);
|
||||||
|
msg.setTimestamp(childMsg.getTimestamp());
|
||||||
|
msg.setMessageId(childMsg.getMessageId());
|
||||||
|
return msg;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
|
||||||
|
ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message);
|
||||||
|
|
||||||
|
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
|
||||||
|
|
||||||
|
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
|
||||||
|
String[] childTopic = payload.getTopic().split("/");
|
||||||
|
String[] topic = new String[topics.length + childTopic.length - 3];
|
||||||
|
//合并topic
|
||||||
|
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
|
||||||
|
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
|
||||||
|
|
||||||
|
refactorTopic(topic, message);
|
||||||
|
payload.setTopic(String.join("/", topic));
|
||||||
|
return payload;
|
||||||
|
|
||||||
|
}
|
||||||
|
},
|
||||||
|
//更新标签
|
||||||
|
updateTag("/*/tags", UpdateTagMessage.class),
|
||||||
|
//注册
|
||||||
|
register("/*/register", DeviceRegisterMessage.class),
|
||||||
|
//注销
|
||||||
|
unregister("/*/unregister", DeviceUnRegisterMessage.class),
|
||||||
|
//更新固件消息
|
||||||
|
upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
|
||||||
|
//更新固件升级进度消息
|
||||||
|
upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class),
|
||||||
|
//拉取固件
|
||||||
|
requestFirmware("/*/firmware/pull", RequestFirmwareMessage.class),
|
||||||
|
//拉取固件更新回复
|
||||||
|
requestFirmwareReply("/*/firmware/pull/reply", RequestFirmwareMessageReply.class),
|
||||||
|
//上报固件版本
|
||||||
|
reportFirmware("/*/firmware/report", ReportFirmwareMessage.class),
|
||||||
|
//读取固件回复
|
||||||
|
readFirmwareReply("/*/firmware/read/reply", ReadFirmwareMessageReply.class),
|
||||||
|
//派生物模型上报
|
||||||
|
derivedMetadata("/*/metadata/derived", DerivedMetadataMessage.class),
|
||||||
|
//透传设备消息
|
||||||
|
direct("/*/direct", DirectDeviceMessage.class) {
|
||||||
|
@Override
|
||||||
|
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
|
||||||
|
DirectDeviceMessage message = new DirectDeviceMessage();
|
||||||
|
message.setDeviceId(topic[1]);
|
||||||
|
message.setPayload(payload);
|
||||||
|
return Mono.just(message);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
//断开连接消息
|
||||||
|
disconnect("/*/disconnect", DisconnectDeviceMessage.class),
|
||||||
|
//断开连接回复
|
||||||
|
disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
|
||||||
|
//上线
|
||||||
|
connect("/*/online", DeviceOnlineMessage.class),
|
||||||
|
//离线
|
||||||
|
offline("/*/offline", DeviceOfflineMessage.class),
|
||||||
|
//日志
|
||||||
|
log("/*/log", DeviceLogMessage.class),
|
||||||
|
//状态检查
|
||||||
|
stateCheck("/*/state-check", DeviceStateCheckMessage.class),
|
||||||
|
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
|
||||||
|
;
|
||||||
|
|
||||||
|
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) {
|
||||||
|
this.pattern = topic.split("/");
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
private final String[] pattern;
|
||||||
|
private final Class<? extends DeviceMessage> type;
|
||||||
|
|
||||||
|
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) {
|
||||||
|
return Mono
|
||||||
|
.justOrEmpty(fromTopic(topics))
|
||||||
|
.flatMapMany(topicMessageCodec -> topicMessageCodec.doDecode(mapper, topics, payload));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String topic, byte[] payload) {
|
||||||
|
return decode(mapper, topic.split("/"), payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TopicPayload encode(ObjectMapper mapper, DeviceMessage message) {
|
||||||
|
|
||||||
|
return fromMessage(message)
|
||||||
|
.orElseThrow(() -> new UnsupportedOperationException("unsupported message:" + message.getMessageType()))
|
||||||
|
.doEncode(mapper, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Optional<TopicMessageCodec> fromTopic(String[] topic) {
|
||||||
|
for (TopicMessageCodec value : values()) {
|
||||||
|
if (TopicUtils.match(value.pattern, topic)) {
|
||||||
|
return Optional.of(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
static Optional<TopicMessageCodec> fromMessage(DeviceMessage message) {
|
||||||
|
for (TopicMessageCodec value : values()) {
|
||||||
|
if (value.type == message.getClass()) {
|
||||||
|
return Optional.of(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
|
||||||
|
return Mono
|
||||||
|
.fromCallable(() -> {
|
||||||
|
DeviceMessage message = mapper.readValue(payload, type);
|
||||||
|
FastBeanCopier.copy(Collections.singletonMap("deviceId", topic[1]), message);
|
||||||
|
|
||||||
|
return message;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
|
||||||
|
refactorTopic(topics, message);
|
||||||
|
return TopicPayload.of(String.join("/", topics), mapper.writeValueAsBytes(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
TopicPayload doEncode(ObjectMapper mapper, DeviceMessage message) {
|
||||||
|
String[] topics = Arrays.copyOf(pattern, pattern.length);
|
||||||
|
return doEncode(mapper, topics, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
void refactorTopic(String[] topics, DeviceMessage message) {
|
||||||
|
topics[1] = message.getDeviceId();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 移除topic中的产品信息,topic第一个层为产品ID,在解码时,不需要此信息,所以需要移除之.
|
||||||
|
*
|
||||||
|
* @param topic topic
|
||||||
|
* @return 移除后的topic
|
||||||
|
*/
|
||||||
|
public static String[] removeProductPath(String topic) {
|
||||||
|
if (!topic.startsWith("/")) {
|
||||||
|
topic = "/" + topic;
|
||||||
|
}
|
||||||
|
String[] topicArr = topic.split("/");
|
||||||
|
String[] topics = Arrays.copyOfRange(topicArr, 1, topicArr.length);
|
||||||
|
topics[0] = "";
|
||||||
|
return topics;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package org.jetlinks.protocol.official;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@AllArgsConstructor(staticName = "of") // FIXME 使用对象池,节省一丢丢内存?
|
||||||
|
public class TopicPayload {
|
||||||
|
|
||||||
|
private String topic;
|
||||||
|
|
||||||
|
private byte[] payload;
|
||||||
|
}
|
|
@ -1 +0,0 @@
|
||||||
org.jetlinks.protocol.official.JetLinksProtocolSupportProvider
|
|
|
@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
public class JetLinksCoapDeviceMessageCodecTest {
|
public class JetLinksCoapDeviceMessageCodecTest {
|
||||||
|
|
||||||
|
|
||||||
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
|
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
|
||||||
|
|
||||||
DeviceOperator device;
|
DeviceOperator device;
|
||||||
|
|
||||||
|
@ -39,10 +39,11 @@ public class JetLinksCoapDeviceMessageCodecTest {
|
||||||
@Before
|
@Before
|
||||||
public void init() {
|
public void init() {
|
||||||
TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
|
TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
|
||||||
device = registry.register(DeviceInfo.builder()
|
device = registry
|
||||||
.id("test")
|
.register(DeviceInfo.builder()
|
||||||
.protocol("jetlinks")
|
.id("test")
|
||||||
.build())
|
.protocol("jetlinks")
|
||||||
|
.build())
|
||||||
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
|
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
|
||||||
.block();
|
.block();
|
||||||
}
|
}
|
||||||
|
@ -59,19 +60,20 @@ public class JetLinksCoapDeviceMessageCodecTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handlePOST(CoapExchange exchange) {
|
public void handlePOST(CoapExchange exchange) {
|
||||||
codec.decode(new MessageDecodeContext() {
|
codec
|
||||||
@Nonnull
|
.decode(new MessageDecodeContext() {
|
||||||
@Override
|
@Nonnull
|
||||||
public EncodedMessage getMessage() {
|
@Override
|
||||||
return new CoapExchangeMessage(exchange);
|
public EncodedMessage getMessage() {
|
||||||
}
|
return new CoapExchangeMessage(exchange);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DeviceOperator getDevice() {
|
public DeviceOperator getDevice() {
|
||||||
return device;
|
return device;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.doOnSuccess(messageRef::set)
|
.doOnNext(messageRef::set)
|
||||||
.doOnError(Throwable::printStackTrace)
|
.doOnError(Throwable::printStackTrace)
|
||||||
.subscribe();
|
.subscribe();
|
||||||
}
|
}
|
||||||
|
@ -85,7 +87,7 @@ public class JetLinksCoapDeviceMessageCodecTest {
|
||||||
};
|
};
|
||||||
|
|
||||||
Endpoint endpoint = new CoapEndpoint.Builder()
|
Endpoint endpoint = new CoapEndpoint.Builder()
|
||||||
.setPort(12345).build();
|
.setPort(12341).build();
|
||||||
server.addEndpoint(endpoint);
|
server.addEndpoint(endpoint);
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
|
@ -96,8 +98,8 @@ public class JetLinksCoapDeviceMessageCodecTest {
|
||||||
Request request = Request.newPost();
|
Request request = Request.newPost();
|
||||||
String payload = "{\"data\":1}";
|
String payload = "{\"data\":1}";
|
||||||
|
|
||||||
request.setURI("coap://localhost:12345/test/test/event/event1");
|
request.setURI("coap://localhost:12341/test/test/event/event1");
|
||||||
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(),key));
|
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
|
||||||
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
|
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
|
||||||
|
|
||||||
CoapResponse response = coapClient.advanced(request);
|
CoapResponse response = coapClient.advanced(request);
|
||||||
|
|
|
@ -7,7 +7,6 @@ import org.jetlinks.core.device.DeviceOperator;
|
||||||
import org.jetlinks.core.device.ProductInfo;
|
import org.jetlinks.core.device.ProductInfo;
|
||||||
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
|
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
|
||||||
import org.jetlinks.core.message.ChildDeviceMessage;
|
import org.jetlinks.core.message.ChildDeviceMessage;
|
||||||
import org.jetlinks.core.message.ChildDeviceMessageReply;
|
|
||||||
import org.jetlinks.core.message.DerivedMetadataMessage;
|
import org.jetlinks.core.message.DerivedMetadataMessage;
|
||||||
import org.jetlinks.core.message.Message;
|
import org.jetlinks.core.message.Message;
|
||||||
import org.jetlinks.core.message.codec.*;
|
import org.jetlinks.core.message.codec.*;
|
||||||
|
@ -16,7 +15,6 @@ import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
|
||||||
import org.jetlinks.core.message.function.FunctionInvokeMessage;
|
import org.jetlinks.core.message.function.FunctionInvokeMessage;
|
||||||
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
|
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
|
||||||
import org.jetlinks.core.message.property.*;
|
import org.jetlinks.core.message.property.*;
|
||||||
import org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -28,7 +26,7 @@ import java.util.Collections;
|
||||||
|
|
||||||
public class JetLinksMqttDeviceMessageCodecTest {
|
public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
|
|
||||||
org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
|
JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
|
||||||
|
|
||||||
|
|
||||||
TestDeviceRegistry registry;
|
TestDeviceRegistry registry;
|
||||||
|
@ -87,7 +85,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/properties/read/reply")
|
.topic("/product1/device1/properties/read/reply")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
|
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
|
||||||
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
|
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
|
||||||
|
@ -103,12 +101,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/child/test/properties/read/reply")
|
.topic("/product1/device1/child/test/properties/read/reply")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
|
Assert.assertTrue(message instanceof ChildDeviceMessage);
|
||||||
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
|
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
|
||||||
|
|
||||||
Assert.assertTrue(childReply.isSuccess());
|
|
||||||
Assert.assertEquals(childReply.getDeviceId(),"device1");
|
Assert.assertEquals(childReply.getDeviceId(),"device1");
|
||||||
Assert.assertEquals(childReply.getMessageId(),"test");
|
Assert.assertEquals(childReply.getMessageId(),"test");
|
||||||
|
|
||||||
|
@ -151,7 +148,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
|
|
||||||
|
|
||||||
Assert.assertNotNull(encodedMessage);
|
Assert.assertNotNull(encodedMessage);
|
||||||
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/write");
|
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/device1/properties/write");
|
||||||
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
|
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +157,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/properties/write/reply")
|
.topic("/product1/device1/properties/write/reply")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof WritePropertyMessageReply);
|
Assert.assertTrue(message instanceof WritePropertyMessageReply);
|
||||||
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
|
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
|
||||||
|
@ -178,12 +175,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/child/test/properties/write/reply")
|
.topic("/product1/device1/child/test/properties/write/reply")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
|
Assert.assertTrue(message instanceof ChildDeviceMessage);
|
||||||
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
|
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
|
||||||
|
|
||||||
Assert.assertTrue(childReply.isSuccess());
|
|
||||||
Assert.assertEquals(childReply.getDeviceId(),"device1");
|
Assert.assertEquals(childReply.getDeviceId(),"device1");
|
||||||
Assert.assertEquals(childReply.getMessageId(),"test");
|
Assert.assertEquals(childReply.getMessageId(),"test");
|
||||||
|
|
||||||
|
@ -233,7 +229,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/function/invoke/reply")
|
.topic("/product1/device1/function/invoke/reply")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
|
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
|
||||||
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
|
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
|
||||||
|
@ -249,12 +245,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/child/test/function/invoke/reply")
|
.topic("/product1/device1/child/test/function/invoke/reply")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
|
Assert.assertTrue(message instanceof ChildDeviceMessage);
|
||||||
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
|
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
|
||||||
|
|
||||||
Assert.assertTrue(childReply.isSuccess());
|
|
||||||
Assert.assertEquals(childReply.getDeviceId(),"device1");
|
Assert.assertEquals(childReply.getDeviceId(),"device1");
|
||||||
Assert.assertEquals(childReply.getMessageId(),"test");
|
Assert.assertEquals(childReply.getMessageId(),"test");
|
||||||
|
|
||||||
|
@ -271,7 +266,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/event/temp")
|
.topic("/product1/device1/event/temp")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof EventMessage);
|
Assert.assertTrue(message instanceof EventMessage);
|
||||||
EventMessage reply = ((EventMessage) message);
|
EventMessage reply = ((EventMessage) message);
|
||||||
|
@ -286,11 +281,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/child/test/event/temp")
|
.topic("/product1/device1/child/test/event/temp")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
|
Assert.assertTrue(message instanceof ChildDeviceMessage);
|
||||||
|
|
||||||
EventMessage reply = ((EventMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
|
EventMessage reply = ((EventMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
|
||||||
Assert.assertEquals(reply.getDeviceId(), "test");
|
Assert.assertEquals(reply.getDeviceId(), "test");
|
||||||
Assert.assertEquals(reply.getMessageId(), "test");
|
Assert.assertEquals(reply.getMessageId(), "test");
|
||||||
Assert.assertEquals(reply.getData(), 100);
|
Assert.assertEquals(reply.getData(), 100);
|
||||||
|
@ -302,7 +297,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/properties/report")
|
.topic("/product1/device1/properties/report")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ReportPropertyMessage);
|
Assert.assertTrue(message instanceof ReportPropertyMessage);
|
||||||
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
|
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
|
||||||
|
@ -318,11 +313,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/child/test/properties/report")
|
.topic("/product1/device1/child/test/properties/report")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
|
Assert.assertTrue(message instanceof ChildDeviceMessage);
|
||||||
|
|
||||||
ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
|
ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
|
||||||
Assert.assertEquals(reply.getDeviceId(), "test");
|
Assert.assertEquals(reply.getDeviceId(), "test");
|
||||||
Assert.assertEquals(reply.getMessageId(), "test");
|
Assert.assertEquals(reply.getMessageId(), "test");
|
||||||
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
|
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
|
||||||
|
@ -335,7 +330,7 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/metadata/derived")
|
.topic("/product1/device1/metadata/derived")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof DerivedMetadataMessage);
|
Assert.assertTrue(message instanceof DerivedMetadataMessage);
|
||||||
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
|
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
|
||||||
|
@ -350,11 +345,11 @@ public class JetLinksMqttDeviceMessageCodecTest {
|
||||||
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
|
||||||
.topic("/product1/device1/child/test/metadata/derived")
|
.topic("/product1/device1/child/test/metadata/derived")
|
||||||
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
|
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
|
||||||
.build())).block();
|
.build())).blockFirst();
|
||||||
|
|
||||||
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
|
Assert.assertTrue(message instanceof ChildDeviceMessage);
|
||||||
|
|
||||||
DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
|
DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
|
||||||
Assert.assertEquals(reply.getDeviceId(), "test");
|
Assert.assertEquals(reply.getDeviceId(), "test");
|
||||||
Assert.assertEquals(reply.getMessageId(), "test");
|
Assert.assertEquals(reply.getMessageId(), "test");
|
||||||
Assert.assertEquals(reply.getMetadata(), "1");
|
Assert.assertEquals(reply.getMetadata(), "1");
|
||||||
|
|
|
@ -1,24 +0,0 @@
|
||||||
package org.jetlinks.protocol.official;
|
|
||||||
|
|
||||||
import org.jetlinks.core.message.property.ReadPropertyMessage;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
|
|
||||||
public class JetlinksTopicMessageCodecTest {
|
|
||||||
|
|
||||||
JetlinksTopicMessageCodec codec = new JetlinksTopicMessageCodec();
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadProperty() {
|
|
||||||
|
|
||||||
ReadPropertyMessage readProperty = new ReadPropertyMessage();
|
|
||||||
readProperty.setProperties(Arrays.asList("name"));
|
|
||||||
readProperty.setMessageId("test");
|
|
||||||
JetlinksTopicMessageCodec.EncodedTopic topic = codec.encode("test", readProperty);
|
|
||||||
Assert.assertEquals(topic.getTopic(),"/test/properties/read");
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
package org.jetlinks.protocol.official;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.jetlinks.core.message.ChildDeviceMessage;
|
||||||
|
import org.jetlinks.core.message.property.ReportPropertyMessage;
|
||||||
|
import org.junit.Test;
|
||||||
|
import reactor.test.StepVerifier;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class TopicMessageCodecTest {
|
||||||
|
|
||||||
|
|
||||||
|
public void testChild(ObjectMapper objectMapper) {
|
||||||
|
ChildDeviceMessage message = new ChildDeviceMessage();
|
||||||
|
message.setDeviceId("test");
|
||||||
|
ReportPropertyMessage msg = new ReportPropertyMessage();
|
||||||
|
msg.setDeviceId("childId");
|
||||||
|
message.setChildDeviceMessage(msg);
|
||||||
|
message.setTimestamp(msg.getTimestamp());
|
||||||
|
|
||||||
|
|
||||||
|
TopicPayload payload = TopicMessageCodec.child.doEncode(objectMapper, message);
|
||||||
|
System.out.println(payload.getPayload().length);
|
||||||
|
assertEquals("/test/child/childId/properties/report", payload.getTopic());
|
||||||
|
|
||||||
|
TopicMessageCodec
|
||||||
|
.decode(objectMapper, payload.getTopic(), payload.getPayload())
|
||||||
|
.as(StepVerifier::create)
|
||||||
|
.expectNextMatches(deviceMessage -> {
|
||||||
|
System.out.println(message);
|
||||||
|
System.out.println(deviceMessage);
|
||||||
|
return deviceMessage.toJson().equals(message.toJson());
|
||||||
|
})
|
||||||
|
.verifyComplete();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void doTest() {
|
||||||
|
testChild(ObjectMappers.JSON_MAPPER);
|
||||||
|
testChild(ObjectMappers.CBOR_MAPPER);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
<?xml version="1.0" encoding="utf-8" ?>
|
||||||
|
<configuration>
|
||||||
|
<logger name="io.netty" level="warn"/>
|
||||||
|
<logger name="org.apache" level="warn"/>
|
||||||
|
|
||||||
|
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||||
|
<encoder>
|
||||||
|
<pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<root level="debug">
|
||||||
|
<appender-ref ref="STDOUT"/>
|
||||||
|
</root>
|
||||||
|
</configuration>
|
Loading…
Reference in New Issue