first commit

This commit is contained in:
zhou-hao 2020-07-22 11:05:45 +08:00
commit 4ab059b090
15 changed files with 1711 additions and 0 deletions

26
.gitignore vendored Normal file
View File

@ -0,0 +1,26 @@
**/pom.xml.versionsBackup
**/target/
**/out/
*.class
# Mobile Tools for Java (J2ME)
.mtj.tmp/
.idea/
/nbproject
*.ipr
*.iws
*.iml
# Package Files #
*.jar
*.war
*.ear
*.log
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
**/transaction-logs/
!/.mvn/wrapper/maven-wrapper.jar
/data/
*.db
/static/
/upload
/ui/upload/

163
README.md Normal file
View File

@ -0,0 +1,163 @@
## JetLinks 官方设备接入协议
类名: `org.jetlinks.protocol.official.JetLinksProtocolSupportProvider`
## MQTT(S)接入
目前支持MQTT3.1.1和3.1版本协议.
### 认证
CONNECT报文:
```
clientId: 设备实例ID
username: secureId+"|"+timestamp
password: md5(secureId+"|"+timestamp+"|"+secureKey)
```
说明: `secureId`以及`secureKey`在创建设备产品和设备实例时进行配置.
`timestamp`为当前系统时间戳(毫秒),与系统时间不能相差5分钟.
### Topic
- 读取设备属性:
topic: `/{productId}/{deviceId}/properties/read`
方向: `下行`
消息格式:
{
"messageId":"消息ID",
"deviceId":"设备ID",
"properties":["sn","model"] //要读取到属性列表
}
回复Topic: `/{productId}/{deviceId}/properties/read/reply`
回复消息格式:
//成功
{
"messageId":"与下行消息中的messageId相同",
"properties":{"sn":"test","model":"test"}, //key与设备模型中定义的属性id一致
"deviceId":"设备ID",
"success":true,
}
//失败. 下同
{
"messageId":"与下行消息中的messageId相同",
"success":false,
"code":"error_code",
"message":"失败原因"
}
- 修改设备属性:
topic: `/{productId}/{deviceId}/properties/write`
消息格式:
{
"messageId":"消息ID",
"deviceId":"设备ID",
"properties":{"color":"red"} //要设置的属性
}
回复Topic: `/{productId}/{deviceId}/properties/wirte/reply`
回复消息格式:
{
"messageId":"与下行消息中的messageId相同",
"properties":{"color":"red"}, //设置成功后的属性,可不返回
"success":true,
}
- 调用设备功能
topic: `/{productId}/{deviceId}/function/invoke`
消息格式:
{
"messageId":"消息ID",
"deviceId":"设备ID",
"function":"playVoice",//功能ID
"inputs":[{"name":"text","value":"播放声音"}] //参数
}
回复Topic: `/{productId}/{deviceId}/function/invoke/reply`
回复消息格式:
{
"messageId":"与下行消息中的messageId相同",
"output":"success", //返回执行结果
"success":true,
}
- 设备事件上报
topic: /{productId}/{deviceId}/event/{eventId}
消息格式:
{
"messageId":"随机消息ID",
"data":100 //上报数据
}
拓展:
定时上报属性:
{
"messageId":"随机消息ID",
"data":{"color":"red"},//属性列表
"headers":{"report-properties":true} //标记为上报属性事件
}
### 动态注册
暂不支持
## CoAP接入
使用CoAP协议接入仅需要对数据进行加密即可.加密算法: AES/ECB/PKCS5Padding.
使用自定义Option: `2100:设备ID` 来标识设备.
将请求体进行加密,密钥为在创建设备产品和设备实例时进行配置的(`secureKey`).
请求地址(`URI`)与MQTT `Topic`相同.消息体(`payload`)与MQTT相同.
## DTLS接入
使用CoAP DTLS 协议接入时需要先进行认证:
发送认证请求:
```text
POST /auth
Accept: application/json
Content-Format: application/json
2100: 设备ID
2110: 签名 md5(payload+secureKey)
payload: {"timestamp":"时间戳"}
```
响应结果:
```text
2.05 (Content)
payload: {"token":"令牌"}
```
之后的请求中需要将返回的令牌携带到自定义Option:2111
例如:
```text
POST /test/device1/event/fire_alarm
2100: 设备ID
2111: 令牌
...其他Option
payload: json数据
```

