diff --git a/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java index 3bc4bcb..debc4e0 100644 --- a/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java @@ -1,17 +1,9 @@ package org.jetlinks.protocol.official.http; import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBufUtil; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.binary.Base64; -import org.hswebframework.web.exception.BusinessException; -import org.jetlinks.core.device.DeviceConfigKey; -import org.jetlinks.core.enums.ErrorCode; -import org.jetlinks.core.exception.DeviceOperationException; import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.DisconnectDeviceMessage; -import org.jetlinks.core.message.Message; import org.jetlinks.core.message.codec.*; import org.jetlinks.core.message.codec.http.Header; import org.jetlinks.core.message.codec.http.HttpExchangeMessage; @@ -20,12 +12,9 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.trace.DeviceTracer; import org.jetlinks.core.trace.FluxTracer; -import org.jetlinks.protocol.official.FunctionalTopicHandlers; import org.jetlinks.protocol.official.ObjectMappers; import org.jetlinks.protocol.official.TopicMessageCodec; -import org.jetlinks.protocol.official.TopicPayload; import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -67,11 +56,11 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { return Mono.empty(); } - private static SimpleHttpResponseMessage unauthorized() { + private static SimpleHttpResponseMessage unauthorized(String msg) { return SimpleHttpResponseMessage .builder() .contentType(MediaType.APPLICATION_JSON) - .body("{\"success\":true,\"code\":\"unauthorized\"}") + .body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\""+msg+"\"}") .status(401) .build(); } @@ -90,20 +79,20 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { @Override public Flux decode(@Nonnull MessageDecodeContext context) { HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage(); - byte[] payload = message.payloadAsBytes(); - ObjectMapper mapper = ObjectMappers.JSON_MAPPER; + //校验请求头中的Authorization header,格式: + // Authorization: Bearer Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null); if (header == null || header.getValue() == null || header.getValue().length == 0) { return message - .response(unauthorized()) + .response(unauthorized("Authorization header is required")) .thenMany(Mono.empty()); } String[] token = header.getValue()[0].split(" "); if (token.length == 1) { return message - .response(unauthorized()) + .response(unauthorized("Illegal token format")) .thenMany(Mono.empty()); } String basicToken = token[1]; @@ -118,8 +107,14 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { return context .getDevice(deviceId) .flatMap(device -> device.getConfig("bearer_token")) + //校验token .filter(value -> Objects.equals(value.asString(), basicToken)) - .flatMapMany(ignore -> TopicMessageCodec.decode(mapper, paths, payload)) + //设备或者配置不对 + .switchIfEmpty(Mono.defer(() -> message + .response(unauthorized("Device no register or token not match")) + .then(Mono.empty()))) + //解码 + .flatMapMany(ignore -> doDecode(message, paths)) .switchOnFirst((s, flux) -> { Mono handler; //有结果则认为成功 @@ -127,8 +122,8 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { handler = message.ok("{\"success\":true}"); } else { return message - .response(unauthorized()) - .then(Mono.error(new BusinessException("设备[" + deviceId + "]未激活或token [" + basicToken + "]错误"))); + .response(badRequest()) + .then(Mono.empty()); } return handler.thenMany(flux); }) @@ -142,6 +137,15 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { } + private Flux doDecode(HttpExchangeMessage message, String[] paths) { + return message + .payload() + .flatMapMany(buf -> { + byte[] body = ByteBufUtil.getBytes(buf); + return TopicMessageCodec.decode(ObjectMappers.JSON_MAPPER, paths, body); + }); + } + public String getErrorMessage(Throwable err) { if (err instanceof JsonParseException) { return "{\"success\":false,\"code\":\"request_body_format_error\"}";