From f38b76fc24ef101a5f21ef9deffb21bfaef4cb3f Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Fri, 30 Jul 2021 18:45:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=B6=E9=97=B4=E5=90=8C?= =?UTF-8?q?=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AbstractCoapDeviceMessageCodec.java | 81 +++++++ .../official/FunctionalTopicHandlers.java | 73 ++++++ .../JetLinksCoapDTLSDeviceMessageCodec.java | 124 +++++----- .../JetLinksCoapDeviceMessageCodec.java | 95 +++----- .../JetLinksMqttDeviceMessageCodec.java | 46 +++- .../official/functional/TimeSyncRequest.java | 10 + .../official/functional/TimeSyncResponse.java | 15 ++ .../JetLinksCoapDeviceMessageCodecTest.java | 60 ++++- .../JetLinksMqttDeviceMessageCodecTest.java | 223 +++++++++++++----- 9 files changed, 526 insertions(+), 201 deletions(-) create mode 100644 src/main/java/org/jetlinks/protocol/official/AbstractCoapDeviceMessageCodec.java create mode 100644 src/main/java/org/jetlinks/protocol/official/FunctionalTopicHandlers.java create mode 100644 src/main/java/org/jetlinks/protocol/official/functional/TimeSyncRequest.java create mode 100644 src/main/java/org/jetlinks/protocol/official/functional/TimeSyncResponse.java diff --git a/src/main/java/org/jetlinks/protocol/official/AbstractCoapDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/AbstractCoapDeviceMessageCodec.java new file mode 100644 index 0000000..2fb7802 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/AbstractCoapDeviceMessageCodec.java @@ -0,0 +1,81 @@ +package org.jetlinks.protocol.official; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.codec.*; +import org.reactivestreams.Publisher; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +@Slf4j +public abstract class AbstractCoapDeviceMessageCodec implements DeviceMessageCodec { + + protected abstract Flux decode(CoapMessage message, MessageDecodeContext context, Consumer response); + + protected String getPath(CoapMessage message){ + String path = message.getPath(); + if (!path.startsWith("/")) { + path = "/" + path; + } + return path; + } + + protected String getDeviceId(CoapMessage message){ + String deviceId = message.getStringOption(2100).orElse(null); + String[] paths = TopicMessageCodec.removeProductPath(getPath(message)); + if (StringUtils.isEmpty(deviceId) && paths.length > 1) { + deviceId = paths[1]; + } + return deviceId; + } + + @Nonnull + @Override + public Flux decode(@Nonnull MessageDecodeContext context) { + if (context.getMessage() instanceof CoapExchangeMessage) { + CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage()); + AtomicBoolean alreadyReply = new AtomicBoolean(); + Consumer responseHandler = (resp) -> { + if (alreadyReply.compareAndSet(false, true)) { + if (resp instanceof CoAP.ResponseCode) { + exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp)); + } + if (resp instanceof String) { + exchangeMessage.getExchange().respond(((String) resp)); + } + if (resp instanceof byte[]) { + exchangeMessage.getExchange().respond(CoAP.ResponseCode.CONTENT, ((byte[]) resp)); + } + } + }; + + return this + .decode(exchangeMessage, context, responseHandler) + .doOnComplete(() -> responseHandler.accept(CoAP.ResponseCode.CREATED)) + .doOnError(error -> { + log.error("decode coap message error", error); + responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST); + }) + .switchIfEmpty(Mono.fromRunnable(() -> responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST))); + } + if (context.getMessage() instanceof CoapMessage) { + return decode(((CoapMessage) context.getMessage()), context, resp -> { + log.info("skip response coap request:{}", resp); + }); + } + + return Flux.empty(); + } + + @Nonnull + @Override + public Publisher encode(@Nonnull MessageEncodeContext context) { + return Mono.empty(); + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/FunctionalTopicHandlers.java b/src/main/java/org/jetlinks/protocol/official/FunctionalTopicHandlers.java new file mode 100644 index 0000000..fedb402 --- /dev/null +++ b/src/main/java/org/jetlinks/protocol/official/FunctionalTopicHandlers.java @@ -0,0 +1,73 @@ +package org.jetlinks.protocol.official; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.utils.TopicUtils; +import org.jetlinks.protocol.official.functional.TimeSyncRequest; +import org.jetlinks.protocol.official.functional.TimeSyncResponse; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.util.Optional; +import java.util.function.Function; + +/** + * 功能性的topic,不和平台交互 + */ +public enum FunctionalTopicHandlers { + + //同步时间 + timeSync("/*/*/time-sync") { + @SneakyThrows + @SuppressWarnings("all") + Mono doHandle(DeviceOperator device, + String[] topic, + byte[] payload, + ObjectMapper mapper, + Function> sender) { + TopicPayload topicPayload = new TopicPayload(); + topicPayload.setTopic(String.join("/", topic) + "/reply"); + TimeSyncRequest msg = mapper.readValue(payload, TimeSyncRequest.class); + TimeSyncResponse response = TimeSyncResponse.of(msg.getMessageId(), System.currentTimeMillis()); + topicPayload.setPayload(mapper.writeValueAsBytes(response)); + //直接回复给设备 + return sender + .apply(topicPayload) + .then(Mono.empty()); + } + }; + + FunctionalTopicHandlers(String topic) { + this.pattern = topic.split("/"); + } + + private final String[] pattern; + + abstract Publisher doHandle(DeviceOperator device, + String[] topic, + byte[] payload, + ObjectMapper mapper, + Function> sender); + + + public static Publisher handle(DeviceOperator device, + String[] topic, + byte[] payload, + ObjectMapper mapper, + Function> sender) { + return Mono + .justOrEmpty(fromTopic(topic)) + .flatMapMany(handler -> handler.doHandle(device, topic, payload, mapper, sender)); + } + + static Optional fromTopic(String[] topic) { + for (FunctionalTopicHandlers value : values()) { + if (TopicUtils.match(value.pattern, topic)) { + return Optional.of(value); + } + } + return Optional.empty(); + } +} diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java index 8dc0806..66f6719 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDTLSDeviceMessageCodec.java @@ -8,18 +8,20 @@ import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.OptionNumberRegistry; import org.hswebframework.web.id.IDGenerator; import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.message.codec.CoapMessage; +import org.jetlinks.core.message.codec.DefaultTransport; +import org.jetlinks.core.message.codec.MessageDecodeContext; +import org.jetlinks.core.message.codec.Transport; import org.springframework.http.MediaType; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.annotation.Nonnull; import java.util.Arrays; import java.util.function.Consumer; @Slf4j -public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec { +public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { @Override public Transport getSupportTransport() { @@ -28,8 +30,12 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec { public Flux decode(CoapMessage message, MessageDecodeContext context, Consumer response) { + if (context.getDevice() == null) { + return Flux.empty(); + } return Flux.defer(() -> { - String path = message.getPath(); + String path = getPath(message); + String deviceId = getDeviceId(message); String sign = message.getStringOption(2110).orElse(null); String token = message.getStringOption(2111).orElse(null); byte[] payload = message.payloadAsBytes(); @@ -39,26 +45,34 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec { .map(MediaType.APPLICATION_CBOR::includes) .orElse(false); ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER; - if ("/auth".equals(path)) { + + if (StringUtils.isEmpty(deviceId)) { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + return Mono.empty(); + } + // TODO: 2021/7/30 移到 FunctionalTopicHandlers + if (path.endsWith("/request-token")) { //认证 return context - .getDevice() - .getConfig("secureKey") - .flatMap(sk -> { - String secureKey = sk.asString(); - if (!verifySign(secureKey, context.getDevice().getDeviceId(), payload, sign)) { - response.accept(CoAP.ResponseCode.BAD_REQUEST); - return Mono.empty(); - } - String newToken = IDGenerator.MD5.generate(); - return context.getDevice() - .setConfig("coap-token", newToken) - .doOnSuccess(success -> { - JSONObject json = new JSONObject(); - json.put("token", newToken); - response.accept(json.toJSONString()); - }); - }) + .getDevice(deviceId) + .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED))) + .flatMap(device -> device + .getConfig("secureKey") + .flatMap(sk -> { + String secureKey = sk.asString(); + if (!verifySign(secureKey, deviceId, payload, sign)) { + response.accept(CoAP.ResponseCode.BAD_REQUEST); + return Mono.empty(); + } + String newToken = IDGenerator.MD5.generate(); + return device + .setConfig("coap-token", newToken) + .doOnSuccess(success -> { + JSONObject json = new JSONObject(); + json.put("token", newToken); + response.accept(json.toJSONString()); + }); + })) .then(Mono.empty()); } if (StringUtils.isEmpty(token)) { @@ -66,22 +80,28 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec { return Mono.empty(); } return context - .getDevice() - .getConfig("coap-token") - .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED))) - .flatMapMany(value -> { - String tk = value.asString(); - if (!token.equals(tk)) { - response.accept(CoAP.ResponseCode.UNAUTHORIZED); - return Mono.empty(); - } - return TopicMessageCodec - .decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload) - .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.BAD_REQUEST))); - }) - .doOnComplete(() -> { - response.accept(CoAP.ResponseCode.CREATED); - }) + .getDevice(deviceId) + .flatMapMany(device -> device + .getSelfConfig("coap-token") + .switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED))) + .flatMapMany(value -> { + String tk = value.asString(); + if (!token.equals(tk)) { + response.accept(CoAP.ResponseCode.UNAUTHORIZED); + return Mono.empty(); + } + return TopicMessageCodec + .decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload) + //如果不能直接解码,可能是其他设备功能 + .switchIfEmpty(FunctionalTopicHandlers + .handle(device, + path.split("/"), + payload, + objectMapper, + reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload())))); + })) + + .doOnComplete(() -> response.accept(CoAP.ResponseCode.CREATED)) .doOnError(error -> { log.error("decode coap message error", error); response.accept(CoAP.ResponseCode.BAD_REQUEST); @@ -90,28 +110,6 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec { } - @Nonnull - @Override - public Flux decode(@Nonnull MessageDecodeContext context) { - if (context.getMessage() instanceof CoapExchangeMessage) { - CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage()); - return decode(exchangeMessage, context, resp -> { - if (resp instanceof CoAP.ResponseCode) { - exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp)); - } - if (resp instanceof String) { - exchangeMessage.getExchange().respond(((String) resp)); - } - }); - } - if (context.getMessage() instanceof CoapMessage) { - return decode(((CoapMessage) context.getMessage()), context, resp -> { - log.info("skip response coap request:{}", resp); - }); - } - - return Flux.empty(); - } protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) { //验证签名 @@ -125,9 +123,5 @@ public class JetLinksCoapDTLSDeviceMessageCodec implements DeviceMessageCodec { return true; } - @Nonnull - @Override - public Mono encode(@Nonnull MessageEncodeContext context) { - return Mono.empty(); - } + } diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java index 5c78ae5..79ebe9e 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodec.java @@ -1,34 +1,34 @@ package org.jetlinks.protocol.official; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.buffer.ByteBuf; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.OptionNumberRegistry; -import org.eclipse.californium.core.server.resources.CoapExchange; import org.jetlinks.core.Value; import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.codec.*; +import org.jetlinks.core.message.codec.CoapMessage; +import org.jetlinks.core.message.codec.DefaultTransport; +import org.jetlinks.core.message.codec.MessageDecodeContext; +import org.jetlinks.core.message.codec.Transport; import org.jetlinks.protocol.official.cipher.Ciphers; import org.springframework.http.MediaType; +import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import javax.annotation.Nonnull; +import java.util.function.Consumer; @Slf4j -public class JetLinksCoapDeviceMessageCodec implements DeviceMessageCodec { +public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec { @Override public Transport getSupportTransport() { return DefaultTransport.CoAP; } - protected Flux decode(CoapMessage message, MessageDecodeContext context) { - String path = message.getPath(); - + protected Flux decode(CoapMessage message, MessageDecodeContext context, Consumer response) { + String path = getPath(message); + String deviceId = getDeviceId(message); boolean cbor = message .getStringOption(OptionNumberRegistry.CONTENT_FORMAT) .map(MediaType::valueOf) @@ -36,60 +36,29 @@ public class JetLinksCoapDeviceMessageCodec implements DeviceMessageCodec { .orElse(false); ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER; return context - .getDevice() - .getConfigs("encAlg", "secureKey") - .flatMapMany(configs -> { - Ciphers ciphers = configs - .getValue("encAlg") - .map(Value::asString) - .flatMap(Ciphers::of) - .orElse(Ciphers.AES); - String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null); - ByteBuf byteBuf = message.getPayload(); - byte[] req = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(req); - byteBuf.resetReaderIndex(); - byte[] payload = ciphers.decrypt(req, secureKey); - //解码 - return TopicMessageCodec.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload); - }); + .getDevice(deviceId) + .flatMapMany(device -> device + .getConfigs("encAlg", "secureKey") + .flatMapMany(configs -> { + Ciphers ciphers = configs + .getValue("encAlg") + .map(Value::asString) + .flatMap(Ciphers::of) + .orElse(Ciphers.AES); + String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null); + byte[] payload = ciphers.decrypt(message.payloadAsBytes(), secureKey); + //解码 + return TopicMessageCodec + .decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload) + //如果不能直接解码,可能是其他设备功能 + .switchIfEmpty(FunctionalTopicHandlers + .handle(device, + path.split("/"), + payload, + objectMapper, + reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload())))); + })); } - protected Flux decode(CoapExchangeMessage message, MessageDecodeContext context) { - CoapExchange exchange = message.getExchange(); - return decode(((CoapMessage) message), context) - .doOnComplete(() -> { - exchange.respond(CoAP.ResponseCode.CREATED); - exchange.accept(); - }) - .switchIfEmpty(Mono.fromRunnable(() -> { - exchange.respond(CoAP.ResponseCode.BAD_REQUEST); - })) - .doOnError(error -> { - log.error("decode coap message error", error); - exchange.respond(CoAP.ResponseCode.BAD_REQUEST); - }); - } - @Nonnull - @Override - public Flux decode(@Nonnull MessageDecodeContext context) { - return Flux.defer(() -> { - log.debug("handle coap message:\n{}", context.getMessage()); - if (context.getMessage() instanceof CoapExchangeMessage) { - return decode(((CoapExchangeMessage) context.getMessage()), context); - } - if (context.getMessage() instanceof CoapMessage) { - return decode(((CoapMessage) context.getMessage()), context); - } - - return Flux.empty(); - }); - } - - @Nonnull - @Override - public Mono encode(@Nonnull MessageEncodeContext context) { - return Mono.empty(); - } } diff --git a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java index b6594f0..73fcff5 100644 --- a/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java +++ b/src/main/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.java @@ -1,6 +1,8 @@ package org.jetlinks.protocol.official; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.jetlinks.core.device.DeviceConfigKey; @@ -13,6 +15,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import javax.annotation.Nonnull; +import java.util.Map; /** *
@@ -54,8 +57,11 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
 
     private final Transport transport;
 
+    private final ObjectMapper mapper;
+
     public JetLinksMqttDeviceMessageCodec(Transport transport) {
         this.transport = transport;
+        this.mapper = ObjectMappers.JSON_MAPPER;
     }
 
     public JetLinksMqttDeviceMessageCodec() {
@@ -81,15 +87,14 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
             if (message instanceof DeviceMessage) {
                 DeviceMessage deviceMessage = ((DeviceMessage) message);
 
-                TopicPayload convertResult = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, deviceMessage);
+                TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
                 if (convertResult == null) {
                     return Mono.empty();
                 }
                 return Mono
                         .justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
                         .switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
-                                              .flatMap(device -> device
-                                                      .getConfig(DeviceConfigKey.productId))
+                                              .flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
                         )
                         .defaultIfEmpty("null")
                         .map(productId -> SimpleMqttMessage
@@ -109,11 +114,42 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
     @Override
     public Flux decode(@Nonnull MessageDecodeContext context) {
         MqttMessage message = (MqttMessage) context.getMessage();
-
         byte[] payload = message.payloadAsBytes();
 
         return TopicMessageCodec
-                .decode(ObjectMappers.JSON_MAPPER, TopicMessageCodec.removeProductPath(message.getTopic()), payload);
+                .decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
+                //如果不能直接解码,可能是其他设备功能
+                .switchIfEmpty(FunctionalTopicHandlers
+                                       .handle(context.getDevice(),
+                                               message.getTopic().split("/"),
+                                               payload,
+                                               mapper,
+                                               reply -> doReply(context, reply)))
+                ;
+
+    }
+
+    private Mono doReply(MessageCodecContext context, TopicPayload reply) {
+
+        if (context instanceof FromDeviceMessageContext) {
+            return ((FromDeviceMessageContext) context)
+                    .getSession()
+                    .send(SimpleMqttMessage
+                                  .builder()
+                                  .topic(reply.getTopic())
+                                  .payload(reply.getPayload())
+                                  .build())
+                    .then();
+        } else if (context instanceof ToDeviceMessageContext) {
+            return ((ToDeviceMessageContext) context)
+                    .sendToDevice(SimpleMqttMessage
+                                          .builder()
+                                          .topic(reply.getTopic())
+                                          .payload(reply.getPayload())
+                                          .build())
+                    .then();
+        }
+        return Mono.empty();
 
     }
 
diff --git a/src/main/java/org/jetlinks/protocol/official/functional/TimeSyncRequest.java b/src/main/java/org/jetlinks/protocol/official/functional/TimeSyncRequest.java
new file mode 100644
index 0000000..119598c
--- /dev/null
+++ b/src/main/java/org/jetlinks/protocol/official/functional/TimeSyncRequest.java
@@ -0,0 +1,10 @@
+package org.jetlinks.protocol.official.functional;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class TimeSyncRequest {
+    private String messageId;
+}
diff --git a/src/main/java/org/jetlinks/protocol/official/functional/TimeSyncResponse.java b/src/main/java/org/jetlinks/protocol/official/functional/TimeSyncResponse.java
new file mode 100644
index 0000000..a543e8e
--- /dev/null
+++ b/src/main/java/org/jetlinks/protocol/official/functional/TimeSyncResponse.java
@@ -0,0 +1,15 @@
+package org.jetlinks.protocol.official.functional;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+public class TimeSyncResponse {
+    private String messageId;
+    private long timestamp;
+}
diff --git a/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java
index 815e115..6df4ac2 100644
--- a/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java
+++ b/src/test/java/org/jetlinks/protocol/official/JetLinksCoapDeviceMessageCodecTest.java
@@ -5,6 +5,7 @@ import org.eclipse.californium.core.CoapClient;
 import org.eclipse.californium.core.CoapResource;
 import org.eclipse.californium.core.CoapResponse;
 import org.eclipse.californium.core.CoapServer;
+import org.eclipse.californium.core.coap.Option;
 import org.eclipse.californium.core.coap.Request;
 import org.eclipse.californium.core.network.CoapEndpoint;
 import org.eclipse.californium.core.network.Endpoint;
@@ -19,26 +20,37 @@ import org.jetlinks.core.message.Message;
 import org.jetlinks.core.message.codec.CoapExchangeMessage;
 import org.jetlinks.core.message.codec.EncodedMessage;
 import org.jetlinks.core.message.codec.MessageDecodeContext;
+import org.jetlinks.core.message.event.EventMessage;
 import org.jetlinks.protocol.official.cipher.Ciphers;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import reactor.core.publisher.Mono;
 
 import javax.annotation.Nonnull;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class JetLinksCoapDeviceMessageCodecTest {
 
-
     JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
 
     DeviceOperator device;
 
-    private String key = RandomUtil.randomChar(16);
+    private final String key = RandomUtil.randomChar(16);
 
+    private TestDeviceRegistry registry;
+    AtomicReference messageRef = new AtomicReference<>();
+
+    private CoapServer server;
+
+    @After
+    public void shutdown(){
+        server.stop();
+    }
     @Before
     public void init() {
-        TestDeviceRegistry registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
+        registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
         device = registry
                 .register(DeviceInfo.builder()
                                     .id("test")
@@ -46,14 +58,8 @@ public class JetLinksCoapDeviceMessageCodecTest {
                                     .build())
                 .flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
                 .block();
-    }
 
-    @Test
-    @SneakyThrows
-    public void test() {
-        AtomicReference messageRef = new AtomicReference<>();
-
-        CoapServer server = new CoapServer() {
+        server = new CoapServer() {
             @Override
             protected Resource createRoot() {
                 return new CoapResource("/", true) {
@@ -72,6 +78,11 @@ public class JetLinksCoapDeviceMessageCodecTest {
                                     public DeviceOperator getDevice() {
                                         return device;
                                     }
+
+                                    @Override
+                                    public Mono getDevice(String deviceId) {
+                                        return registry.getDevice(deviceId);
+                                    }
                                 })
                                 .doOnNext(messageRef::set)
                                 .doOnError(Throwable::printStackTrace)
@@ -90,11 +101,14 @@ public class JetLinksCoapDeviceMessageCodecTest {
                 .setPort(12341).build();
         server.addEndpoint(endpoint);
         server.start();
+    }
 
+    @Test
+    @SneakyThrows
+    public void test() {
 
         CoapClient coapClient = new CoapClient();
 
-
         Request request = Request.newPost();
         String payload = "{\"data\":1}";
 
@@ -104,11 +118,31 @@ public class JetLinksCoapDeviceMessageCodecTest {
 
         CoapResponse response = coapClient.advanced(request);
         Assert.assertTrue(response.isSuccess());
-        Thread.sleep(1000);
-        Assert.assertNotNull(messageRef.get());
 
+        Assert.assertNotNull(messageRef.get());
+        Assert.assertTrue(messageRef.get() instanceof EventMessage);
         System.out.println(messageRef.get());
     }
 
+    @Test
+    @SneakyThrows
+    public void testTimeSync() {
+
+        CoapClient coapClient = new CoapClient();
+
+        Request request = Request.newPost();
+        String payload = "{\"messageId\":1}";
+
+        request.setURI("coap://localhost:12341/test/test/time-sync");
+        request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
+//        request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
+
+        CoapResponse response = coapClient.advanced(request);
+        Assert.assertTrue(response.isSuccess());
+
+
+       Assert.assertTrue(response.getResponseText().contains("timestamp"));
+    }
+
 
 }
\ No newline at end of file
diff --git a/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java b/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java
index 44d60fd..6143cdc 100644
--- a/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java
+++ b/src/test/java/org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodecTest.java
@@ -15,11 +15,14 @@ import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.*;
+import org.jetlinks.core.server.session.DeviceSession;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import reactor.core.publisher.Mono;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,19 +34,21 @@ public class JetLinksMqttDeviceMessageCodecTest {
 
     TestDeviceRegistry registry;
 
+    private EncodedMessage currentReply;
+
     @Before
     public void init() {
         registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
 
         registry.register(ProductInfo.builder()
-                .id("product1")
-                .protocol("jetlinks")
-                .build())
+                                     .id("product1")
+                                     .protocol("jetlinks")
+                                     .build())
                 .flatMap(product -> registry.register(DeviceInfo.builder()
-                        .id("device1")
-                        .productId("product1")
-                        .build()))
-                .subscribe();
+                                                                .id("device1")
+                                                                .productId("product1")
+                                                                .build()))
+                .block();
 
     }
 
@@ -83,9 +88,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testReadPropertyReply() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/properties/read/reply")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/properties/read/reply")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ReadPropertyMessageReply);
         ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
@@ -99,17 +105,19 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testChildReadPropertyReply() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/child/test/properties/read/reply")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/child/test/properties/read/reply")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ChildDeviceMessage);
         ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
 
-        Assert.assertEquals(childReply.getDeviceId(),"device1");
-        Assert.assertEquals(childReply.getMessageId(),"test");
+        Assert.assertEquals(childReply.getDeviceId(), "device1");
+        Assert.assertEquals(childReply.getMessageId(), "test");
 
-        ReadPropertyMessageReply reply = (ReadPropertyMessageReply)childReply.getChildDeviceMessage();;
+        ReadPropertyMessageReply reply = (ReadPropertyMessageReply) childReply.getChildDeviceMessage();
+        ;
         Assert.assertTrue(reply.isSuccess());
         Assert.assertEquals(reply.getDeviceId(), "test");
         Assert.assertEquals(reply.getMessageId(), "test");
@@ -155,9 +163,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testWritePropertyReply() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/properties/write/reply")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/properties/write/reply")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof WritePropertyMessageReply);
         WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
@@ -169,21 +178,22 @@ public class JetLinksMqttDeviceMessageCodecTest {
     }
 
 
-
     @Test
     public void testWriteChildPropertyReply() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/child/test/properties/write/reply")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/child/test/properties/write/reply")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ChildDeviceMessage);
         ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
 