114
pom.xml Normal file
View File

@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-official-protocol</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<name>JetLinks</name>
<url>http://jetlinks.org</url>
<inceptionYear>2019</inceptionYear>
<description>JetLinks 物联网平台</description>
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
<developers>
<developer>
<name>zhouhao</name>
<email>i@hsweb.me</email>
<roles>
<role>Owner</role>
</roles>
<timezone>+8</timezone>
<url>https://github.com/zhou-hao</url>
</developer>
</developers>
<scm>
<connection>scm:git:https://github.com/jetlinks/jetlinks-official-protocol.git</connection>
<developerConnection>scm:git:https://github.com/jetlinks/jetlinks-official-protocol.git</developerConnection>
<url>https://github.com/jetlinks/jetlinks-official-protocol</url>
<tag>${project.version}</tag>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locales>zh_CN</project.build.locales>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<spring.boot.version>2.2.8.RELEASE</spring.boot.version>
<hsweb.framework.version>4.0.3</hsweb.framework.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<reactor.version>Dysprosium-RELEASE</reactor.version>
</properties>
<dependencies>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId>
<version>2.2.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>hsweb-nexus</id>
<name>Nexus Release Repository</name>
<url>http://nexus.hsweb.me/content/groups/public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
<repository>
<id>aliyun-nexus</id>
<name>aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
</project>

View File

@ -0,0 +1,56 @@
package org.jetlinks.protocol.official;
import org.apache.commons.codec.digest.DigestUtils;
import org.jetlinks.core.Value;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.AuthenticationRequest;
import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.MqttAuthenticationRequest;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
public class JetLinksAuthenticator implements Authenticator {
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
if (request instanceof MqttAuthenticationRequest) {
MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
// secureId|timestamp
String username = mqtt.getUsername();
// md5(secureId|timestamp|secureKey)
String password = mqtt.getPassword();
String requestSecureId;
try {
String[] arr = username.split("[|]");
if (arr.length <= 1) {
return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
}
requestSecureId = arr[0];
long time = Long.parseLong(arr[1]);
//和设备时间差大于5分钟则认为无效
if (Math.abs(System.currentTimeMillis() - time) > TimeUnit.MINUTES.toMillis(5)) {
return Mono.just(AuthenticationResponse.error(401, "设备时间不同步"));
}
return deviceOperation.getConfigs("secureId", "secureKey")
.map(conf -> {
String secureId = conf.getValue("secureId").map(Value::asString).orElse(null);
String secureKey = conf.getValue("secureKey").map(Value::asString).orElse(null);
//签名
String digest = DigestUtils.md5Hex(username + "|" + secureKey);
if (requestSecureId.equals(secureId) && digest.equals(password)) {
return AuthenticationResponse.success();
} else {
return AuthenticationResponse.error(401, "密钥错误");
}
});
} catch (NumberFormatException e) {
return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
}
}
return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
}
}

View File

