增加http 解析

This commit is contained in:
zhouhao 2022-06-21 17:02:17 +08:00
parent 8e0faae4db
commit 07b7bdf16d
6 changed files with 236 additions and 25 deletions

View File

@ -12,6 +12,8 @@ import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
@ -23,6 +25,15 @@ import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP_DTLS;

View File

@ -10,6 +10,10 @@ import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
@ -20,6 +24,13 @@ import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
public static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
@Override
public Transport getSupportTransport() {

View File

@ -3,19 +3,21 @@ package org.jetlinks.protocol.official;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider {
@ -29,22 +31,6 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
.add("secureId", "secureId", "密钥ID", new StringType())
.add("secureKey", "secureKey", "密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override
public Mono<CompositeProtocolSupport> create(ServiceContext context) {
@ -65,19 +51,46 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
"document-mqtt.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addRoutes(DefaultTransport.HTTP, Stream
.of(TopicMessageCodec.reportProperty,
TopicMessageCodec.event,
TopicMessageCodec.online,
TopicMessageCodec.offline)
.map(TopicMessageCodec::getRoute)
.filter(route -> route != null && route.isUpstream())
.map(route -> HttpRoute
.builder()
.address(route.getTopic())
.group(route.getGroup())
.contentType(MediaType.APPLICATION_JSON)
.method(HttpMethod.POST)
.description(route.getDescription())
.example(route.getExample())
.build())
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.HTTP,
"document-http.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
support.addConfigMetadata(DefaultTransport.CoAP, coapConfig);
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT));
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS));
//MQTT
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec());
//HTTP
support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec());
//CoAP
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec());
return Mono.just(support);
});

View File

@ -262,7 +262,7 @@ public enum TopicMessageCodec {
//断开连接回复
disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
//上线
connect("/*/online", DeviceOnlineMessage.class, builder -> builder
online("/*/online", DeviceOnlineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备上线")),

View File

@ -0,0 +1,147 @@
package org.jetlinks.protocol.official.http;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.protocol.official.FunctionalTopicHandlers;
import org.jetlinks.protocol.official.ObjectMappers;
import org.jetlinks.protocol.official.TopicMessageCodec;
import org.jetlinks.protocol.official.TopicPayload;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Objects;
/**
* Http 的消息编解码器
*
* @author zhouhao
* @since 3.0.0
*/
@Slf4j
public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata(
"HTTP认证配置"
, "使用HTTP Bearer Token进行认证")
.add("bearer_token", "Token", "Token", new PasswordType());
private final Transport transport;
public JetLinksHttpDeviceMessageCodec(Transport transport) {
this.transport = transport;
}
public JetLinksHttpDeviceMessageCodec() {
this(DefaultTransport.HTTP);
}
@Override
public Transport getSupportTransport() {
return transport;
}
@Nonnull
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
//http 暂不支持下发
return Mono.empty();
}
private static final SimpleHttpResponseMessage unauthorized = SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":true,\"code\":\"unauthorized\"}")
.status(401)
.build();
private static final SimpleHttpResponseMessage badRequest = SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"bad_request\"}")
.status(400)
.build();
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
byte[] payload = message.payloadAsBytes();
ObjectMapper mapper = ObjectMappers.JSON_MAPPER;
Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null);
if (header == null || header.getValue() == null || header.getValue().length == 0) {
return message
.response(unauthorized)
.thenMany(Mono.empty());
}
String[] token = header.getValue()[0].split(" ");
if (token.length == 1) {
return message
.response(unauthorized)
.thenMany(Mono.empty());
}
String basicToken = token[1];
String[] paths = TopicMessageCodec.removeProductPath(message.getPath());
if (paths.length < 1) {
return message
.response(badRequest)
.thenMany(Mono.empty());
}
String deviceId = paths[1];
return context
.getDevice(deviceId)
.flatMap(device -> device.getConfig("bearer_token"))
.filter(value -> Objects.equals(value.asString(), basicToken))
.flatMapMany(ignore -> TopicMessageCodec.decode(mapper, paths, payload))
.switchOnFirst((s, flux) -> {
Mono<Void> handler;
//有结果则认为成功
if (s.hasValue()) {
handler = message.ok("{\"success\":true}");
} else {
return message
.response(unauthorized)
.then(Mono.error(new BusinessException("设备[" + deviceId + "]未激活或token [" + basicToken + "]错误")));
}
return handler.thenMany(flux);
})
.onErrorResume(err -> message
.error(500, getErrorMessage(err))
.then(Mono.error(err)))
//跟踪信息
.as(FluxTracer
.create(DeviceTracer.SpanName.decode(deviceId),
builder -> builder.setAttribute(DeviceTracer.SpanKey.message, message.print())));
}
public String getErrorMessage(Throwable err) {
if (err instanceof JsonParseException) {
return "{\"success\":false,\"code\":\"request_body_format_error\"}";
}
return "{\"success\":false,\"code\":\"server_error\"}";
}
}

View File

@ -0,0 +1,29 @@
#### 使用HTTP推送设备数据
上报属性例子:
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
上报事件例子:
```http request
POST /{productId}/{deviceId}/event/{eventId}
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"data":{
"address": ""
}
}
```