websocket支持

This commit is contained in:
zhouhao 2023-01-10 17:16:39 +08:00
parent 57b0cb0a73
commit 5ec684c578
2 changed files with 101 additions and 5 deletions

View File

@ -6,6 +6,7 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.route.HttpRoute; import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.route.WebsocketRoute;
import org.jetlinks.core.spi.ProtocolSupportProvider; import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext; import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec; import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
@ -17,6 +18,7 @@ import org.springframework.http.MediaType;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -101,10 +103,26 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig); support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec()); support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec());
//Websocket
JetLinksHttpDeviceMessageCodec codec = new JetLinksHttpDeviceMessageCodec(DefaultTransport.WebSocket);
support.addMessageCodecSupport(codec);
support.addAuthenticator(DefaultTransport.WebSocket, codec);
support.addRoutes(
DefaultTransport.WebSocket,
Collections.singleton(
WebsocketRoute
.builder()
.path("/{productId:产品ID}/{productId:设备ID}/socket")
.description("通过Websocket接入平台")
.build()
));
//CoAP //CoAP
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig); support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec()); support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
return Mono.just(support); return Mono.just(support);
}); });
} }

View File

@ -1,13 +1,21 @@
package org.jetlinks.protocol.official.http; package org.jetlinks.protocol.official.http;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.codec.*; import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.Header; import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage; import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage; import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage;
import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.trace.DeviceTracer; import org.jetlinks.core.trace.DeviceTracer;
@ -16,6 +24,7 @@ import org.jetlinks.protocol.official.ObjectMappers;
import org.jetlinks.protocol.official.TopicMessageCodec; import org.jetlinks.protocol.official.TopicMessageCodec;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -29,7 +38,7 @@ import java.util.Objects;
* @since 3.0.0 * @since 3.0.0
*/ */
@Slf4j @Slf4j
public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec, Authenticator {
public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata( public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata(
"HTTP认证配置" "HTTP认证配置"
, "使用HTTP Bearer Token进行认证") , "使用HTTP Bearer Token进行认证")
@ -51,16 +60,20 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
} }
@Nonnull @Nonnull
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) { public Mono<EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
//http 暂不支持下发
return Mono.empty(); JSONObject json = context.getMessage().toJson();
//通过websocket下发
return Mono.just(DefaultWebSocketMessage.of(
WebSocketMessage.Type.TEXT,
Unpooled.wrappedBuffer(json.toJSONString().getBytes())));
} }
private static SimpleHttpResponseMessage unauthorized(String msg) { private static SimpleHttpResponseMessage unauthorized(String msg) {
return SimpleHttpResponseMessage return SimpleHttpResponseMessage
.builder() .builder()
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\""+msg+"\"}") .body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\"" + msg + "\"}")
.status(401) .status(401)
.build(); .build();
} }
@ -78,6 +91,28 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
@Nonnull @Nonnull
@Override @Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) { public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof HttpExchangeMessage) {
return decodeHttp(context);
}
if (context.getMessage() instanceof WebSocketSessionMessage) {
return decodeWebsocket(context);
}
return Flux.empty();
}
private Flux<DeviceMessage> decodeWebsocket(MessageDecodeContext context) {
WebSocketSessionMessage msg = ((WebSocketSessionMessage) context.getMessage());
return Mono
.justOrEmpty(MessageType.convertMessage(msg.payloadAsJson()))
.cast(DeviceMessage.class)
.flux();
}
private Flux<DeviceMessage> decodeHttp(MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage(); HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
//校验请求头中的Authorization header,格式: //校验请求头中的Authorization header,格式:
@ -153,4 +188,47 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec {
return "{\"success\":false,\"code\":\"server_error\"}"; return "{\"success\":false,\"code\":\"server_error\"}";
} }
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator device) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String token = req
.getSocketSession()
.getQueryParameters()
.get("token");
if (StringUtils.isEmpty(token)) {
return Mono.just(AuthenticationResponse.error(401, "认证参数错误"));
}
return device
.getConfig("bearer_token")
//校验token
.filter(value -> Objects.equals(value.asString(), token))
.map(ignore -> AuthenticationResponse.success(device.getDeviceId()))
//未配置或者配置不对
.switchIfEmpty(Mono.fromSupplier(() -> AuthenticationResponse.error(401, "token错误")));
}
static AuthenticationResponse deviceNotFound = AuthenticationResponse.error(404, "设备不存在");
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String[] paths = TopicMessageCodec.removeProductPath(req.getSocketSession().getPath());
if (paths.length < 1) {
return Mono.just(AuthenticationResponse.error(400, "URL格式错误"));
}
return registry
.getDevice(paths[1])
.flatMap(device -> authenticate(request, device))
.defaultIfEmpty(deviceNotFound);
}
} }