@ -0,0 +1,121 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.eclipse.californium.core.coap.CoAP;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP_DTLS;
}
public Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
return Mono.defer(() -> {
String path = message.getPath();
String sign = message.getStringOption(2110).orElse(null);
String token = message.getStringOption(2111).orElse(null);
String payload = message.getPayload().toString(StandardCharsets.UTF_8);
if ("/auth".equals(path)) {
//认证
return context.getDevice()
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, context.getDevice().getDeviceId(), payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return context.getDevice()
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
})
.then(Mono.empty());
}
if (StringUtils.isEmpty(token)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return context.getDevice()
.getConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
}))
.flatMap(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return Mono
.just(decode(path, JSON.parseObject(payload)).getMessage())
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST)));
})
.doOnSuccess(msg -> {
response.accept(CoAP.ResponseCode.CREATED);
})
.doOnError(error -> {
log.error("decode coap message error", error);
response.accept(CoAP.ResponseCode.BAD_REQUEST);
});
});
}
@Nonnull
@Override
public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
return decode(exchangeMessage, context, resp -> {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
});
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Mono.empty();
}
protected boolean verifySign(String secureKey, String deviceId, String payload, String sign) {
//验证签名
if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(payload.concat(secureKey)).equalsIgnoreCase(sign)) {
log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload);
return false;
}
return true;
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -0,0 +1,85 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.jetlinks.core.Value;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.protocol.official.cipher.Ciphers;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
@Slf4j
public class JetLinksCoapDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP;
}
protected JSONObject decode(String text) {
return JSON.parseObject(text);
}
protected Mono<? extends Message> decode(CoapMessage message, MessageDecodeContext context) {
String path = message.getPath();
return context
.getDevice()
.getConfigs("encAlg", "secureKey")
.flatMap(configs -> {
Ciphers ciphers = configs.getValue("encAlg").map(Value::asString).flatMap(Ciphers::of).orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
ByteBuf byteBuf = message.getPayload();
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
byteBuf.resetReaderIndex();
String payload = new String(ciphers.decrypt(req, secureKey));
//解码
return Mono.just(decode(path, decode(payload)).getMessage());
});
}
protected Mono<? extends Message> decode(CoapExchangeMessage message, MessageDecodeContext context) {
CoapExchange exchange = message.getExchange();
return decode((CoapMessage) message, context)
.doOnSuccess(msg -> {
exchange.respond(CoAP.ResponseCode.CREATED);
exchange.accept();
})
.switchIfEmpty(Mono.fromRunnable(() -> {
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
}))
.doOnError(error -> {
log.error("decode coap message error", error);
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
});
}
@Nonnull
@Override
public Mono<? extends Message> decode(@Nonnull MessageDecodeContext context) {
return Mono.defer(() -> {
log.debug("handle coap message:\n{}", context.getMessage());
if (context.getMessage() instanceof CoapExchangeMessage) {
return decode(((CoapExchangeMessage) context.getMessage()), context);
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context);
}
return Mono.empty();
});
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -0,0 +1,112 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
/**
* <pre>
* 下行Topic:
* 读取设备属性: /{productId}/{deviceId}/properties/read
* 修改设备属性: /{productId}/{deviceId}/properties/write
* 调用设备功能: /{productId}/{deviceId}/function/invoke
*
* //网关设备
* 读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
* 修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
* 调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
*
* 上行Topic:
* 读取属性回复: /{productId}/{deviceId}/properties/read/reply
* 修改属性回复: /{productId}/{deviceId}/properties/write/reply
* 调用设备功能: /{productId}/{deviceId}/function/invoke/reply
* 上报设备事件: /{productId}/{deviceId}/event/{eventId}
* 上报设备属性: /{productId}/{deviceId}/properties/report
* 上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
*
* //网关设备
* 子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
* 子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnected
* 读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
* 修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
* 调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
* 上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
* 上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
*
* </pre>
* 基于jet links 的消息编解码器
*
* @author zhouhao
* @since 1.0.0
*/
@Slf4j
public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec {
private Transport transport;
public JetLinksMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
}
public JetLinksMqttDeviceMessageCodec() {
this(DefaultTransport.MQTT);
}
@Override
public Transport getSupportTransport() {
return transport;
}
@Nonnull
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.defer(() -> {
Message message = context.getMessage();
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
EncodedTopic convertResult = encode(deviceMessage.getDeviceId(), deviceMessage);
if (convertResult == null) {
return Mono.empty();
}
return context.getDevice()
.getConfig(DeviceConfigKey.productId)
.defaultIfEmpty("null")
.map(productId -> SimpleMqttMessage.builder()
.clientId(deviceMessage.getDeviceId())
.topic("/" .concat(productId).concat(convertResult.topic))
.payloadType(MessagePayloadType.JSON)
.payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(convertResult.payload)))
.build());
} else {
return Mono.empty();
}
});
}
@Nonnull
@Override
public Mono<Message> decode(@Nonnull MessageDecodeContext context) {
return Mono.fromSupplier(() -> {
MqttMessage message = (MqttMessage) context.getMessage();
String topic = message.getTopic();
String jsonData = message.getPayload().toString(StandardCharsets.UTF_8);
JSONObject object = JSON.parseObject(jsonData, JSONObject.class);
if (object == null) {
throw new UnsupportedOperationException("cannot parse payload:{}" + jsonData);
}
return decode(topic, object).getMessage();
});
}
}

