使用新的http逻辑

This commit is contained in:
zhouhao 2022-12-06 11:18:41 +08:00
parent a4876e49ce
commit b478566fc1
1 changed files with 25 additions and 21 deletions

View File

@ -1,17 +1,9 @@
package org.jetlinks.protocol.official.http; package org.jetlinks.protocol.official.http;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*; import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.Header; import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage; import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
@ -20,12 +12,9 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.trace.DeviceTracer; import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer; import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.protocol.official.FunctionalTopicHandlers;
import org.jetlinks.protocol.official.ObjectMappers; import org.jetlinks.protocol.official.ObjectMappers;
import org.jetlinks.protocol.official.TopicMessageCodec; import org.jetlinks.protocol.official.TopicMessageCodec;
import org.jetlinks.protocol.official.TopicPayload;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -67,11 +56,11 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
return Mono.empty(); return Mono.empty();
} }
private static SimpleHttpResponseMessage unauthorized() { private static SimpleHttpResponseMessage unauthorized(String msg) {
return SimpleHttpResponseMessage return SimpleHttpResponseMessage
.builder() .builder()
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":true,\"code\":\"unauthorized\"}") .body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\""+msg+"\"}")
.status(401) .status(401)
.build(); .build();
} }
@ -90,20 +79,20 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
@Override @Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) { public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage(); HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
byte[] payload = message.payloadAsBytes();
ObjectMapper mapper = ObjectMappers.JSON_MAPPER;
//校验请求头中的Authorization header,格式:
// Authorization: Bearer <token>
Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null); Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null);
if (header == null || header.getValue() == null || header.getValue().length == 0) { if (header == null || header.getValue() == null || header.getValue().length == 0) {
return message return message
.response(unauthorized()) .response(unauthorized("Authorization header is required"))
.thenMany(Mono.empty()); .thenMany(Mono.empty());
} }
String[] token = header.getValue()[0].split(" "); String[] token = header.getValue()[0].split(" ");
if (token.length == 1) { if (token.length == 1) {
return message return message
.response(unauthorized()) .response(unauthorized("Illegal token format"))
.thenMany(Mono.empty()); .thenMany(Mono.empty());
} }
String basicToken = token[1]; String basicToken = token[1];
@ -118,8 +107,14 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
return context return context
.getDevice(deviceId) .getDevice(deviceId)
.flatMap(device -> device.getConfig("bearer_token")) .flatMap(device -> device.getConfig("bearer_token"))
//校验token
.filter(value -> Objects.equals(value.asString(), basicToken)) .filter(value -> Objects.equals(value.asString(), basicToken))
.flatMapMany(ignore -> TopicMessageCodec.decode(mapper, paths, payload)) //设备或者配置不对
.switchIfEmpty(Mono.defer(() -> message
.response(unauthorized("Device no register or token not match"))
.then(Mono.empty())))
//解码
.flatMapMany(ignore -> doDecode(message, paths))
.switchOnFirst((s, flux) -> { .switchOnFirst((s, flux) -> {
Mono<Void> handler; Mono<Void> handler;
//有结果则认为成功 //有结果则认为成功
@ -127,8 +122,8 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
handler = message.ok("{\"success\":true}"); handler = message.ok("{\"success\":true}");
} else { } else {
return message return message
.response(unauthorized()) .response(badRequest())
.then(Mono.error(new BusinessException("设备[" + deviceId + "]未激活或token [" + basicToken + "]错误"))); .then(Mono.empty());
} }
return handler.thenMany(flux); return handler.thenMany(flux);
}) })
@ -142,6 +137,15 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
} }
private Flux<DeviceMessage> doDecode(HttpExchangeMessage message, String[] paths) {
return message
.payload()
.flatMapMany(buf -> {
byte[] body = ByteBufUtil.getBytes(buf);
return TopicMessageCodec.decode(ObjectMappers.JSON_MAPPER, paths, body);
});
}
public String getErrorMessage(Throwable err) { public String getErrorMessage(Throwable err) {
if (err instanceof JsonParseException) { if (err instanceof JsonParseException) {
return "{\"success\":false,\"code\":\"request_body_format_error\"}"; return "{\"success\":false,\"code\":\"request_body_format_error\"}";