-        Assert.assertEquals(childReply.getDeviceId(),"device1");
-        Assert.assertEquals(childReply.getMessageId(),"test");
+        Assert.assertEquals(childReply.getDeviceId(), "device1");
+        Assert.assertEquals(childReply.getMessageId(), "test");
 
-        WritePropertyMessageReply reply = (WritePropertyMessageReply)childReply.getChildDeviceMessage();;
+        WritePropertyMessageReply reply = (WritePropertyMessageReply) childReply.getChildDeviceMessage();
+        ;
         Assert.assertTrue(reply.isSuccess());
         Assert.assertEquals(reply.getDeviceId(), "test");
         Assert.assertEquals(reply.getMessageId(), "test");
@@ -227,9 +237,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testInvokeFunctionReply() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/function/invoke/reply")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/function/invoke/reply")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
         FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
@@ -243,17 +254,19 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testInvokeChildFunctionReply() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/child/test/function/invoke/reply")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/child/test/function/invoke/reply")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ChildDeviceMessage);
         ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
 
-        Assert.assertEquals(childReply.getDeviceId(),"device1");
-        Assert.assertEquals(childReply.getMessageId(),"test");
+        Assert.assertEquals(childReply.getDeviceId(), "device1");
+        Assert.assertEquals(childReply.getMessageId(), "test");
 