View File

@ -0,0 +1,72 @@
package org.jetlinks.protocol.official;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import reactor.core.publisher.Mono;
public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider {
private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
"MQTT认证配置"
, "MQTT认证时需要的配置,mqtt用户名,密码算法:\n" +
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
.add("secureId", "secureId", "密钥ID", new StringType())
.add("secureKey", "secureKey", "密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法",
new EnumType().addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")))
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override
public Mono<CompositeProtocolSupport> create(ServiceContext context) {
return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport();
support.setId("jetlinks.v1.0");
support.setName("JetLinks V1.0");
support.setDescription("JetLinks Protocol Version 1.0");
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());
support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
support.addConfigMetadata(DefaultTransport.MQTT_TLS, mqttConfig);
support.addConfigMetadata(DefaultTransport.CoAP, coapConfig);
support.addConfigMetadata(DefaultTransport.CoAP_DTLS, coapDTLSConfig);
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT));
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
return Mono.just(support);
});
}
}

View File

@ -0,0 +1,291 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.supports.utils.MqttTopicUtils;
import org.springframework.util.Assert;
import java.util.Map;
import java.util.Optional;
class JetlinksTopicMessageCodec {
@Getter
protected class DecodeResult {
private Map<String, String> args;
private boolean child;
private boolean event;
private boolean readPropertyReply;
private boolean writePropertyReply;
private boolean functionInvokeReply;
private boolean reportProperties;
private boolean derivedMetadata;
private boolean register;
private boolean unregister;
private boolean requestFirmware;
private boolean reportFirmware;
private boolean upgradeFirmwareProgress;
private boolean readFirmwareReply;
public DecodeResult(String topic) {
this.topic = topic;
args = MqttTopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic);
if (topic.contains("child")) {
child = true;
args.putAll(MqttTopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic));
}
if (topic.contains("event")) {
event = true;
args.putAll(MqttTopicUtils.getPathVariables("/**/event/{eventId}", topic));
}
derivedMetadata = topic.endsWith("metadata/derived");
if (event) {
} else if (reportProperties = topic.endsWith("properties/report")) {
} else if (unregister = topic.endsWith("unregister")) {
} else if (register = topic.endsWith("register")) {
} else if (readPropertyReply = topic.endsWith("properties/read/reply")) {
} else if (writePropertyReply = topic.endsWith("properties/write/reply")) {
} else if (functionInvokeReply = topic.endsWith("function/invoke/reply")) {
} else if (upgradeFirmwareProgress = topic.endsWith("firmware/upgrade/progress")) {
} else if (requestFirmware = topic.endsWith("firmware/pull")) {
} else if (reportFirmware = topic.endsWith("firmware/report")) {
} else if (readFirmwareReply = topic.endsWith("firmware/read/reply")) {
} else if (derivedMetadata = topic.endsWith("metadata/derived")) {
}
}
private final String topic;
public String getDeviceId() {
return args.get("deviceId");
}
public String getChildDeviceId() {
return args.get("childDeviceId");
}
protected Message message;
}
protected EncodedTopic encode(String deviceId, Message message) {
Assert.hasText(deviceId, "deviceId can not be null");
Assert.notNull(message, "message can not be null");
if (message instanceof ReadPropertyMessage) {
String topic = "/" .concat(deviceId).concat("/properties/read");
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("properties", ((ReadPropertyMessage) message).getProperties());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof WritePropertyMessage) {
String topic = "/" .concat(deviceId).concat("/properties/write");
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("properties", ((WritePropertyMessage) message).getProperties());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof FunctionInvokeMessage) {
String topic = "/" .concat(deviceId).concat("/function/invoke");
FunctionInvokeMessage invokeMessage = ((FunctionInvokeMessage) message);
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("function", invokeMessage.getFunctionId());
mqttData.put("inputs", invokeMessage.getInputs());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof UpgradeFirmwareMessage) {
String topic = "/" .concat(deviceId).concat("/firmware/upgrade");
UpgradeFirmwareMessage firmwareMessage = ((UpgradeFirmwareMessage) message);
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("url", firmwareMessage.getUrl());
mqttData.put("sign", firmwareMessage.getSign());
mqttData.put("version", firmwareMessage.getVersion());
mqttData.put("signMethod", firmwareMessage.getSignMethod());
mqttData.put("parameters", firmwareMessage.getParameters());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof ReadFirmwareMessage) {
String topic = "/" .concat(deviceId).concat("/firmware/read");
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof RequestFirmwareMessageReply) {
String topic = "/" .concat(deviceId).concat("/firmware/pull/reply");
RequestFirmwareMessageReply firmwareMessage = ((RequestFirmwareMessageReply) message);
JSONObject mqttData = new JSONObject();
mqttData.put("messageId", message.getMessageId());
mqttData.put("url", firmwareMessage.getUrl());
mqttData.put("sign", firmwareMessage.getSign());
mqttData.put("version", firmwareMessage.getVersion());
mqttData.put("signMethod", firmwareMessage.getSignMethod());
mqttData.put("parameters", firmwareMessage.getParameters());
mqttData.put("deviceId", deviceId);
return new EncodedTopic(topic, mqttData);
} else if (message instanceof ChildDeviceMessage) {
ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message);
EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage());
String topic = "/" .concat(deviceId).concat("/child").concat(result.topic);
result.payload.put("deviceId", childDeviceMessage.getChildDeviceId());
return new EncodedTopic(topic, result.payload);
}
return null;
}
protected DecodeResult decode(String topic, JSONObject object) {
DecodeResult result = new DecodeResult(topic);
Message message = null;
if (result.isEvent()) {
message = decodeEvent(result, object);
} else if (result.isReportProperties()) {
message = decodeReportPropertyReply(result, object);
} else if (result.isReadPropertyReply()) {
message = decodeReadPropertyReply(result, object);
} else if (result.isWritePropertyReply()) {
message = decodeWritePropertyReply(result, object);
} else if (result.isFunctionInvokeReply()) {
message = decodeInvokeReply(result, object);
} else if (result.isRegister()) {
message = decodeRegister(result, object);
} else if (result.isUnregister()) {
message = decodeUnregister(result, object);
} else if (result.isDerivedMetadata()) {
message = decodeDerivedMetadata(result, object);
} else if (result.isReadFirmwareReply()) {
message = object.toJavaObject(ReadFirmwareMessageReply.class);
} else if (result.isRequestFirmware()) {
message = object.toJavaObject(RequestFirmwareMessage.class);
} else if (result.isReportFirmware()) {
message = object.toJavaObject(ReportFirmwareMessage.class);
} else if (result.isUpgradeFirmwareProgress()) {
message = object.toJavaObject(UpgradeFirmwareProgressMessage.class);
}
if (result.isChild()) {
if (topic.endsWith("connected")) {
message = object.toJavaObject(DeviceOnlineMessage.class);
} else if (topic.endsWith("disconnect")) {
message = object.toJavaObject(DeviceOfflineMessage.class);
}
if (message == null) {
throw new UnsupportedOperationException("unsupported topic:" + topic);
}
applyCommons(message, result, object);
ChildDeviceMessageReply children = new ChildDeviceMessageReply();
children.setChildDeviceId(result.getChildDeviceId());
children.setDeviceId(result.getDeviceId());
children.setChildDeviceMessage(message);
children.setSuccess(Optional.ofNullable(object.getBoolean("success")).orElse(true));
children.setTimestamp(Optional.ofNullable(object.getLong("timestamp")).orElse(System.currentTimeMillis()));
Optional.ofNullable(object.getString("messageId")).ifPresent(children::setMessageId);
result.message = children;
} else {
if (message == null) {
throw new UnsupportedOperationException("unsupported topic:" + topic);
}
applyCommons(message, result, object);
result.message = message;
}
return result;
}
private Message decodeEvent(DecodeResult result, JSONObject event) {
EventMessage message = event.toJavaObject(EventMessage.class);
message.setData(event.get("data"));
message.setEvent(result.args.get("eventId"));
return message;
}
private Message decodeReadPropertyReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(ReadPropertyMessageReply.class);
}
private Message decodeReportPropertyReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(ReportPropertyMessage.class);
}
private Message decodeWritePropertyReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(WritePropertyMessageReply.class);
}
private Message decodeInvokeReply(DecodeResult result, JSONObject data) {
return data.toJavaObject(FunctionInvokeMessageReply.class);
}
private Message decodeRegister(DecodeResult result, JSONObject data) {
return data.toJavaObject(DeviceRegisterMessage.class);
}
private Message decodeUnregister(DecodeResult result, JSONObject data) {
return data.toJavaObject(DeviceUnRegisterMessage.class);
}
private Message decodeDerivedMetadata(DecodeResult result, JSONObject data) {
return data.toJavaObject(DerivedMetadataMessage.class);
}
private void applyCommons(Message message, DecodeResult result, JSONObject data) {
if (message instanceof CommonDeviceMessageReply) {
CommonDeviceMessageReply reply = ((CommonDeviceMessageReply) message);
reply.setSuccess(Optional.ofNullable(data.getBoolean("success")).orElse(true));
reply.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
if (result.isChild()) {
reply.setDeviceId(result.getChildDeviceId());
} else {
reply.setDeviceId(result.getDeviceId());
}
}
if (message instanceof CommonDeviceMessage) {
CommonDeviceMessage msg = ((CommonDeviceMessage) message);
msg.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis()));
if (result.isChild()) {
msg.setDeviceId(result.getChildDeviceId());
} else {
msg.setDeviceId(result.getDeviceId());
}
}
}
@Getter
@Setter
@AllArgsConstructor
protected class EncodedTopic {
String topic;
JSONObject payload;
}
@Getter
@Setter
protected class Decoded {
Message message;
}
}

