增加时间同步

This commit is contained in:
zhou-hao 2021-07-30 18:45:23 +08:00
parent 263ab0ebb3
commit f38b76fc24
9 changed files with 526 additions and 201 deletions

View File

@ -0,0 +1,81 @@
package org.jetlinks.protocol.official;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@Slf4j
public abstract class AbstractCoapDeviceMessageCodec implements DeviceMessageCodec {
protected abstract Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response);
protected String getPath(CoapMessage message){
String path = message.getPath();
if (!path.startsWith("/")) {
path = "/" + path;
}
return path;
}
protected String getDeviceId(CoapMessage message){
String deviceId = message.getStringOption(2100).orElse(null);
String[] paths = TopicMessageCodec.removeProductPath(getPath(message));
if (StringUtils.isEmpty(deviceId) && paths.length > 1) {
deviceId = paths[1];
}
return deviceId;
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
AtomicBoolean alreadyReply = new AtomicBoolean();
Consumer<Object> responseHandler = (resp) -> {
if (alreadyReply.compareAndSet(false, true)) {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
if (resp instanceof byte[]) {
exchangeMessage.getExchange().respond(CoAP.ResponseCode.CONTENT, ((byte[]) resp));
}
}
};
return this
.decode(exchangeMessage, context, responseHandler)
.doOnComplete(() -> responseHandler.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST);
})
.switchIfEmpty(Mono.fromRunnable(() -> responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST)));
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Flux.empty();
}
@Nonnull
@Override
public Publisher<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -0,0 +1,73 @@
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.protocol.official.functional.TimeSyncRequest;
import org.jetlinks.protocol.official.functional.TimeSyncResponse;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.function.Function;
/**
* 功能性的topic,不和平台交互
*/
public enum FunctionalTopicHandlers {
//同步时间
timeSync("/*/*/time-sync") {
@SneakyThrows
@SuppressWarnings("all")
Mono<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
TopicPayload topicPayload = new TopicPayload();
topicPayload.setTopic(String.join("/", topic) + "/reply");
TimeSyncRequest msg = mapper.readValue(payload, TimeSyncRequest.class);
TimeSyncResponse response = TimeSyncResponse.of(msg.getMessageId(), System.currentTimeMillis());
topicPayload.setPayload(mapper.writeValueAsBytes(response));
//直接回复给设备
return sender
.apply(topicPayload)
.then(Mono.empty());
}
};
FunctionalTopicHandlers(String topic) {
this.pattern = topic.split("/");
}
private final String[] pattern;
abstract Publisher<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender);
public static Publisher<DeviceMessage> handle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
return Mono
.justOrEmpty(fromTopic(topic))
.flatMapMany(handler -> handler.doHandle(device, topic, payload, mapper, sender));
}
static Optional<FunctionalTopicHandlers> fromTopic(String[] topic) {
for (FunctionalTopicHandlers value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
}

View File

@ -8,18 +8,20 @@ import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
@Override
public Transport getSupportTransport() {
@ -28,8 +30,12 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
public Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
if (context.getDevice() == null) {
return Flux.empty();
}
return Flux.defer(() -> {
String path = message.getPath();
String path = getPath(message);
String deviceId = getDeviceId(message);
String sign = message.getStringOption(2110).orElse(null);
String token = message.getStringOption(2111).orElse(null);
byte[] payload = message.payloadAsBytes();
@ -39,26 +45,34 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
if ("/auth".equals(path)) {
if (StringUtils.isEmpty(deviceId)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
// TODO: 2021/7/30 移到 FunctionalTopicHandlers
if (path.endsWith("/request-token")) {
//认证
return context
.getDevice()
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, context.getDevice().getDeviceId(), payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return context.getDevice()
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
})
.getDevice(deviceId)
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMap(device -> device
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, deviceId, payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return device
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
}))
.then(Mono.empty());
}
if (StringUtils.isEmpty(token)) {
@ -66,22 +80,28 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
return Mono.empty();
}
return context
.getDevice()
.getConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMapMany(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST)));
})
.doOnComplete(() -> {
response.accept(CoAP.ResponseCode.CREATED);
})
.getDevice(deviceId)
.flatMapMany(device -> device
.getSelfConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMapMany(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}))
.doOnComplete(() -> response.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
response.accept(CoAP.ResponseCode.BAD_REQUEST);
@ -90,28 +110,6 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
return decode(exchangeMessage, context, resp -> {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
});
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Flux.empty();
}
protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) {
//验证签名
@ -125,9 +123,5 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec {
return true;
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -1,34 +1,34 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.jetlinks.core.Value;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDeviceMessageCodec implements DeviceMessageCodec {
public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP;
}
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context) {
String path = message.getPath();
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
String path = getPath(message);
String deviceId = getDeviceId(message);
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
@ -36,60 +36,29 @@ public class JetLinksCoapDeviceMessageCodec implements DeviceMessageCodec {
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
return context
.getDevice()
.getConfigs("encAlg", "secureKey")
.flatMapMany(configs -> {
Ciphers ciphers = configs
.getValue("encAlg")
.map(Value::asString)
.flatMap(Ciphers::of)
.orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
ByteBuf byteBuf = message.getPayload();
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
byteBuf.resetReaderIndex();
byte[] payload = ciphers.decrypt(req, secureKey);
//解码
return TopicMessageCodec.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload);
});
.getDevice(deviceId)
.flatMapMany(device -> device
.getConfigs("encAlg", "secureKey")
.flatMapMany(configs -> {
Ciphers ciphers = configs
.getValue("encAlg")
.map(Value::asString)
.flatMap(Ciphers::of)
.orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
byte[] payload = ciphers.decrypt(message.payloadAsBytes(), secureKey);
//解码
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}));
}
protected Flux<DeviceMessage> decode(CoapExchangeMessage message, MessageDecodeContext context) {
CoapExchange exchange = message.getExchange();
return decode(((CoapMessage) message), context)
.doOnComplete(() -> {
exchange.respond(CoAP.ResponseCode.CREATED);
exchange.accept();
})
.switchIfEmpty(Mono.fromRunnable(() -> {
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
}))
.doOnError(error -> {
log.error("decode coap message error", error);
exchange.respond(CoAP.ResponseCode.BAD_REQUEST);
});
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
return Flux.defer(() -> {
log.debug("handle coap message:\n{}", context.getMessage());
if (context.getMessage() instanceof CoapExchangeMessage) {
return decode(((CoapExchangeMessage) context.getMessage()), context);
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context);
}
return Flux.empty();
});
}
@Nonnull
@Override
public Mono<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}