-        FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply)childReply.getChildDeviceMessage();;
+        FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply) childReply.getChildDeviceMessage();
+        ;
         Assert.assertTrue(reply.isSuccess());
         Assert.assertEquals(reply.getDeviceId(), "test");
         Assert.assertEquals(reply.getMessageId(), "test");
@@ -264,9 +277,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testEvent() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/event/temp")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/event/temp")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof EventMessage);
         EventMessage reply = ((EventMessage) message);
@@ -279,9 +293,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testChildEvent() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/child/test/event/temp")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/child/test/event/temp")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ChildDeviceMessage);
 
@@ -295,9 +310,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testPropertiesReport() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/properties/report")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/properties/report")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ReportPropertyMessage);
         ReportPropertyMessage reply = ((ReportPropertyMessage) message);
@@ -311,9 +327,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testChildPropertiesReport() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/child/test/properties/report")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/child/test/properties/report")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ChildDeviceMessage);
 
@@ -328,9 +345,10 @@ public class JetLinksMqttDeviceMessageCodecTest {
     @Test
     public void testMetadataDerived() {
         Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/metadata/derived")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
-                .build())).blockFirst();
+                                                                             .topic("/product1/device1/metadata/derived")
+                                                                             .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
+                                                                                                                     .getBytes()))
+                                                                             .build())).blockFirst();
 
         Assert.assertTrue(message instanceof DerivedMetadataMessage);
         DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