View File

@ -0,0 +1,56 @@
package org.jetlinks.protocol.official.cipher;
import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.util.Optional;
public enum Ciphers {
AES {
@SneakyThrows
public byte[] encrypt(byte[] src, String key) {
if (key == null || key.length() != 16) {
throw new IllegalArgumentException("illegal key");
}
SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, skeySpec);
return cipher.doFinal(src);
}
@SneakyThrows
public byte[] decrypt(byte[] src, String key) {
if (key == null || key.length() != 16) {
throw new IllegalArgumentException("illegal key");
}
SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, skeySpec);
return cipher.doFinal(src);
}
};
public static Optional<Ciphers> of(String name) {
try {
return Optional.of(valueOf(name.toUpperCase()));
} catch (Exception e) {
return Optional.empty();
}
}
public abstract byte[] encrypt(byte[] src, String key);
public abstract byte[] decrypt(byte[] src, String key);
String encryptBase64(String src, String key) {
return Base64.encodeBase64String(encrypt(src.getBytes(), key));
}
byte[] decryptBase64(String src, String key) {
return decrypt(Base64.decodeBase64(src), key);
}
}

View File

@ -0,0 +1 @@
org.jetlinks.protocol.official.JetLinksProtocolSupportProvider

View File

@ -0,0 +1,112 @@
package org.jetlinks.protocol.official;
import lombok.SneakyThrows;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.server.resources.Resource;
import org.hswebframework.utils.RandomUtil;
import org.jetlinks.core.defaults.CompositeProtocolSupports;
import org.jetlinks.core.device.DeviceInfo;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.CoapExchangeMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicReference;
public class JetLinksCoapDeviceMessageCodecTest {
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
DeviceOperator device;
private String key = RandomUtil.randomChar(16);
@Before
public void init() {
TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
device = registry.register(DeviceInfo.builder()
.id("test")
.protocol("jetlinks")
.build())
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
.block();
}
@Test
@SneakyThrows
public void test() {
AtomicReference<Message> messageRef = new AtomicReference<>();
CoapServer server = new CoapServer() {
@Override
protected Resource createRoot() {
return new CoapResource("/", true) {
@Override
public void handlePOST(CoapExchange exchange) {
codec.decode(new MessageDecodeContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return new CoapExchangeMessage(exchange);
}
@Override
public DeviceOperator getDevice() {
return device;
}
})
.doOnSuccess(messageRef::set)
.doOnError(Throwable::printStackTrace)
.subscribe();
}
@Override
public Resource getChild(String name) {
return this;
}
};
}
};
Endpoint endpoint = new CoapEndpoint.Builder()
.setPort(12345).build();
server.addEndpoint(endpoint);
server.start();
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"data\":1}";
request.setURI("coap://localhost:12345/test/test/event/event1");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(),key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Thread.sleep(1000);
Assert.assertNotNull(messageRef.get());
System.out.println(messageRef.get());
}
}

