This commit is contained in:
zhouhao 2022-03-11 09:29:34 +08:00
parent f670bf0bb5
commit d344dba0f1
4 changed files with 185 additions and 27 deletions

View File

@ -6,17 +6,17 @@
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-official-protocol</artifactId>
<version>2.0-SNAPSHOT</version>
<version>3.0-SNAPSHOT</version>
<name>JetLinks</name>
<url>http://jetlinks.org</url>
<url>https://jetlinks.org</url>
<inceptionYear>2019</inceptionYear>
<description>JetLinks 物联网平台</description>
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
@ -158,7 +158,7 @@
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId>
<version>1.1.7</version>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -10,17 +10,22 @@ import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.core.io.ClassPathResource;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider {
private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
"MQTT认证配置"
, "MQTT认证时需要的配置,mqtt用户名,密码算法:\n" +
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
.add("secureId", "secureId", "密钥ID", new StringType())
.add("secureKey", "secureKey", "密钥KEY", new PasswordType());
@ -46,9 +51,19 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport();
support.setId("jetlinks.v2.0");
support.setName("JetLinks V2.0");
support.setDescription("JetLinks Protocol Version 2.0");
support.setId("jetlinks.v3.0");
support.setName("JetLinks V3.0");
support.setDescription("JetLinks Protocol Version 3.0");
support.addRoutes(DefaultTransport.MQTT, Arrays
.stream(TopicMessageCodec.values())
.map(TopicMessageCodec::getRoute)
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.MQTT,
"document-mqtt.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.addAuthenticator(DefaultTransport.MQTT_TLS, new JetLinksAuthenticator());

View File

@ -11,6 +11,7 @@ import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@ -19,12 +20,69 @@ import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Function;
public enum TopicMessageCodec {
//上报属性数据
reportProperty("/*/properties/report", ReportPropertyMessage.class),
reportProperty("/*/properties/report",
ReportPropertyMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("属性上报")
.description("上报物模型属性数据")
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
//读取属性
readProperty("/*/properties/read",
ReadPropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("读取属性")
.description("平台下发读取物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")),
//读取属性回复
readPropertyReply("/*/properties/read/reply",
ReadPropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("读取属性")
.description("对平台下发的读取属性指令进行响应")
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性
writeProperty("/*/properties/write",
WritePropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("修改属性")
.description("平台下发修改物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性回复
writePropertyReply("/*/properties/write/reply",
WritePropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("修改属性")
.description("对平台下发的修改属性指令进行响应")
.example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//事件上报
event("/*/event/*", EventMessage.class) {
event("/*/event/*",
EventMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("事件上报")
.description("上报物模型事件数据")
.example("{\"data\":{\"key\":\"value\"}}")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{eventId:事件ID}";
}
@Override
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String event = topic[topic.length - 1];
@ -42,20 +100,42 @@ public enum TopicMessageCodec {
topics[topics.length - 1] = String.valueOf(event.getEvent());
}
},
//读取属性
readProperty("/*/properties/read", ReadPropertyMessage.class),
//读取属性回复
readPropertyReply("/*/properties/read/reply", ReadPropertyMessageReply.class),
//修改属性
writeProperty("/*/properties/write", WritePropertyMessage.class),
//修改属性回复
writePropertyReply("/*/properties/write/reply", WritePropertyMessageReply.class),
//调用功能
functionInvoke("/*/function/invoke", FunctionInvokeMessage.class),
functionInvoke("/*/function/invoke",
FunctionInvokeMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("调用功能")
.description("平台下发功能调用指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\"," +
"\"functionId\":\"功能标识\"," +
"\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")),
//调用功能回复
functionInvokeReply("/*/function/invoke/reply", FunctionInvokeMessageReply.class),
functionInvokeReply("/*/function/invoke/reply",
FunctionInvokeMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("调用功能")
.description("设备响应平台下发的功能调用指令")
.example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," +
"\"output\":\"输出结果,格式与物模型中定义的类型一致\"")),
//子设备消息
child("/*/child/*/**", ChildDeviceMessage.class) {
child("/*/child/*/**",
ChildDeviceMessage.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关上报或者平台下发子设备消息")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
@ -91,7 +171,19 @@ public enum TopicMessageCodec {
}
}, //子设备消息回复
childReply("/*/child-reply/*/**", ChildDeviceMessageReply.class) {
childReply("/*/child-reply/*/**",
ChildDeviceMessageReply.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关回复平台下发给子设备的指令结果")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
@ -128,7 +220,13 @@ public enum TopicMessageCodec {
}
},
//更新标签
updateTag("/*/tags", UpdateTagMessage.class),
updateTag("/*/tags",
UpdateTagMessage.class,
route -> route.upstream(true)
.downstream(false)
.group("更新标签")
.description("更新标签数据")
.example("{\"tags\":{\"key\",\"value\"}}")),
//注册
register("/*/register", DeviceRegisterMessage.class),
//注销
@ -174,14 +272,48 @@ public enum TopicMessageCodec {
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
;
TopicMessageCodec(String topic, Class<? extends DeviceMessage> type) {
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type,
Function<MqttRoute.Builder, MqttRoute.Builder> routeCustom) {
this.pattern = topic.split("/");
this.type = type;
this.route = routeCustom.apply(toRoute()).build();
}
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type) {
this.pattern = topic.split("/");
this.type = type;
this.route = null;
}
private final String[] pattern;
private final MqttRoute route;
private final Class<? extends DeviceMessage> type;
protected void transMqttTopic(String[] topic) {
}
@SneakyThrows
private MqttRoute.Builder toRoute() {
String[] topics = new String[1 + pattern.length];
topics[0] = "{productId:产品ID}";
System.arraycopy(pattern, 0, topics, 1, pattern.length);
topics[1] = "{deviceId:设备ID}";
transMqttTopic(topics);
StringJoiner joiner = new StringJoiner("/", "/", "");
for (String topic : topics) {
joiner.add(topic);
}
return MqttRoute
.builder(joiner.toString());
}
public MqttRoute getRoute() {
return route;
}
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) {
return Mono
.justOrEmpty(fromTopic(topics))

View File

@ -0,0 +1,11 @@
# MQTT认证说明
CONNECT报文:
```text
clientId: 设备ID
username: secureId+"|"+timestamp
password: md5(secureId+"|"+timestamp+"|"+secureKey)
```
说明: secureId以及secureKey在创建设备产品或设备实例时进行配置.
timestamp为当前系统时间戳(毫秒),与系统时间不能相差5分钟.
md5为32位,不区分大小写.