diff --git a/pom.xml b/pom.xml index 751f81a..57a6c6d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,17 +6,17 @@ org.jetlinks jetlinks-official-protocol - 2.0-SNAPSHOT + 3.0-SNAPSHOT JetLinks - http://jetlinks.org + https://jetlinks.org 2019 JetLinks 物联网平台 The Apache License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt + https://www.apache.org/licenses/LICENSE-2.0.txt @@ -158,7 +158,7 @@ org.jetlinks jetlinks-supports - 1.1.7 + 1.2.0-SNAPSHOT diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java index 8769b0b..04997d9 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java @@ -10,17 +10,22 @@ import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.spi.ProtocolSupportProvider; import org.jetlinks.core.spi.ServiceContext; import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; +import org.springframework.core.io.ClassPathResource; import reactor.core.publisher.Mono; +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Collectors; + public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider { private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata( "MQTT认证配置" , "MQTT认证时需要的配置,mqtt用户名,密码算法:\n" + - "username=secureId|timestamp\n" + - "password=md5(secureId|timestamp|secureKey)\n" + - "\n" + - "timestamp为时间戳,与服务时间不能相差5分钟") + "username=secureId|timestamp\n" + + "password=md5(secureId|timestamp|secureKey)\n" + + "\n" + + "timestamp为时间戳,与服务时间不能相差5分钟") .add("secureId", "secureId", "密钥ID", new StringType()) .add("secureKey", "secureKey", "密钥KEY", new PasswordType()); @@ -46,9 +51,19 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider return Mono.defer(() -> { CompositeProtocolSupport support = new CompositeProtocolSupport(); - support.setId("jetlinks.v2.0"); - support.setName("JetLinks V2.0"); - support.setDescription("JetLinks Protocol Version 2.0"); + support.setId("jetlinks.v3.0"); + support.setName("JetLinks V3.0"); + support.setDescription("JetLinks Protocol Version 3.0"); + + support.addRoutes(DefaultTransport.MQTT, Arrays + .stream(TopicMessageCodec.values()) + .map(TopicMessageCodec::getRoute) + .filter(Objects::nonNull) + .collect(Collectors.toList()) + ); + support.setDocument(DefaultTransport.MQTT, + "document-mqtt.md", + JetLinksProtocolSupportProvider.class.getClassLoader()); support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator()); support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator()); diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java index 21a90a2..e4aec67 100644 --- a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java @@ -11,6 +11,7 @@ import org.jetlinks.core.message.function.FunctionInvokeMessageReply; import org.jetlinks.core.message.property.*; import org.jetlinks.core.message.state.DeviceStateCheckMessage; import org.jetlinks.core.message.state.DeviceStateCheckMessageReply; +import org.jetlinks.core.route.MqttRoute; import org.jetlinks.core.utils.TopicUtils; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -19,12 +20,69 @@ import reactor.core.publisher.Mono; import java.util.Arrays; import java.util.Collections; import java.util.Optional; +import java.util.StringJoiner; +import java.util.function.Function; public enum TopicMessageCodec { //上报属性数据 - reportProperty("/*/properties/report", ReportPropertyMessage.class), + reportProperty("/*/properties/report", + ReportPropertyMessage.class, + route -> route + .upstream(true) + .downstream(false) + .group("属性上报") + .description("上报物模型属性数据") + .example("{\"properties\":{\"属性ID\":\"属性值\"}}")), + //读取属性 + readProperty("/*/properties/read", + ReadPropertyMessage.class, + route -> route + .upstream(false) + .downstream(true) + .group("读取属性") + .description("平台下发读取物模型属性数据指令") + .example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")), + //读取属性回复 + readPropertyReply("/*/properties/read/reply", + ReadPropertyMessageReply.class, + route -> route + .upstream(true) + .downstream(false) + .group("读取属性") + .description("对平台下发的读取属性指令进行响应") + .example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")), + //修改属性 + writeProperty("/*/properties/write", + WritePropertyMessage.class, + route -> route + .upstream(false) + .downstream(true) + .group("修改属性") + .description("平台下发修改物模型属性数据指令") + .example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")), + //修改属性回复 + writePropertyReply("/*/properties/write/reply", + WritePropertyMessageReply.class, + route -> route + .upstream(true) + .downstream(false) + .group("修改属性") + .description("对平台下发的修改属性指令进行响应") + .example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")), //事件上报 - event("/*/event/*", EventMessage.class) { + event("/*/event/*", + EventMessage.class, + route -> route + .upstream(true) + .downstream(false) + .group("事件上报") + .description("上报物模型事件数据") + .example("{\"data\":{\"key\":\"value\"}}")) { + @Override + protected void transMqttTopic(String[] topic) { + topic[topic.length - 1] = "{eventId:事件ID}"; + } + @Override Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { String event = topic[topic.length - 1]; @@ -42,20 +100,42 @@ public enum TopicMessageCodec { topics[topics.length - 1] = String.valueOf(event.getEvent()); } }, - //读取属性 - readProperty("/*/properties/read", ReadPropertyMessage.class), - //读取属性回复 - readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class), - //修改属性 - writeProperty("/*/properties/write", WritePropertyMessage.class), - //修改属性回复 - writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class), + //调用功能 - functionInvoke("/*/function/invoke", FunctionInvokeMessage.class), + functionInvoke("/*/function/invoke", + FunctionInvokeMessage.class, + route -> route + .upstream(false) + .downstream(true) + .group("调用功能") + .description("平台下发功能调用指令") + .example("{\"messageId\":\"消息ID,回复时需要一致.\"," + + "\"functionId\":\"功能标识\"," + + "\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")), //调用功能回复 - functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class), + functionInvokeReply("/*/function/invoke/reply", + FunctionInvokeMessageReply.class, + route -> route + .upstream(true) + .downstream(false) + .group("调用功能") + .description("设备响应平台下发的功能调用指令") + .example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," + + "\"output\":\"输出结果,格式与物模型中定义的类型一致\"")), //子设备消息 - child("/*/child/*/**", ChildDeviceMessage.class) { + child("/*/child/*/**", + ChildDeviceMessage.class, + route -> route + .upstream(true) + .downstream(true) + .group("子设备消息") + .description("网关上报或者平台下发子设备消息")) { + @Override + protected void transMqttTopic(String[] topic) { + topic[topic.length - 1] = "{#:子设备相应操作的topic}"; + topic[topic.length - 2] = "{childDeviceId:子设备ID}"; + } + @Override public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { String[] _topic = Arrays.copyOfRange(topic, 2, topic.length); @@ -91,7 +171,19 @@ public enum TopicMessageCodec { } }, //子设备消息回复 - childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) { + childReply("/*/child-reply/*/**", + ChildDeviceMessageReply.class, + route -> route + .upstream(true) + .downstream(true) + .group("子设备消息") + .description("网关回复平台下发给子设备的指令结果")) { + @Override + protected void transMqttTopic(String[] topic) { + topic[topic.length - 1] = "{#:子设备相应操作的topic}"; + topic[topic.length - 2] = "{childDeviceId:子设备ID}"; + } + @Override public Publisher doDecode(ObjectMapper mapper, String[] topic, byte[] payload) { String[] _topic = Arrays.copyOfRange(topic, 2, topic.length); @@ -128,7 +220,13 @@ public enum TopicMessageCodec { } }, //更新标签 - updateTag("/*/tags", UpdateTagMessage.class), + updateTag("/*/tags", + UpdateTagMessage.class, + route -> route.upstream(true) + .downstream(false) + .group("更新标签") + .description("更新标签数据") + .example("{\"tags\":{\"key\",\"value\"}}")), //注册 register("/*/register", DeviceRegisterMessage.class), //注销 @@ -174,14 +272,48 @@ public enum TopicMessageCodec { stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class), ; - TopicMessageCodec(String topic, Class type) { + TopicMessageCodec(String topic, + Class type, + Function routeCustom) { this.pattern = topic.split("/"); this.type = type; + this.route = routeCustom.apply(toRoute()).build(); + } + + TopicMessageCodec(String topic, + Class type) { + this.pattern = topic.split("/"); + this.type = type; + this.route = null; } private final String[] pattern; + private final MqttRoute route; private final Class type; + protected void transMqttTopic(String[] topic) { + + } + + @SneakyThrows + private MqttRoute.Builder toRoute() { + String[] topics = new String[1 + pattern.length]; + topics[0] = "{productId:产品ID}"; + System.arraycopy(pattern, 0, topics, 1, pattern.length); + topics[1] = "{deviceId:设备ID}"; + transMqttTopic(topics); + StringJoiner joiner = new StringJoiner("/", "/", ""); + for (String topic : topics) { + joiner.add(topic); + } + return MqttRoute + .builder(joiner.toString()); + } + + public MqttRoute getRoute() { + return route; + } + public static Flux decode(ObjectMapper mapper, String[] topics, byte[] payload) { return Mono .justOrEmpty(fromTopic(topics)) diff --git a/src/main/resources/document-mqtt.md b/src/main/resources/document-mqtt.md new file mode 100644 index 0000000..62f93b5 --- /dev/null +++ b/src/main/resources/document-mqtt.md @@ -0,0 +1,11 @@ +# MQTT认证说明 +CONNECT报文: +```text +clientId: 设备ID +username: secureId+"|"+timestamp +password: md5(secureId+"|"+timestamp+"|"+secureKey) + ``` + +说明: secureId以及secureKey在创建设备产品或设备实例时进行配置. +timestamp为当前系统时间戳(毫秒),与系统时间不能相差5分钟. +md5为32位,不区分大小写. \ No newline at end of file