View File

@ -0,0 +1,398 @@
package org.jetlinks.protocol.official;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.defaults.CompositeProtocolSupports;
import org.jetlinks.core.device.DeviceInfo;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.ProductInfo;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DerivedMetadataMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
public class JetLinksMqttDeviceMessageCodecTest {
org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
TestDeviceRegistry registry;
@Before
public void init() {
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
registry.register(ProductInfo.builder()
.id("product1")
.protocol("jetlinks")
.build())
.flatMap(product -> registry.register(DeviceInfo.builder()
.id("device1")
.productId("product1")
.build()))
.subscribe();
}
@Test
public void testReadProperty() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setProperties(Arrays.asList("name", "sn"));
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/properties/read");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testReadChildProperty() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test");
message.setProperties(Arrays.asList("name", "sn"));
ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage();
childDeviceMessage.setChildDeviceMessage(message);
childDeviceMessage.setChildDeviceId("test");
childDeviceMessage.setDeviceId("device1");
MqttMessage encodedMessage = codec.encode(createMessageContext(childDeviceMessage)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/read");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testChildReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
ReadPropertyMessageReply reply = (ReadPropertyMessageReply)childReply.getChildDeviceMessage();;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testWriteProperty() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setProperties(Collections.singletonMap("sn", "123"));
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/properties/write");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testChildWriteProperty() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setProperties(Collections.singletonMap("sn", "123"));
ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage();
childDeviceMessage.setChildDeviceMessage(message);
childDeviceMessage.setChildDeviceId("test");
childDeviceMessage.setDeviceId("device1");
MqttMessage encodedMessage = codec.encode(createMessageContext(childDeviceMessage)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/write");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testWritePropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof WritePropertyMessageReply);
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testWriteChildPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
WritePropertyMessageReply reply = (WritePropertyMessageReply)childReply.getChildDeviceMessage();;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testInvokeFunction() {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setFunctionId("playVoice");
message.addInput("file", "http://baidu.com/1.mp3");
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/function/invoke");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testFirmwareUpgrade() {
UpgradeFirmwareMessage message = new UpgradeFirmwareMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setVersion("1.0");
message.setUrl("http://baidu.com/1.mp3");
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/firmware/upgrade");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testInvokeFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getOutput(), "ok");
System.out.println(reply);
}
@Test
public void testInvokeChildFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message);
Assert.assertTrue(childReply.isSuccess());
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply)childReply.getChildDeviceMessage();;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getOutput(), "ok");
System.out.println(reply);
}
@Test
public void testEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof EventMessage);
EventMessage reply = ((EventMessage) message);
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getData(), 100);
System.out.println(reply);
}
@Test
public void testChildEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
EventMessage reply = ((EventMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getData(), 100);
System.out.println(reply);
}
@Test
public void testPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ReportPropertyMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
System.out.println(reply);
}
@Test
public void testChildPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
System.out.println(reply);
}
@Test
public void testMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof DerivedMetadataMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getMetadata(), "1");
System.out.println(reply);
}
@Test
public void testChildMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).block();
Assert.assertTrue(message instanceof ChildDeviceMessageReply);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getMetadata(), "1");
System.out.println(reply);
}
public MessageEncodeContext createMessageContext(Message message) {
System.out.println(message.toString());
return new MessageEncodeContext() {
@Nonnull
@Override
public Message getMessage() {
return message;
}
@Override
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
};
}
public MessageDecodeContext createMessageContext(EncodedMessage message) {
System.out.println(message.toString());
return new MessageDecodeContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return message;
}
@Override
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
};
}
}