@@ -342,10 +360,12 @@ public class JetLinksMqttDeviceMessageCodecTest {
 
     @Test
     public void testChildMetadataDerived() {
-        Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
-                .topic("/product1/device1/child/test/metadata/derived")
-                .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}".getBytes()))
-                .build())).blockFirst();
+        Message message = codec.decode(createMessageContext(SimpleMqttMessage
+                                                                    .builder()
+                                                                    .topic("/product1/device1/child/test/metadata/derived")
+                                                                    .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
+                                                                                                            .getBytes()))
+                                                                    .build())).blockFirst();
 
         Assert.assertTrue(message instanceof ChildDeviceMessage);
 
@@ -356,6 +376,25 @@ public class JetLinksMqttDeviceMessageCodecTest {
         System.out.println(reply);
     }
 
+    @Test
+    public void testTimeSync() {
+        Message message = codec.decode(createMessageContext(SimpleMqttMessage
+                                                                    .builder()
+                                                                    .topic("/product1/device1/time-sync")
+                                                                    .payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\"}"
+                                                                                                            .getBytes()))
+                                                                    .build()))
+                               .blockFirst();
+
+        Assert.assertNull(message);
+
+        Assert.assertNotNull(currentReply);
+
+        Assert.assertEquals(((MqttMessage) currentReply).getTopic(),"/product1/device1/time-sync/reply");
+        Assert.assertTrue(currentReply.payloadAsString().contains("timestamp"));
+
+    }
+
     public MessageEncodeContext createMessageContext(Message message) {
         System.out.println(message.toString());
         return new MessageEncodeContext() {
@@ -369,25 +408,99 @@ public class JetLinksMqttDeviceMessageCodecTest {
             public DeviceOperator getDevice() {
                 return registry.getDevice("device1").block();
             }
+
+            @Override
+            public Mono getDevice(String deviceId) {
+                return registry.getDevice(deviceId);
+            }
         };
     }
 
 
     public MessageDecodeContext createMessageContext(EncodedMessage message) {
         System.out.println(message.toString());
-        return new MessageDecodeContext() {
+        return new FromDeviceMessageContext() {
             @Nonnull
             @Override
             public EncodedMessage getMessage() {
                 return message;
             }
 
+            @Override
+            public DeviceSession getSession() {
+                return new MockSession();
+            }
+
             @Override
             public DeviceOperator getDevice() {
                 return registry.getDevice("device1").block();
             }
+
+            @Override
+            public Mono getDevice(String deviceId) {
+                return registry.getDevice(deviceId);
+            }
         };
     }
 
+    class MockSession implements DeviceSession {
+
+        @Override
+        public String getId() {
+            return "device1";
+        }
+
+        @Override
+        public String getDeviceId() {
+            return "device1";
+        }
+
+        @Nullable
+        @Override
+        public DeviceOperator getOperator() {
+            return registry.getDevice("device1").block();
+        }
+
+        @Override
+        public long lastPingTime() {
+            return 0;
+        }
+
+        @Override
+        public long connectTime() {
+            return 0;
+        }
+
+        @Override
+        public Mono send(EncodedMessage encodedMessage) {
+            currentReply = encodedMessage;
+            return Mono.just(true);
+        }
+
+        @Override
+        public Transport getTransport() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void ping() {
+
+        }
+
+        @Override
+        public boolean isAlive() {
+            return false;
+        }
+
+        @Override
+        public void onClose(Runnable call) {
+
+        }
+    }
 
 }
\ No newline at end of file