diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java index 66f6719..248bffa 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java @@ -12,6 +12,8 @@ import org.jetlinks.core.message.codec.CoapMessage; import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.MessageDecodeContext; import org.jetlinks.core.message.codec.Transport; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.types.PasswordType; import org.springframework.http.MediaType; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; @@ -23,6 +25,15 @@ import java.util.function.Consumer; @Slf4j public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { + + private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata( + "CoAP DTLS配置", + "使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" + + "之后上报数据需要在Option中携带token信息. \n" + + "自定义Option: 2110,sign ; 2111,token ") + .add("secureKey", "密钥", "认证签名密钥", new PasswordType()); + + @Override public Transport getSupportTransport() { return DefaultTransport.CoAP_DTLS; diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java index 79ebe9e..02b3219 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java @@ -10,6 +10,10 @@ import org.jetlinks.core.message.codec.CoapMessage; import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.MessageDecodeContext; import org.jetlinks.core.message.codec.Transport; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.DeviceConfigScope; +import org.jetlinks.core.metadata.types.EnumType; +import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.protocol.official.cipher.Ciphers; import org.springframework.http.MediaType; import org.springframework.util.StringUtils; @@ -20,6 +24,13 @@ import java.util.function.Consumer; @Slf4j public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { + public static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata( + "CoAP认证配置", + "使用CoAP进行数据上报时,需要对数据进行加密:" + + "encrypt(payload,secureKey);") + .add("encAlg", "加密算法", "加密算法", new EnumType() + .addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product) + .add("secureKey", "密钥", "16位密钥KEY", new PasswordType()); @Override public Transport getSupportTransport() { diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java index 496d604..54dea46 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java @@ -3,19 +3,21 @@ package org.jetlinks.protocol.official; import org.jetlinks.core.defaults.CompositeProtocolSupport; import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.metadata.DefaultConfigMetadata; -import org.jetlinks.core.metadata.DeviceConfigScope; -import org.jetlinks.core.metadata.types.EnumType; import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.route.HttpRoute; import org.jetlinks.core.spi.ProtocolSupportProvider; import org.jetlinks.core.spi.ServiceContext; +import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec; import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; -import org.springframework.core.io.ClassPathResource; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; import reactor.core.publisher.Mono; import java.util.Arrays; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.Stream; public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider { @@ -29,22 +31,6 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider .add("secureId", "secureId", "密钥ID", new StringType()) .add("secureKey", "secureKey", "密钥KEY", new PasswordType()); - private static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata( - "CoAP认证配置", - "使用CoAP进行数据上报时,需要对数据进行加密:" + - "encrypt(payload,secureKey);") - .add("encAlg", "加密算法", "加密算法", new EnumType() - .addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product) - .add("secureKey", "密钥", "16位密钥KEY", new PasswordType()); - - private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata( - "CoAP DTLS配置", - "使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" + - "之后上报数据需要在Option中携带token信息. \n" + - "自定义Option: 2110,sign ; 2111,token ") - .add("secureKey", "密钥", "认证签名密钥", new PasswordType()); - - @Override public Mono create(ServiceContext context) { @@ -65,19 +51,46 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider "document-mqtt.md", JetLinksProtocolSupportProvider.class.getClassLoader()); + support.addRoutes(DefaultTransport.HTTP, Stream + .of(TopicMessageCodec.reportProperty, + TopicMessageCodec.event, + TopicMessageCodec.online, + TopicMessageCodec.offline) + .map(TopicMessageCodec::getRoute) + .filter(route -> route != null && route.isUpstream()) + .map(route -> HttpRoute + .builder() + .address(route.getTopic()) + .group(route.getGroup()) + .contentType(MediaType.APPLICATION_JSON) + .method(HttpMethod.POST) + .description(route.getDescription()) + .example(route.getExample()) + .build()) + .collect(Collectors.toList()) + ); + + support.setDocument(DefaultTransport.HTTP, + "document-http.md", + JetLinksProtocolSupportProvider.class.getClassLoader()); + + support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator()); support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig); - support.addConfigMetadata(DefaultTransport.CoAP, coapConfig); + support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig); + support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig); - support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT)); - support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec(DefaultTransport.MQTT_TLS)); + //MQTT + support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec()); + //HTTP + support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec()); + + //CoAP support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec()); - support.addMessageCodecSupport(new JetLinksCoapDTLSDeviceMessageCodec()); - return Mono.just(support); }); diff --git a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java index a4ec3f4..a814b1e 100644 --- a/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/TopicMessageCodec.java @@ -262,7 +262,7 @@ public enum TopicMessageCodec { //断开连接回复 disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class), //上线 - connect("/*/online", DeviceOnlineMessage.class, builder -> builder + online("/*/online", DeviceOnlineMessage.class, builder -> builder .upstream(true) .group("状态管理") .description("设备上线")), diff --git a/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java new file mode 100644 index 0000000..9dde42a --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java @@ -0,0 +1,147 @@ +package org.jetlinks.protocol.official.http; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; +import org.hswebframework.web.exception.BusinessException; +import org.jetlinks.core.device.DeviceConfigKey; +import org.jetlinks.core.enums.ErrorCode; +import org.jetlinks.core.exception.DeviceOperationException; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DisconnectDeviceMessage; +import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.message.codec.http.Header; +import org.jetlinks.core.message.codec.http.HttpExchangeMessage; +import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.types.PasswordType; +import org.jetlinks.core.trace.DeviceTracer; +import org.jetlinks.core.trace.FluxTracer; +import org.jetlinks.protocol.official.FunctionalTopicHandlers; +import org.jetlinks.protocol.official.ObjectMappers; +import org.jetlinks.protocol.official.TopicMessageCodec; +import org.jetlinks.protocol.official.TopicPayload; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.util.Objects; + +/** + * Http 的消息编解码器 + * + * @author zhouhao + * @since 3.0.0 + */ +@Slf4j +public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { + public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata( + "HTTP认证配置" + , "使用HTTP Bearer Token进行认证") + .add("bearer_token", "Token", "Token", new PasswordType()); + + private final Transport transport; + + public JetLinksHttpDeviceMessageCodec(Transport transport) { + this.transport = transport; + } + + public JetLinksHttpDeviceMessageCodec() { + this(DefaultTransport.HTTP); + } + + @Override + public Transport getSupportTransport() { + return transport; + } + + @Nonnull + public Mono encode(@Nonnull MessageEncodeContext context) { + //http 暂不支持下发 + return Mono.empty(); + } + + private static final SimpleHttpResponseMessage unauthorized = SimpleHttpResponseMessage + .builder() + .contentType(MediaType.APPLICATION_JSON) + .body("{\"success\":true,\"code\":\"unauthorized\"}") + .status(401) + .build(); + + private static final SimpleHttpResponseMessage badRequest = SimpleHttpResponseMessage + .builder() + .contentType(MediaType.APPLICATION_JSON) + .body("{\"success\":false,\"code\":\"bad_request\"}") + .status(400) + .build(); + + @Nonnull + @Override + public Flux decode(@Nonnull MessageDecodeContext context) { + HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage(); + byte[] payload = message.payloadAsBytes(); + ObjectMapper mapper = ObjectMappers.JSON_MAPPER; + + Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null); + if (header == null || header.getValue() == null || header.getValue().length == 0) { + return message + .response(unauthorized) + .thenMany(Mono.empty()); + } + + String[] token = header.getValue()[0].split(" "); + if (token.length == 1) { + return message + .response(unauthorized) + .thenMany(Mono.empty()); + } + String basicToken = token[1]; + + String[] paths = TopicMessageCodec.removeProductPath(message.getPath()); + if (paths.length < 1) { + return message + .response(badRequest) + .thenMany(Mono.empty()); + } + String deviceId = paths[1]; + return context + .getDevice(deviceId) + .flatMap(device -> device.getConfig("bearer_token")) + .filter(value -> Objects.equals(value.asString(), basicToken)) + .flatMapMany(ignore -> TopicMessageCodec.decode(mapper, paths, payload)) + .switchOnFirst((s, flux) -> { + Mono handler; + //有结果则认为成功 + if (s.hasValue()) { + handler = message.ok("{\"success\":true}"); + } else { + return message + .response(unauthorized) + .then(Mono.error(new BusinessException("设备[" + deviceId + "]未激活或token [" + basicToken + "]错误"))); + } + return handler.thenMany(flux); + }) + .onErrorResume(err -> message + .error(500, getErrorMessage(err)) + .then(Mono.error(err))) + //跟踪信息 + .as(FluxTracer + .create(DeviceTracer.SpanName.decode(deviceId), + builder -> builder.setAttribute(DeviceTracer.SpanKey.message, message.print()))); + + } + + public String getErrorMessage(Throwable err) { + if (err instanceof JsonParseException) { + return "{\"success\":false,\"code\":\"request_body_format_error\"}"; + } + return "{\"success\":false,\"code\":\"server_error\"}"; + } + +} diff --git a/src/main/resources/document-http.md b/src/main/resources/document-http.md new file mode 100644 index 0000000..5a6b4c5 --- /dev/null +++ b/src/main/resources/document-http.md @@ -0,0 +1,29 @@ +#### 使用HTTP推送设备数据 + +上报属性例子: + +```http request +POST /{productId}/{deviceId}/properties/report +Authorization: Bearer {产品或者设备中配置的Token} +Content-Type: application/json + +{ + "properties":{ + "temp":38.5 + } +} +``` + +上报事件例子: + +```http request +POST /{productId}/{deviceId}/event/{eventId} +Authorization: Bearer {产品或者设备中配置的Token} +Content-Type: application/json + +{ + "data":{ + "address": "" + } +} +``` \ No newline at end of file