View File

@ -0,0 +1,24 @@
package org.jetlinks.protocol.official;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class JetlinksTopicMessageCodecTest {
JetlinksTopicMessageCodec codec = new JetlinksTopicMessageCodec();
@Test
public void testReadProperty() {
ReadPropertyMessage readProperty = new ReadPropertyMessage();
readProperty.setProperties(Arrays.asList("name"));
readProperty.setMessageId("test");
JetlinksTopicMessageCodec.EncodedTopic topic = codec.encode("test", readProperty);
Assert.assertEquals(topic.getTopic(),"/test/properties/read");
}
}

View File

@ -0,0 +1,80 @@
package org.jetlinks.protocol.official;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.defaults.DefaultDeviceOperator;
import org.jetlinks.core.defaults.DefaultDeviceProductOperator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.supports.config.InMemoryConfigStorageManager;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TestDeviceRegistry implements DeviceRegistry {
private DeviceMessageSenderInterceptor interceptor = new CompositeDeviceMessageSenderInterceptor();
private ConfigStorageManager manager = new InMemoryConfigStorageManager();
private Map<String, DeviceOperator> operatorMap = new ConcurrentHashMap<>();
private Map<String, DeviceProductOperator> productOperatorMap = new ConcurrentHashMap<>();
private ProtocolSupports supports;
private DeviceOperationBroker handler;
public TestDeviceRegistry(ProtocolSupports supports, DeviceOperationBroker handler) {
this.supports = supports;
this.handler = handler;
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return Mono.fromSupplier(() -> operatorMap.get(deviceId));
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return Mono.fromSupplier(() -> productOperatorMap.get(productId));
}
@Override
public Mono<DeviceOperator> register(DeviceInfo deviceInfo) {
DefaultDeviceOperator operator = new DefaultDeviceOperator(
deviceInfo.getId(),
supports, manager, handler, this
);
operatorMap.put(operator.getDeviceId(), operator);
return operator.setConfigs(
DeviceConfigKey.productId.value(deviceInfo.getProductId()),
DeviceConfigKey.protocol.value(deviceInfo.getProtocol()))
.thenReturn(operator);
}
@Override
public Mono<DeviceProductOperator> register(ProductInfo productInfo) {
DefaultDeviceProductOperator operator = new DefaultDeviceProductOperator(productInfo.getId(), supports, manager);
productOperatorMap.put(operator.getId(), operator);
return operator.setConfigs(
DeviceConfigKey.productId.value(productInfo.getMetadata()),
DeviceConfigKey.protocol.value(productInfo.getProtocol()))
.thenReturn(operator);
}
@Override
public Mono<Void> unregisterDevice(String deviceId) {
return Mono.justOrEmpty(deviceId)
.map(operatorMap::remove)
.then();
}
@Override
public Mono<Void> unregisterProduct(String productId) {
return Mono.justOrEmpty(productId)
.map(productOperatorMap::remove)
.then();
}
}