View File

@ -1,6 +1,8 @@
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
@ -13,6 +15,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Map;
/**
* <pre>
@ -54,8 +57,11 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
private final Transport transport;
private final ObjectMapper mapper;
public JetLinksMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
this.mapper = ObjectMappers.JSON_MAPPER;
}
public JetLinksMqttDeviceMessageCodec() {
@ -81,15 +87,14 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
TopicPayload convertResult = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, deviceMessage);
TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
if (convertResult == null) {
return Mono.empty();
}
return Mono
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device
.getConfig(DeviceConfigKey.productId))
.flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
)
.defaultIfEmpty("null")
.map(productId -> SimpleMqttMessage
@ -109,11 +114,42 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
MqttMessage message = (MqttMessage) context.getMessage();
byte[] payload = message.payloadAsBytes();
return TopicMessageCodec
.decode(ObjectMappers.JSON_MAPPER, TopicMessageCodec.removeProductPath(message.getTopic()), payload);
.decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
//如果不能直接解码可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(context.getDevice(),
message.getTopic().split("/"),
payload,
mapper,
reply -> doReply(context, reply)))
;
}
private Mono<Void> doReply(MessageCodecContext context, TopicPayload reply) {
if (context instanceof FromDeviceMessageContext) {
return ((FromDeviceMessageContext) context)
.getSession()
.send(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
} else if (context instanceof ToDeviceMessageContext) {
return ((ToDeviceMessageContext) context)
.sendToDevice(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
}
return Mono.empty();
}

View File

@ -0,0 +1,10 @@
package org.jetlinks.protocol.official.functional;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TimeSyncRequest {
private String messageId;
}

View File

@ -0,0 +1,15 @@
package org.jetlinks.protocol.official.functional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TimeSyncResponse {
private String messageId;
private long timestamp;
}

View File

@ -5,6 +5,7 @@ import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.Option;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
@ -19,26 +20,37 @@ import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.CoapExchangeMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicReference;
public class JetLinksCoapDeviceMessageCodecTest {
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
DeviceOperator device;
private String key = RandomUtil.randomChar(16);
private final String key = RandomUtil.randomChar(16);
private TestDeviceRegistry registry;
AtomicReference<Message> messageRef = new AtomicReference<>();
private CoapServer server;
@After
public void shutdown(){
server.stop();
}
@Before
public void init() {
TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
device = registry
.register(DeviceInfo.builder()
.id("test")
@ -46,14 +58,8 @@ public class JetLinksCoapDeviceMessageCodecTest {
.build())
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
.block();
}
@Test
@SneakyThrows
public void test() {
AtomicReference<Message> messageRef = new AtomicReference<>();
CoapServer server = new CoapServer() {
server = new CoapServer() {
@Override
protected Resource createRoot() {
return new CoapResource("/", true) {
@ -72,6 +78,11 @@ public class JetLinksCoapDeviceMessageCodecTest {
public DeviceOperator getDevice() {
return device;
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
})
.doOnNext(messageRef::set)
.doOnError(Throwable::printStackTrace)
@ -90,11 +101,14 @@ public class JetLinksCoapDeviceMessageCodecTest {
.setPort(12341).build();
server.addEndpoint(endpoint);
server.start();
}
@Test
@SneakyThrows
public void test() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"data\":1}";
@ -104,11 +118,31 @@ public class JetLinksCoapDeviceMessageCodecTest {
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Thread.sleep(1000);
Assert.assertNotNull(messageRef.get());
Assert.assertNotNull(messageRef.get());
Assert.assertTrue(messageRef.get() instanceof EventMessage);
System.out.println(messageRef.get());
}
@Test
@SneakyThrows
public void testTimeSync() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"messageId\":1}";
request.setURI("coap://localhost:12341/test/test/time-sync");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Assert.assertTrue(response.getResponseText().contains("timestamp"));
}
}

View File

@ -15,11 +15,14 @@ import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.server.session.DeviceSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@ -31,19 +34,21 @@ public class JetLinksMqttDeviceMessageCodecTest {
TestDeviceRegistry registry;
private EncodedMessage currentReply;
@Before
public void init() {
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
registry.register(ProductInfo.builder()
.id("product1")
.protocol("jetlinks")
.build())
.id("product1")
.protocol("jetlinks")
.build())
.flatMap(product -> registry.register(DeviceInfo.builder()
.id("device1")
.productId("product1")
.build()))
.subscribe();
.id("device1")
.productId("product1")
.build()))
.block();
}
@ -83,9 +88,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
@ -99,17 +105,19 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
ReadPropertyMessageReply reply = (ReadPropertyMessageReply)childReply.getChildDeviceMessage();;
ReadPropertyMessageReply reply = (ReadPropertyMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -155,9 +163,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testWritePropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof WritePropertyMessageReply);
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
@ -169,21 +178,22 @@ public class JetLinksMqttDeviceMessageCodecTest {
}
@Test
public void testWriteChildPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
WritePropertyMessageReply reply = (WritePropertyMessageReply)childReply.getChildDeviceMessage();;
WritePropertyMessageReply reply = (WritePropertyMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -227,9 +237,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testInvokeFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
@ -243,17 +254,19 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testInvokeChildFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(),"device1");
Assert.assertEquals(childReply.getMessageId(),"test");
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply)childReply.getChildDeviceMessage();;
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
@ -264,9 +277,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof EventMessage);
EventMessage reply = ((EventMessage) message);
@ -279,9 +293,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
@ -295,9 +310,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ReportPropertyMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
@ -311,9 +327,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
@ -328,9 +345,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).blockFirst();
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof DerivedMetadataMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
@ -342,10 +360,12 @@ public class JetLinksMqttDeviceMessageCodecTest {
@Test
public void testChildMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
.build())).blockFirst();
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
@ -356,6 +376,25 @@ public class JetLinksMqttDeviceMessageCodecTest {
System.out.println(reply);
}
@Test
public void testTimeSync() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/time-sync")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\"}"
.getBytes()))
.build()))
.blockFirst();
Assert.assertNull(message);
Assert.assertNotNull(currentReply);
Assert.assertEquals(((MqttMessage) currentReply).getTopic(),"/product1/device1/time-sync/reply");
Assert.assertTrue(currentReply.payloadAsString().contains("timestamp"));
}
public MessageEncodeContext createMessageContext(Message message) {
System.out.println(message.toString());
return new MessageEncodeContext() {
@ -369,25 +408,99 @@ public class JetLinksMqttDeviceMessageCodecTest {
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
public MessageDecodeContext createMessageContext(EncodedMessage message) {
System.out.println(message.toString());
return new MessageDecodeContext() {
return new FromDeviceMessageContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return message;
}
@Override
public DeviceSession getSession() {
return new MockSession();
}
@Override
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
class MockSession implements DeviceSession {
@Override
public String getId() {
return "device1";
}
@Override
public String getDeviceId() {
return "device1";
}
@Nullable
@Override
public DeviceOperator getOperator() {
return registry.getDevice("device1").block();
}
@Override
public long lastPingTime() {
return 0;
}
@Override
public long connectTime() {
return 0;
}
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
currentReply = encodedMessage;
return Mono.just(true);
}
@Override
public Transport getTransport() {
return null;
}
@Override
public void close() {
}
@Override
public void ping() {
}
@Override
public boolean isAlive() {
return false;
}
@Override
public void onClose(Runnable call) {
}
}
}