diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java index a7ccff9..bd2775a 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksProtocolSupportProvider.java @@ -6,6 +6,7 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.route.HttpRoute; +import org.jetlinks.core.route.WebsocketRoute; import org.jetlinks.core.spi.ProtocolSupportProvider; import org.jetlinks.core.spi.ServiceContext; import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec; @@ -17,6 +18,7 @@ import org.springframework.http.MediaType; import reactor.core.publisher.Mono; import java.util.Arrays; +import java.util.Collections; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -101,10 +103,26 @@ public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig); support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec()); + //Websocket + JetLinksHttpDeviceMessageCodec codec = new JetLinksHttpDeviceMessageCodec(DefaultTransport.WebSocket); + support.addMessageCodecSupport(codec); + support.addAuthenticator(DefaultTransport.WebSocket, codec); + + support.addRoutes( + DefaultTransport.WebSocket, + Collections.singleton( + WebsocketRoute + .builder() + .path("/{productId:产品ID}/{productId:设备ID}/socket") + .description("通过Websocket接入平台") + .build() + )); + //CoAP support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig); support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec()); + return Mono.just(support); }); } diff --git a/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java index debc4e0..9fbd524 100644 --- a/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/http/JetLinksHttpDeviceMessageCodec.java @@ -1,13 +1,21 @@ package org.jetlinks.protocol.official.http; +import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.core.JsonParseException; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.defaults.Authenticator; +import org.jetlinks.core.device.*; import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.MessageType; import org.jetlinks.core.message.codec.*; import org.jetlinks.core.message.codec.http.Header; import org.jetlinks.core.message.codec.http.HttpExchangeMessage; import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage; +import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage; +import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage; +import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage; import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.types.PasswordType; import org.jetlinks.core.trace.DeviceTracer; @@ -16,6 +24,7 @@ import org.jetlinks.protocol.official.ObjectMappers; import org.jetlinks.protocol.official.TopicMessageCodec; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; +import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -29,7 +38,7 @@ import java.util.Objects; * @since 3.0.0 */ @Slf4j -public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { +public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec, Authenticator { public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata( "HTTP认证配置" , "使用HTTP Bearer Token进行认证") @@ -51,16 +60,20 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { } @Nonnull - public Mono encode(@Nonnull MessageEncodeContext context) { - //http 暂不支持下发 - return Mono.empty(); + public Mono encode(@Nonnull MessageEncodeContext context) { + + JSONObject json = context.getMessage().toJson(); + //通过websocket下发 + return Mono.just(DefaultWebSocketMessage.of( + WebSocketMessage.Type.TEXT, + Unpooled.wrappedBuffer(json.toJSONString().getBytes()))); } private static SimpleHttpResponseMessage unauthorized(String msg) { return SimpleHttpResponseMessage .builder() .contentType(MediaType.APPLICATION_JSON) - .body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\""+msg+"\"}") + .body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\"" + msg + "\"}") .status(401) .build(); } @@ -78,6 +91,28 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { @Nonnull @Override public Flux decode(@Nonnull MessageDecodeContext context) { + if (context.getMessage() instanceof HttpExchangeMessage) { + return decodeHttp(context); + } + + if (context.getMessage() instanceof WebSocketSessionMessage) { + return decodeWebsocket(context); + } + + return Flux.empty(); + } + + private Flux decodeWebsocket(MessageDecodeContext context) { + WebSocketSessionMessage msg = ((WebSocketSessionMessage) context.getMessage()); + + return Mono + .justOrEmpty(MessageType.convertMessage(msg.payloadAsJson())) + .cast(DeviceMessage.class) + .flux(); + + } + + private Flux decodeHttp(MessageDecodeContext context) { HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage(); //校验请求头中的Authorization header,格式: @@ -153,4 +188,47 @@ public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec { return "{\"success\":false,\"code\":\"server_error\"}"; } + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator device) { + if (!(request instanceof WebsocketAuthenticationRequest)) { + return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式")); + } + WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request); + String token = req + .getSocketSession() + .getQueryParameters() + .get("token"); + + if (StringUtils.isEmpty(token)) { + return Mono.just(AuthenticationResponse.error(401, "认证参数错误")); + } + + return device + .getConfig("bearer_token") + //校验token + .filter(value -> Objects.equals(value.asString(), token)) + .map(ignore -> AuthenticationResponse.success(device.getDeviceId())) + //未配置或者配置不对 + .switchIfEmpty(Mono.fromSupplier(() -> AuthenticationResponse.error(401, "token错误"))); + } + + static AuthenticationResponse deviceNotFound = AuthenticationResponse.error(404, "设备不存在"); + + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) { + if (!(request instanceof WebsocketAuthenticationRequest)) { + return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式")); + } + WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request); + String[] paths = TopicMessageCodec.removeProductPath(req.getSocketSession().getPath()); + if (paths.length < 1) { + return Mono.just(AuthenticationResponse.error(400, "URL格式错误")); + } + + return registry + .getDevice(paths[1]) + .flatMap(device -> authenticate(request, device)) + .defaultIfEmpty(deviceNotFound); + + } }