commit 4ab059b0902cdb0daa19bb2e5ca6c2840883407e Author: zhou-hao Date: Wed Jul 22 11:05:45 2020 +0800 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1a81fa8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +**/pom.xml.versionsBackup +**/target/ +**/out/ +*.class +# Mobile Tools for Java (J2ME) +.mtj.tmp/ +.idea/ +/nbproject +*.ipr +*.iws +*.iml + +# Package Files # +*.jar +*.war +*.ear +*.log +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +**/transaction-logs/ +!/.mvn/wrapper/maven-wrapper.jar +/data/ +*.db +/static/ +/upload +/ui/upload/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..b44fa79 --- /dev/null +++ b/README.md @@ -0,0 +1,163 @@ +## JetLinks 官方设备接入协议 + +类名: `org.jetlinks.protocol.official.JetLinksProtocolSupportProvider` + + +## MQTT(S)接入 +目前支持MQTT3.1.1和3.1版本协议. + +### 认证 + +CONNECT报文: +``` +clientId: 设备实例ID +username: secureId+"|"+timestamp +password: md5(secureId+"|"+timestamp+"|"+secureKey) +``` + +说明: `secureId`以及`secureKey`在创建设备产品和设备实例时进行配置. +`timestamp`为当前系统时间戳(毫秒),与系统时间不能相差5分钟. + + +### Topic + + - 读取设备属性: + + topic: `/{productId}/{deviceId}/properties/read` + + 方向: `下行` + + 消息格式: + + { + "messageId":"消息ID", + "deviceId":"设备ID", + "properties":["sn","model"] //要读取到属性列表 + } + + 回复Topic: `/{productId}/{deviceId}/properties/read/reply` + + 回复消息格式: + + //成功 + { + "messageId":"与下行消息中的messageId相同", + "properties":{"sn":"test","model":"test"}, //key与设备模型中定义的属性id一致 + "deviceId":"设备ID", + "success":true, + } + //失败. 下同 + { + "messageId":"与下行消息中的messageId相同", + "success":false, + "code":"error_code", + "message":"失败原因" + } + - 修改设备属性: + + topic: `/{productId}/{deviceId}/properties/write` + + 消息格式: + + { + "messageId":"消息ID", + "deviceId":"设备ID", + "properties":{"color":"red"} //要设置的属性 + } + 回复Topic: `/{productId}/{deviceId}/properties/wirte/reply` + + 回复消息格式: + + { + "messageId":"与下行消息中的messageId相同", + "properties":{"color":"red"}, //设置成功后的属性,可不返回 + "success":true, + } + + - 调用设备功能 + + topic: `/{productId}/{deviceId}/function/invoke` + + 消息格式: + + { + "messageId":"消息ID", + "deviceId":"设备ID", + "function":"playVoice",//功能ID + "inputs":[{"name":"text","value":"播放声音"}] //参数 + } + + 回复Topic: `/{productId}/{deviceId}/function/invoke/reply` + + 回复消息格式: + + { + "messageId":"与下行消息中的messageId相同", + "output":"success", //返回执行结果 + "success":true, + } + + - 设备事件上报 + + topic: /{productId}/{deviceId}/event/{eventId} + + 消息格式: + + { + "messageId":"随机消息ID", + "data":100 //上报数据 + } + + 拓展: + + 定时上报属性: + + { + "messageId":"随机消息ID", + "data":{"color":"red"},//属性列表 + "headers":{"report-properties":true} //标记为上报属性事件 + } + +### 动态注册 + +暂不支持 + +## CoAP接入 +使用CoAP协议接入仅需要对数据进行加密即可.加密算法: AES/ECB/PKCS5Padding. + +使用自定义Option: `2100:设备ID` 来标识设备. + +将请求体进行加密,密钥为在创建设备产品和设备实例时进行配置的(`secureKey`). + +请求地址(`URI`)与MQTT `Topic`相同.消息体(`payload`)与MQTT相同. + + +## DTLS接入 +使用CoAP DTLS 协议接入时需要先进行认证: + +发送认证请求: +```text +POST /auth +Accept: application/json +Content-Format: application/json +2100: 设备ID +2110: 签名 md5(payload+secureKey) +payload: {"timestamp":"时间戳"} +``` + +响应结果: +```text +2.05 (Content) +payload: {"token":"令牌"} +``` + +之后的请求中需要将返回的令牌携带到自定义Option:2111 + +例如: +```text +POST /test/device1/event/fire_alarm +2100: 设备ID +2111: 令牌 +...其他Option +payload: json数据 +``` diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..878b6ad --- /dev/null +++ b/pom.xml @@ -0,0 +1,114 @@ + + + 4.0.0 + + org.jetlinks + jetlinks-official-protocol + 1.0-SNAPSHOT + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + JetLinks + http://jetlinks.org + 2019 + JetLinks 物联网平台 + + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + zhouhao + i@hsweb.me + + Owner + + +8 + https://github.com/zhou-hao + + + + + scm:git:https://github.com/jetlinks/jetlinks-official-protocol.git + scm:git:https://github.com/jetlinks/jetlinks-official-protocol.git + https://github.com/jetlinks/jetlinks-official-protocol + ${project.version} + + + + UTF-8 + zh_CN + 1.8 + ${java.version} + 2.2.8.RELEASE + 4.0.3 + 3.0.2 + Dysprosium-RELEASE + + + + + + org.jetlinks + jetlinks-supports + 1.1.0 + + + + org.eclipse.californium + californium-core + 2.2.3 + true + + + + org.projectlombok + lombok + 1.18.12 + provided + + + + junit + junit + 4.13 + test + + + + + + + hsweb-nexus + Nexus Release Repository + http://nexus.hsweb.me/content/groups/public/ + + true + always + + + + aliyun-nexus + aliyun + http://maven.aliyun.com/nexus/content/groups/public/ + + + + + \ No newline at end of file diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksAuthenticator.java b/src/main/java/org/jetlinks/protocol/official/JetLinksAuthenticator.java new file mode 100644 index 0000000..2fc6dce --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksAuthenticator.java @@ -0,0 +1,56 @@ +package org.jetlinks.protocol.official; + +import org.apache.commons.codec.digest.DigestUtils; +import org.jetlinks.core.Value; +import org.jetlinks.core.defaults.Authenticator; +import org.jetlinks.core.device.AuthenticationRequest; +import org.jetlinks.core.device.AuthenticationResponse; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.MqttAuthenticationRequest; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.util.concurrent.TimeUnit; + +public class JetLinksAuthenticator implements Authenticator { + + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) { + if (request instanceof MqttAuthenticationRequest) { + MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request); + // secureId|timestamp + String username = mqtt.getUsername(); + // md5(secureId|timestamp|secureKey) + String password = mqtt.getPassword(); + String requestSecureId; + try { + String[] arr = username.split("[|]"); + if (arr.length <= 1) { + return Mono.just(AuthenticationResponse.error(401, "用户名格式错误")); + } + requestSecureId = arr[0]; + long time = Long.parseLong(arr[1]); + //和设备时间差大于5分钟则认为无效 + if (Math.abs(System.currentTimeMillis() - time) > TimeUnit.MINUTES.toMillis(5)) { + return Mono.just(AuthenticationResponse.error(401, "设备时间不同步")); + } + return deviceOperation.getConfigs("secureId", "secureKey") + .map(conf -> { + String secureId = conf.getValue("secureId").map(Value::asString).orElse(null); + + String secureKey = conf.getValue("secureKey").map(Value::asString).orElse(null); + //签名 + String digest = DigestUtils.md5Hex(username + "|" + secureKey); + if (requestSecureId.equals(secureId) && digest.equals(password)) { + return AuthenticationResponse.success(); + } else { + return AuthenticationResponse.error(401, "密钥错误"); + } + }); + } catch (NumberFormatException e) { + return Mono.just(AuthenticationResponse.error(401, "用户名格式错误")); + } + } + return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request)); + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java new file mode 100644 index 0000000..549df66 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java @@ -0,0 +1,121 @@ +package org.jetlinks.protocol.official; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; +import org.eclipse.californium.core.coap.CoAP; +import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; +import java.util.function.Consumer; + +@Slf4j +public class JetLinksCoapDTLSDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec { + + @Override + public Transport getSupportTransport() { + return DefaultTransport.CoAP_DTLS; + } + + + public Mono decode(CoapMessage message, MessageDecodeContext context, Consumer response) { + return Mono.defer(() -> { + String path = message.getPath(); + String sign = message.getStringOption(2110).orElse(null); + String token = message.getStringOption(2111).orElse(null); + String payload = message.getPayload().toString(StandardCharsets.UTF_8); + if ("/auth".equals(path)) { + //认证 + return context.getDevice() + .getConfig("secureKey") + .flatMap(sk -> { + String secureKey = sk.asString(); + if (!verifySign(secureKey, context.getDevice().getDeviceId(), payload, sign)) { + response.accept(CoAP.ResponseCode.BAD_REQUEST); + return Mono.empty(); + } + String newToken = IDGenerator.MD5.generate(); + return context.getDevice() + .setConfig("coap-token", newToken) + .doOnSuccess(success -> { + JSONObject json = new JSONObject(); + json.put("token", newToken); + response.accept(json.toJSONString()); + }); + }) + .then(Mono.empty()); + } + if (StringUtils.isEmpty(token)) { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + return Mono.empty(); + } + return context.getDevice() + .getConfig("coap-token") + .switchIfEmpty(Mono.fromRunnable(() -> { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + })) + .flatMap(value -> { + String tk = value.asString(); + if (!token.equals(tk)) { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + return Mono.empty(); + } + return Mono + .just(decode(path, JSON.parseObject(payload)).getMessage()) + .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST))); + }) + .doOnSuccess(msg -> { + response.accept(CoAP.ResponseCode.CREATED); + }) + .doOnError(error -> { + log.error("decode coap message error", error); + response.accept(CoAP.ResponseCode.BAD_REQUEST); + }); + }); + + } + + @Nonnull + @Override + public Mono decode(@Nonnull MessageDecodeContext context) { + if (context.getMessage() instanceof CoapExchangeMessage) { + CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage()); + return decode(exchangeMessage, context, resp -> { + if (resp instanceof CoAP.ResponseCode) { + exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp)); + } + if (resp instanceof String) { + exchangeMessage.getExchange().respond(((String) resp)); + } + }); + } + if (context.getMessage() instanceof CoapMessage) { + return decode(((CoapMessage) context.getMessage()), context, resp -> { + log.info("skip response coap request:{}", resp); + }); + } + + return Mono.empty(); + } + + protected boolean verifySign(String secureKey, String deviceId, String payload, String sign) { + //验证签名 + if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(payload.concat(secureKey)).equalsIgnoreCase(sign)) { + log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload); + return false; + } + return true; + } + + @Nonnull + @Override + public Mono encode(@Nonnull MessageEncodeContext context) { + return Mono.empty(); + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java new file mode 100644 index 0000000..6d206cb --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java @@ -0,0 +1,85 @@ +package org.jetlinks.protocol.official; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.ByteBuf; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.jetlinks.core.Value; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import org.jetlinks.protocol.official.cipher.Ciphers; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; + +@Slf4j +public class JetLinksCoapDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec { + + + @Override + public Transport getSupportTransport() { + return DefaultTransport.CoAP; + } + + protected JSONObject decode(String text) { + return JSON.parseObject(text); + } + + protected Mono decode(CoapMessage message, MessageDecodeContext context) { + String path = message.getPath(); + return context + .getDevice() + .getConfigs("encAlg", "secureKey") + .flatMap(configs -> { + Ciphers ciphers = configs.getValue("encAlg").map(Value::asString).flatMap(Ciphers::of).orElse(Ciphers.AES); + String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null); + ByteBuf byteBuf = message.getPayload(); + byte[] req = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(req); + byteBuf.resetReaderIndex(); + String payload = new String(ciphers.decrypt(req, secureKey)); + //解码 + return Mono.just(decode(path, decode(payload)).getMessage()); + }); + } + + protected Mono decode(CoapExchangeMessage message, MessageDecodeContext context) { + CoapExchange exchange = message.getExchange(); + return decode((CoapMessage) message, context) + .doOnSuccess(msg -> { + exchange.respond(CoAP.ResponseCode.CREATED); + exchange.accept(); + }) + .switchIfEmpty(Mono.fromRunnable(() -> { + exchange.respond(CoAP.ResponseCode.BAD_REQUEST); + })) + .doOnError(error -> { + log.error("decode coap message error", error); + exchange.respond(CoAP.ResponseCode.BAD_REQUEST); + }); + } + + @Nonnull + @Override + public Mono decode(@Nonnull MessageDecodeContext context) { + return Mono.defer(() -> { + log.debug("handle coap message:\n{}", context.getMessage()); + if (context.getMessage() instanceof CoapExchangeMessage) { + return decode(((CoapExchangeMessage) context.getMessage()), context); + } + if (context.getMessage() instanceof CoapMessage) { + return decode(((CoapMessage) context.getMessage()), context); + } + + return Mono.empty(); + }); + } + + @Nonnull + @Override + public Mono encode(@Nonnull MessageEncodeContext context) { + return Mono.empty(); + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java new file mode 100644 index 0000000..6324162 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java @@ -0,0 +1,112 @@ +package org.jetlinks.protocol.official; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.device.DeviceConfigKey; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; + +/** + *
+ *     下行Topic:
+ *          读取设备属性: /{productId}/{deviceId}/properties/read
+ *          修改设备属性: /{productId}/{deviceId}/properties/write
+ *          调用设备功能: /{productId}/{deviceId}/function/invoke
+ *
+ *          //网关设备
+ *          读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
+ *          修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
+ *          调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
+ *
+ *      上行Topic:
+ *          读取属性回复: /{productId}/{deviceId}/properties/read/reply
+ *          修改属性回复: /{productId}/{deviceId}/properties/write/reply
+ *          调用设备功能: /{productId}/{deviceId}/function/invoke/reply
+ *          上报设备事件: /{productId}/{deviceId}/event/{eventId}
+ *          上报设备属性: /{productId}/{deviceId}/properties/report
+ *          上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
+ *
+ *          //网关设备
+ *          子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
+ *          子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnected
+ *          读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
+ *          修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
+ *          调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
+ *          上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
+ *          上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
+ *
+ * 
+ * 基于jet links 的消息编解码器 + * + * @author zhouhao + * @since 1.0.0 + */ +@Slf4j +public class JetLinksMqttDeviceMessageCodec extends JetlinksTopicMessageCodec implements DeviceMessageCodec { + + private Transport transport; + + + public JetLinksMqttDeviceMessageCodec(Transport transport) { + this.transport = transport; + } + + public JetLinksMqttDeviceMessageCodec() { + this(DefaultTransport.MQTT); + } + + @Override + public Transport getSupportTransport() { + return transport; + } + + @Nonnull + public Mono encode(@Nonnull MessageEncodeContext context) { + return Mono.defer(() -> { + Message message = context.getMessage(); + if (message instanceof DeviceMessage) { + DeviceMessage deviceMessage = ((DeviceMessage) message); + + EncodedTopic convertResult = encode(deviceMessage.getDeviceId(), deviceMessage); + if (convertResult == null) { + return Mono.empty(); + } + return context.getDevice() + .getConfig(DeviceConfigKey.productId) + .defaultIfEmpty("null") + .map(productId -> SimpleMqttMessage.builder() + .clientId(deviceMessage.getDeviceId()) + .topic("/" .concat(productId).concat(convertResult.topic)) + .payloadType(MessagePayloadType.JSON) + .payload(Unpooled.wrappedBuffer(JSON.toJSONBytes(convertResult.payload))) + .build()); + } else { + return Mono.empty(); + } + }); + } + + @Nonnull + @Override + public Mono decode(@Nonnull MessageDecodeContext context) { + return Mono.fromSupplier(() -> { + MqttMessage message = (MqttMessage) context.getMessage(); + String topic = message.getTopic(); + String jsonData = message.getPayload().toString(StandardCharsets.UTF_8); + + JSONObject object = JSON.parseObject(jsonData, JSONObject.class); + if (object == null) { + throw new UnsupportedOperationException("cannot parse payload:{}" + jsonData); + } + return decode(topic, object).getMessage(); + }); + } + +} diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java new file mode 100644 index 0000000..2be1b9d --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java @@ -0,0 +1,72 @@ +package org.jetlinks.protocol.official; + +import org.jetlinks.core.defaults.CompositeProtocolSupport; +import org.jetlinks.core.message.codec.DefaultTransport; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.types.EnumType; +import org.jetlinks.core.metadata.types.PasswordType; +import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.spi.ProtocolSupportProvider; +import org.jetlinks.core.spi.ServiceContext; +import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; +import reactor.core.publisher.Mono; + +public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider { + + private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata( + "MQTT认证配置" + , "MQTT认证时需要的配置,mqtt用户名,密码算法:\n" + + "username=secureId|timestamp\n" + + "password=md5(secureId|timestamp|secureKey)\n" + + "\n" + + "timestamp为时间戳,与服务时间不能相差5分钟") + .add("secureId", "secureId", "密钥ID", new StringType()) + .add("secureKey", "secureKey", "密钥KEY", new PasswordType()); + + private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata( + "CoAP认证配置", + "使用CoAP进行数据上报时,需要对数据进行加密:" + + "encrypt(payload,secureKey);") + .add("encAlg", "加密算法", "加密算法", + new EnumType().addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5"))) + .add("secureKey", "密钥", "16位密钥KEY", new PasswordType()); + + private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata( + "CoAP DTLS配置", + "使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" + + "之后上报数据需要在Option中携带token信息. \n" + + "自定义Option: 2110,sign ; 2111,token ") + .add("secureKey", "密钥", "认证签名密钥", new PasswordType()); + + + @Override + public Mono create(ServiceContext context) { + + return Mono.defer(() -> { + CompositeProtocolSupport support = new CompositeProtocolSupport(); + + support.setId("jetlinks.v1.0"); + support.setName("JetLinks V1.0"); + support.setDescription("JetLinks Protocol Version 1.0"); + + support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator()); + support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator()); + + support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); + + support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig); + support.addConfigMetadata(DefaultTransport.MQTT_TLS, mqttConfig); + support.addConfigMetadata(DefaultTransport.CoAP, coapConfig); + support.addConfigMetadata(DefaultTransport.CoAP_DTLS, coapDTLSConfig); + + support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT)); + support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS)); + + support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec()); + support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec()); + + + return Mono.just(support); + }); + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodec.java new file mode 100644 index 0000000..07cef14 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodec.java @@ -0,0 +1,291 @@ +package org.jetlinks.protocol.official; + +import com.alibaba.fastjson.JSONObject; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.core.message.*; +import org.jetlinks.core.message.event.EventMessage; +import org.jetlinks.core.message.firmware.*; +import org.jetlinks.core.message.function.FunctionInvokeMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessageReply; +import org.jetlinks.core.message.property.*; +import org.jetlinks.supports.utils.MqttTopicUtils; +import org.springframework.util.Assert; + +import java.util.Map; +import java.util.Optional; + +class JetlinksTopicMessageCodec { + + @Getter + protected class DecodeResult { + private Map args; + + private boolean child; + + private boolean event; + private boolean readPropertyReply; + private boolean writePropertyReply; + private boolean functionInvokeReply; + private boolean reportProperties; + private boolean derivedMetadata; + private boolean register; + private boolean unregister; + + private boolean requestFirmware; + private boolean reportFirmware; + private boolean upgradeFirmwareProgress; + private boolean readFirmwareReply; + + + public DecodeResult(String topic) { + this.topic = topic; + args = MqttTopicUtils.getPathVariables("/{productId}/{deviceId}/**", topic); + if (topic.contains("child")) { + child = true; + args.putAll(MqttTopicUtils.getPathVariables("/**/child/{childDeviceId}/**", topic)); + } + if (topic.contains("event")) { + event = true; + args.putAll(MqttTopicUtils.getPathVariables("/**/event/{eventId}", topic)); + } + derivedMetadata = topic.endsWith("metadata/derived"); + if (event) { + } else if (reportProperties = topic.endsWith("properties/report")) { + } else if (unregister = topic.endsWith("unregister")) { + } else if (register = topic.endsWith("register")) { + } else if (readPropertyReply = topic.endsWith("properties/read/reply")) { + } else if (writePropertyReply = topic.endsWith("properties/write/reply")) { + } else if (functionInvokeReply = topic.endsWith("function/invoke/reply")) { + } else if (upgradeFirmwareProgress = topic.endsWith("firmware/upgrade/progress")) { + } else if (requestFirmware = topic.endsWith("firmware/pull")) { + } else if (reportFirmware = topic.endsWith("firmware/report")) { + } else if (readFirmwareReply = topic.endsWith("firmware/read/reply")) { + } else if (derivedMetadata = topic.endsWith("metadata/derived")) { + } + } + + private final String topic; + + public String getDeviceId() { + return args.get("deviceId"); + } + + public String getChildDeviceId() { + return args.get("childDeviceId"); + } + + protected Message message; + } + + protected EncodedTopic encode(String deviceId, Message message) { + + Assert.hasText(deviceId, "deviceId can not be null"); + Assert.notNull(message, "message can not be null"); + + if (message instanceof ReadPropertyMessage) { + String topic = "/" .concat(deviceId).concat("/properties/read"); + JSONObject mqttData = new JSONObject(); + mqttData.put("messageId", message.getMessageId()); + mqttData.put("properties", ((ReadPropertyMessage) message).getProperties()); + mqttData.put("deviceId", deviceId); + + return new EncodedTopic(topic, mqttData); + } else if (message instanceof WritePropertyMessage) { + String topic = "/" .concat(deviceId).concat("/properties/write"); + JSONObject mqttData = new JSONObject(); + mqttData.put("messageId", message.getMessageId()); + mqttData.put("properties", ((WritePropertyMessage) message).getProperties()); + mqttData.put("deviceId", deviceId); + + return new EncodedTopic(topic, mqttData); + } else if (message instanceof FunctionInvokeMessage) { + String topic = "/" .concat(deviceId).concat("/function/invoke"); + FunctionInvokeMessage invokeMessage = ((FunctionInvokeMessage) message); + JSONObject mqttData = new JSONObject(); + mqttData.put("messageId", message.getMessageId()); + mqttData.put("function", invokeMessage.getFunctionId()); + mqttData.put("inputs", invokeMessage.getInputs()); + mqttData.put("deviceId", deviceId); + + return new EncodedTopic(topic, mqttData); + } else if (message instanceof UpgradeFirmwareMessage) { + String topic = "/" .concat(deviceId).concat("/firmware/upgrade"); + UpgradeFirmwareMessage firmwareMessage = ((UpgradeFirmwareMessage) message); + JSONObject mqttData = new JSONObject(); + mqttData.put("messageId", message.getMessageId()); + mqttData.put("url", firmwareMessage.getUrl()); + mqttData.put("sign", firmwareMessage.getSign()); + mqttData.put("version", firmwareMessage.getVersion()); + mqttData.put("signMethod", firmwareMessage.getSignMethod()); + mqttData.put("parameters", firmwareMessage.getParameters()); + mqttData.put("deviceId", deviceId); + + return new EncodedTopic(topic, mqttData); + } else if (message instanceof ReadFirmwareMessage) { + String topic = "/" .concat(deviceId).concat("/firmware/read"); + JSONObject mqttData = new JSONObject(); + mqttData.put("messageId", message.getMessageId()); + mqttData.put("deviceId", deviceId); + return new EncodedTopic(topic, mqttData); + } else if (message instanceof RequestFirmwareMessageReply) { + String topic = "/" .concat(deviceId).concat("/firmware/pull/reply"); + RequestFirmwareMessageReply firmwareMessage = ((RequestFirmwareMessageReply) message); + JSONObject mqttData = new JSONObject(); + mqttData.put("messageId", message.getMessageId()); + mqttData.put("url", firmwareMessage.getUrl()); + mqttData.put("sign", firmwareMessage.getSign()); + mqttData.put("version", firmwareMessage.getVersion()); + mqttData.put("signMethod", firmwareMessage.getSignMethod()); + mqttData.put("parameters", firmwareMessage.getParameters()); + mqttData.put("deviceId", deviceId); + return new EncodedTopic(topic, mqttData); + } else if (message instanceof ChildDeviceMessage) { + ChildDeviceMessage childDeviceMessage = ((ChildDeviceMessage) message); + EncodedTopic result = encode(childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage()); + String topic = "/" .concat(deviceId).concat("/child").concat(result.topic); + result.payload.put("deviceId", childDeviceMessage.getChildDeviceId()); + + return new EncodedTopic(topic, result.payload); + } + return null; + } + + protected DecodeResult decode(String topic, JSONObject object) { + DecodeResult result = new DecodeResult(topic); + Message message = null; + if (result.isEvent()) { + message = decodeEvent(result, object); + } else if (result.isReportProperties()) { + message = decodeReportPropertyReply(result, object); + } else if (result.isReadPropertyReply()) { + message = decodeReadPropertyReply(result, object); + } else if (result.isWritePropertyReply()) { + message = decodeWritePropertyReply(result, object); + } else if (result.isFunctionInvokeReply()) { + message = decodeInvokeReply(result, object); + } else if (result.isRegister()) { + message = decodeRegister(result, object); + } else if (result.isUnregister()) { + message = decodeUnregister(result, object); + } else if (result.isDerivedMetadata()) { + message = decodeDerivedMetadata(result, object); + } else if (result.isReadFirmwareReply()) { + message = object.toJavaObject(ReadFirmwareMessageReply.class); + } else if (result.isRequestFirmware()) { + message = object.toJavaObject(RequestFirmwareMessage.class); + } else if (result.isReportFirmware()) { + message = object.toJavaObject(ReportFirmwareMessage.class); + } else if (result.isUpgradeFirmwareProgress()) { + message = object.toJavaObject(UpgradeFirmwareProgressMessage.class); + } + + if (result.isChild()) { + if (topic.endsWith("connected")) { + message = object.toJavaObject(DeviceOnlineMessage.class); + } else if (topic.endsWith("disconnect")) { + message = object.toJavaObject(DeviceOfflineMessage.class); + } + if (message == null) { + throw new UnsupportedOperationException("unsupported topic:" + topic); + } + applyCommons(message, result, object); + ChildDeviceMessageReply children = new ChildDeviceMessageReply(); + children.setChildDeviceId(result.getChildDeviceId()); + children.setDeviceId(result.getDeviceId()); + children.setChildDeviceMessage(message); + children.setSuccess(Optional.ofNullable(object.getBoolean("success")).orElse(true)); + children.setTimestamp(Optional.ofNullable(object.getLong("timestamp")).orElse(System.currentTimeMillis())); + Optional.ofNullable(object.getString("messageId")).ifPresent(children::setMessageId); + result.message = children; + } else { + if (message == null) { + throw new UnsupportedOperationException("unsupported topic:" + topic); + } + applyCommons(message, result, object); + result.message = message; + } + return result; + } + + + private Message decodeEvent(DecodeResult result, JSONObject event) { + EventMessage message = event.toJavaObject(EventMessage.class); + message.setData(event.get("data")); + message.setEvent(result.args.get("eventId")); + return message; + } + + private Message decodeReadPropertyReply(DecodeResult result, JSONObject data) { + + return data.toJavaObject(ReadPropertyMessageReply.class); + } + + + private Message decodeReportPropertyReply(DecodeResult result, JSONObject data) { + + return data.toJavaObject(ReportPropertyMessage.class); + } + + + private Message decodeWritePropertyReply(DecodeResult result, JSONObject data) { + + return data.toJavaObject(WritePropertyMessageReply.class); + } + + private Message decodeInvokeReply(DecodeResult result, JSONObject data) { + return data.toJavaObject(FunctionInvokeMessageReply.class); + } + + private Message decodeRegister(DecodeResult result, JSONObject data) { + return data.toJavaObject(DeviceRegisterMessage.class); + } + + private Message decodeUnregister(DecodeResult result, JSONObject data) { + return data.toJavaObject(DeviceUnRegisterMessage.class); + } + + private Message decodeDerivedMetadata(DecodeResult result, JSONObject data) { + return data.toJavaObject(DerivedMetadataMessage.class); + } + + private void applyCommons(Message message, DecodeResult result, JSONObject data) { + if (message instanceof CommonDeviceMessageReply) { + CommonDeviceMessageReply reply = ((CommonDeviceMessageReply) message); + reply.setSuccess(Optional.ofNullable(data.getBoolean("success")).orElse(true)); + reply.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis())); + if (result.isChild()) { + reply.setDeviceId(result.getChildDeviceId()); + } else { + reply.setDeviceId(result.getDeviceId()); + } + } + if (message instanceof CommonDeviceMessage) { + CommonDeviceMessage msg = ((CommonDeviceMessage) message); + msg.setTimestamp(Optional.ofNullable(data.getLong("timestamp")).orElse(System.currentTimeMillis())); + if (result.isChild()) { + msg.setDeviceId(result.getChildDeviceId()); + } else { + msg.setDeviceId(result.getDeviceId()); + } + } + } + + @Getter + @Setter + @AllArgsConstructor + protected class EncodedTopic { + String topic; + + JSONObject payload; + } + + @Getter + @Setter + protected class Decoded { + Message message; + + } + +} diff --git a/src/main/java/org/jetlinks/protocol/official/cipher/Ciphers.java b/src/main/java/org/jetlinks/protocol/official/cipher/Ciphers.java new file mode 100644 index 0000000..e96ec05 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/cipher/Ciphers.java @@ -0,0 +1,56 @@ +package org.jetlinks.protocol.official.cipher; + +import lombok.SneakyThrows; +import org.apache.commons.codec.binary.Base64; + +import javax.crypto.Cipher; +import javax.crypto.spec.SecretKeySpec; +import java.util.Optional; + +public enum Ciphers { + AES { + @SneakyThrows + public byte[] encrypt(byte[] src, String key) { + if (key == null || key.length() != 16) { + throw new IllegalArgumentException("illegal key"); + } + SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES"); + Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); + cipher.init(Cipher.ENCRYPT_MODE, skeySpec); + return cipher.doFinal(src); + } + + @SneakyThrows + public byte[] decrypt(byte[] src, String key) { + if (key == null || key.length() != 16) { + throw new IllegalArgumentException("illegal key"); + } + SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES"); + Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding"); + cipher.init(Cipher.DECRYPT_MODE, skeySpec); + return cipher.doFinal(src); + } + }; + + + public static Optional of(String name) { + try { + return Optional.of(valueOf(name.toUpperCase())); + } catch (Exception e) { + return Optional.empty(); + } + } + + public abstract byte[] encrypt(byte[] src, String key); + + public abstract byte[] decrypt(byte[] src, String key); + + String encryptBase64(String src, String key) { + return Base64.encodeBase64String(encrypt(src.getBytes(), key)); + } + + byte[] decryptBase64(String src, String key) { + return decrypt(Base64.decodeBase64(src), key); + } + +} diff --git a/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider b/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider new file mode 100644 index 0000000..39e2abe --- /dev/null +++ b/src/main/resources/services/org.jetlinks.core.spi.ProtocolSupportProvider @@ -0,0 +1 @@ +org.jetlinks.protocol.official.JetLinksProtocolSupportProvider \ No newline at end of file diff --git a/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java new file mode 100644 index 0000000..00ec80e --- /dev/null +++ b/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java @@ -0,0 +1,112 @@ +package org.jetlinks.protocol.official; + +import lombok.SneakyThrows; +import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapResource; +import org.eclipse.californium.core.CoapResponse; +import org.eclipse.californium.core.CoapServer; +import org.eclipse.californium.core.coap.Request; +import org.eclipse.californium.core.network.CoapEndpoint; +import org.eclipse.californium.core.network.Endpoint; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.eclipse.californium.core.server.resources.Resource; +import org.hswebframework.utils.RandomUtil; +import org.jetlinks.core.defaults.CompositeProtocolSupports; +import org.jetlinks.core.device.DeviceInfo; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.StandaloneDeviceMessageBroker; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.CoapExchangeMessage; +import org.jetlinks.core.message.codec.EncodedMessage; +import org.jetlinks.core.message.codec.MessageDecodeContext; +import org.jetlinks.protocol.official.cipher.Ciphers; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.util.concurrent.atomic.AtomicReference; + +public class JetLinksCoapDeviceMessageCodecTest { + + + JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec(); + + DeviceOperator device; + + private String key = RandomUtil.randomChar(16); + + @Before + public void init() { + TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker()); + device = registry.register(DeviceInfo.builder() + .id("test") + .protocol("jetlinks") + .build()) + .flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator)) + .block(); + } + + @Test + @SneakyThrows + public void test() { + AtomicReference messageRef = new AtomicReference<>(); + + CoapServer server = new CoapServer() { + @Override + protected Resource createRoot() { + return new CoapResource("/", true) { + + @Override + public void handlePOST(CoapExchange exchange) { + codec.decode(new MessageDecodeContext() { + @Nonnull + @Override + public EncodedMessage getMessage() { + return new CoapExchangeMessage(exchange); + } + + @Override + public DeviceOperator getDevice() { + return device; + } + }) + .doOnSuccess(messageRef::set) + .doOnError(Throwable::printStackTrace) + .subscribe(); + } + + @Override + public Resource getChild(String name) { + return this; + } + }; + } + }; + + Endpoint endpoint = new CoapEndpoint.Builder() + .setPort(12345).build(); + server.addEndpoint(endpoint); + server.start(); + + + CoapClient coapClient = new CoapClient(); + + + Request request = Request.newPost(); + String payload = "{\"data\":1}"; + + request.setURI("coap://localhost:12345/test/test/event/event1"); + request.setPayload(Ciphers.AES.encrypt(payload.getBytes(),key)); +// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON); + + CoapResponse response = coapClient.advanced(request); + Assert.assertTrue(response.isSuccess()); + Thread.sleep(1000); + Assert.assertNotNull(messageRef.get()); + + System.out.println(messageRef.get()); + } + + +} \ No newline at end of file diff --git a/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java new file mode 100644 index 0000000..2079c84 --- /dev/null +++ b/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java @@ -0,0 +1,398 @@ +package org.jetlinks.protocol.official; + +import io.netty.buffer.Unpooled; +import org.jetlinks.core.defaults.CompositeProtocolSupports; +import org.jetlinks.core.device.DeviceInfo; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.ProductInfo; +import org.jetlinks.core.device.StandaloneDeviceMessageBroker; +import org.jetlinks.core.message.ChildDeviceMessage; +import org.jetlinks.core.message.ChildDeviceMessageReply; +import org.jetlinks.core.message.DerivedMetadataMessage; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.message.event.EventMessage; +import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessage; +import org.jetlinks.core.message.function.FunctionInvokeMessageReply; +import org.jetlinks.core.message.property.*; +import org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; + +public class JetLinksMqttDeviceMessageCodecTest { + + org.jetlinks.supports.official.JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec(); + + + TestDeviceRegistry registry; + + @Before + public void init() { + registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker()); + + registry.register(ProductInfo.builder() + .id("product1") + .protocol("jetlinks") + .build()) + .flatMap(product -> registry.register(DeviceInfo.builder() + .id("device1") + .productId("product1") + .build())) + .subscribe(); + + } + + @Test + public void testReadProperty() { + ReadPropertyMessage message = new ReadPropertyMessage(); + message.setDeviceId("device1"); + message.setMessageId("test"); + message.setProperties(Arrays.asList("name", "sn")); + MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block(); + + + Assert.assertNotNull(encodedMessage); + Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/properties/read"); + System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8)); + } + + @Test + public void testReadChildProperty() { + ReadPropertyMessage message = new ReadPropertyMessage(); + message.setDeviceId("test"); + message.setMessageId("test"); + message.setProperties(Arrays.asList("name", "sn")); + ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage(); + childDeviceMessage.setChildDeviceMessage(message); + childDeviceMessage.setChildDeviceId("test"); + childDeviceMessage.setDeviceId("device1"); + + MqttMessage encodedMessage = codec.encode(createMessageContext(childDeviceMessage)).block(); + + + Assert.assertNotNull(encodedMessage); + Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/read"); + System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8)); + } + + @Test + public void testReadPropertyReply() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/properties/read/reply") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ReadPropertyMessageReply); + ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(reply.getDeviceId(), "device1"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getProperties().get("sn"), "test"); + System.out.println(reply); + } + + @Test + public void testChildReadPropertyReply() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/child/test/properties/read/reply") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ChildDeviceMessageReply); + ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message); + + Assert.assertTrue(childReply.isSuccess()); + Assert.assertEquals(childReply.getDeviceId(),"device1"); + Assert.assertEquals(childReply.getMessageId(),"test"); + + ReadPropertyMessageReply reply = (ReadPropertyMessageReply)childReply.getChildDeviceMessage();; + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(reply.getDeviceId(), "test"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getProperties().get("sn"), "test"); + System.out.println(reply); + } + + @Test + public void testWriteProperty() { + WritePropertyMessage message = new WritePropertyMessage(); + message.setDeviceId("device1"); + message.setMessageId("test"); + message.setProperties(Collections.singletonMap("sn", "123")); + MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block(); + + + Assert.assertNotNull(encodedMessage); + Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/properties/write"); + System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8)); + } + + @Test + public void testChildWriteProperty() { + WritePropertyMessage message = new WritePropertyMessage(); + message.setDeviceId("device1"); + message.setMessageId("test"); + message.setProperties(Collections.singletonMap("sn", "123")); + + ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage(); + childDeviceMessage.setChildDeviceMessage(message); + childDeviceMessage.setChildDeviceId("test"); + childDeviceMessage.setDeviceId("device1"); + + + MqttMessage encodedMessage = codec.encode(createMessageContext(childDeviceMessage)).block(); + + + Assert.assertNotNull(encodedMessage); + Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/write"); + System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8)); + } + + @Test + public void testWritePropertyReply() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/properties/write/reply") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof WritePropertyMessageReply); + WritePropertyMessageReply reply = ((WritePropertyMessageReply) message); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(reply.getDeviceId(), "device1"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getProperties().get("sn"), "test"); + System.out.println(reply); + } + + + + @Test + public void testWriteChildPropertyReply() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/child/test/properties/write/reply") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ChildDeviceMessageReply); + ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message); + + Assert.assertTrue(childReply.isSuccess()); + Assert.assertEquals(childReply.getDeviceId(),"device1"); + Assert.assertEquals(childReply.getMessageId(),"test"); + + WritePropertyMessageReply reply = (WritePropertyMessageReply)childReply.getChildDeviceMessage();; + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(reply.getDeviceId(), "test"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getProperties().get("sn"), "test"); + System.out.println(reply); + } + + + @Test + public void testInvokeFunction() { + FunctionInvokeMessage message = new FunctionInvokeMessage(); + message.setDeviceId("device1"); + message.setMessageId("test"); + message.setFunctionId("playVoice"); + message.addInput("file", "http://baidu.com/1.mp3"); + + MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block(); + + + Assert.assertNotNull(encodedMessage); + Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/function/invoke"); + System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8)); + } + + @Test + public void testFirmwareUpgrade() { + UpgradeFirmwareMessage message = new UpgradeFirmwareMessage(); + message.setDeviceId("device1"); + message.setMessageId("test"); + message.setVersion("1.0"); + message.setUrl("http://baidu.com/1.mp3"); + + MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block(); + + + Assert.assertNotNull(encodedMessage); + Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/firmware/upgrade"); + System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8)); + } + + @Test + public void testInvokeFunctionReply() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/function/invoke/reply") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof FunctionInvokeMessageReply); + FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(reply.getDeviceId(), "device1"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getOutput(), "ok"); + System.out.println(reply); + } + + @Test + public void testInvokeChildFunctionReply() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/child/test/function/invoke/reply") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ChildDeviceMessageReply); + ChildDeviceMessageReply childReply = ((ChildDeviceMessageReply) message); + + Assert.assertTrue(childReply.isSuccess()); + Assert.assertEquals(childReply.getDeviceId(),"device1"); + Assert.assertEquals(childReply.getMessageId(),"test"); + + FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply)childReply.getChildDeviceMessage();; + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(reply.getDeviceId(), "test"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getOutput(), "ok"); + System.out.println(reply); + } + + @Test + public void testEvent() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/event/temp") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof EventMessage); + EventMessage reply = ((EventMessage) message); + Assert.assertEquals(reply.getDeviceId(), "device1"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getData(), 100); + System.out.println(reply); + } + + @Test + public void testChildEvent() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/child/test/event/temp") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ChildDeviceMessageReply); + + EventMessage reply = ((EventMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage()); + Assert.assertEquals(reply.getDeviceId(), "test"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getData(), 100); + System.out.println(reply); + } + + @Test + public void testPropertiesReport() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/properties/report") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ReportPropertyMessage); + ReportPropertyMessage reply = ((ReportPropertyMessage) message); + Assert.assertEquals(reply.getDeviceId(), "device1"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test")); + System.out.println(reply); + } + + + @Test + public void testChildPropertiesReport() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/child/test/properties/report") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ChildDeviceMessageReply); + + ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage()); + Assert.assertEquals(reply.getDeviceId(), "test"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test")); + System.out.println(reply); + } + + + @Test + public void testMetadataDerived() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/metadata/derived") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof DerivedMetadataMessage); + DerivedMetadataMessage reply = ((DerivedMetadataMessage) message); + Assert.assertEquals(reply.getDeviceId(), "device1"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getMetadata(), "1"); + System.out.println(reply); + } + + @Test + public void testChildMetadataDerived() { + Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder() + .topic("/product1/device1/child/test/metadata/derived") + .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes())) + .build())).block(); + + Assert.assertTrue(message instanceof ChildDeviceMessageReply); + + DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage()); + Assert.assertEquals(reply.getDeviceId(), "test"); + Assert.assertEquals(reply.getMessageId(), "test"); + Assert.assertEquals(reply.getMetadata(), "1"); + System.out.println(reply); + } + + public MessageEncodeContext createMessageContext(Message message) { + System.out.println(message.toString()); + return new MessageEncodeContext() { + @Nonnull + @Override + public Message getMessage() { + return message; + } + + @Override + public DeviceOperator getDevice() { + return registry.getDevice("device1").block(); + } + }; + } + + + public MessageDecodeContext createMessageContext(EncodedMessage message) { + System.out.println(message.toString()); + return new MessageDecodeContext() { + @Nonnull + @Override + public EncodedMessage getMessage() { + return message; + } + + @Override + public DeviceOperator getDevice() { + return registry.getDevice("device1").block(); + } + }; + } + + +} \ No newline at end of file diff --git a/src/test/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodecTest.java new file mode 100644 index 0000000..e34950c --- /dev/null +++ b/src/test/java/org/jetlinks/protocol/official/JetlinksTopicMessageCodecTest.java @@ -0,0 +1,24 @@ +package org.jetlinks.protocol.official; + +import org.jetlinks.core.message.property.ReadPropertyMessage; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class JetlinksTopicMessageCodecTest { + + JetlinksTopicMessageCodec codec = new JetlinksTopicMessageCodec(); + + @Test + public void testReadProperty() { + + ReadPropertyMessage readProperty = new ReadPropertyMessage(); + readProperty.setProperties(Arrays.asList("name")); + readProperty.setMessageId("test"); + JetlinksTopicMessageCodec.EncodedTopic topic = codec.encode("test", readProperty); + Assert.assertEquals(topic.getTopic(),"/test/properties/read"); + + } + +} \ No newline at end of file diff --git a/src/test/java/org/jetlinks/protocol/official/TestDeviceRegistry.java b/src/test/java/org/jetlinks/protocol/official/TestDeviceRegistry.java new file mode 100644 index 0000000..a8ba594 --- /dev/null +++ b/src/test/java/org/jetlinks/protocol/official/TestDeviceRegistry.java @@ -0,0 +1,80 @@ +package org.jetlinks.protocol.official; + +import org.jetlinks.core.ProtocolSupports; +import org.jetlinks.core.config.ConfigStorageManager; +import org.jetlinks.core.defaults.DefaultDeviceOperator; +import org.jetlinks.core.defaults.DefaultDeviceProductOperator; +import org.jetlinks.core.device.*; +import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; +import org.jetlinks.supports.config.InMemoryConfigStorageManager; +import reactor.core.publisher.Mono; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TestDeviceRegistry implements DeviceRegistry { + + private DeviceMessageSenderInterceptor interceptor = new CompositeDeviceMessageSenderInterceptor(); + + private ConfigStorageManager manager = new InMemoryConfigStorageManager(); + + private Map operatorMap = new ConcurrentHashMap<>(); + + private Map productOperatorMap = new ConcurrentHashMap<>(); + + private ProtocolSupports supports; + + private DeviceOperationBroker handler; + + public TestDeviceRegistry(ProtocolSupports supports, DeviceOperationBroker handler) { + this.supports = supports; + this.handler = handler; + } + + @Override + public Mono getDevice(String deviceId) { + return Mono.fromSupplier(() -> operatorMap.get(deviceId)); + } + + @Override + public Mono getProduct(String productId) { + return Mono.fromSupplier(() -> productOperatorMap.get(productId)); + } + + @Override + public Mono register(DeviceInfo deviceInfo) { + DefaultDeviceOperator operator = new DefaultDeviceOperator( + deviceInfo.getId(), + supports, manager, handler, this + ); + operatorMap.put(operator.getDeviceId(), operator); + return operator.setConfigs( + DeviceConfigKey.productId.value(deviceInfo.getProductId()), + DeviceConfigKey.protocol.value(deviceInfo.getProtocol())) + .thenReturn(operator); + } + + @Override + public Mono register(ProductInfo productInfo) { + DefaultDeviceProductOperator operator = new DefaultDeviceProductOperator(productInfo.getId(), supports, manager); + productOperatorMap.put(operator.getId(), operator); + return operator.setConfigs( + DeviceConfigKey.productId.value(productInfo.getMetadata()), + DeviceConfigKey.protocol.value(productInfo.getProtocol())) + .thenReturn(operator); + } + + @Override + public Mono unregisterDevice(String deviceId) { + return Mono.justOrEmpty(deviceId) + .map(operatorMap::remove) + .then(); + } + + @Override + public Mono unregisterProduct(String productId) { + return Mono.justOrEmpty(productId) + .map(productOperatorMap::remove) + .then(); + } +}