diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf
index 1a5511640..b6dbff834 100644
--- a/apps/emqx_gateway/etc/emqx_gateway.conf
+++ b/apps/emqx_gateway/etc/emqx_gateway.conf
@@ -27,6 +27,32 @@ gateway: {
}
}
+ coap.1: {
+ enable_stats: false
+ authenticator: allow_anonymous
+ heartbeat: 30s
+ resource: mqtt
+ notify_type: qos
+ subscribe_qos: qos0
+ publish_qos: qos1
+ listener.udp.1: {
+ bind: 5687
+ }
+ }
+
+ coap.2: {
+ enable_stats: false
+ authenticator: allow_anonymous
+ heartbeat: 30s
+ resource: pubsub
+ notify_type: non
+ subscribe_qos: qos2
+ publish_qos: coap
+ listener.udp.1: {
+ bind: 5683
+ }
+ }
+
mqttsn.1: {
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id: 1
diff --git a/apps/emqx_gateway/rebar.config b/apps/emqx_gateway/rebar.config
index a209d9723..8a0ad51e8 100644
--- a/apps/emqx_gateway/rebar.config
+++ b/apps/emqx_gateway/rebar.config
@@ -1,6 +1,5 @@
{erl_opts, [debug_info]}.
{deps, [
- {gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.2"}}},
{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v2.0.0"}}},
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}}
]}.
diff --git a/apps/emqx_gateway/src/coap/README.md b/apps/emqx_gateway/src/coap/README.md
index d4415a951..f71938c92 100644
--- a/apps/emqx_gateway/src/coap/README.md
+++ b/apps/emqx_gateway/src/coap/README.md
@@ -1,248 +1,183 @@
-# emqx-coap
+# Table of Contents
-emqx-coap is a CoAP Gateway for EMQ X Broker. It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients.
+1. [EMQX 5.0 CoAP Gateway](#org6feb6de)
+2. [CoAP Message Processing Flow](#org8458c1a)
+ 1. [Request Timing Diagram](#orgeaa4f53)
+ 1. [Transport && Transport Manager](#org88207b8)
+ 2. [Resource](#orgb32ce94)
+3. [Resource](#org8956f90)
+ 1. [MQTT Resource](#orge8c21b1)
+ 2. [PubSub Resource](#org68ddce7)
+4. [Heartbeat](#orgffdfecd)
+5. [Command](#org43004c2)
+6. [MQTT QOS <=> CoAP non/con](#org0157b5c)
-### Client Usage Example
-libcoap is an excellent coap library which has a simple client tool. It is recommended to use libcoap as a coap client.
-To compile libcoap, do following steps:
+
+
+# EMQX 5.0 CoAP Gateway
+
+emqx-coap is a CoAP Gateway for EMQ X Broker.
+It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients.
+
+
+
+
+# CoAP Message Processing Flow
+
+
+
+
+## Request Timing Diagram
+
+
+ ,------. ,------------. ,-----------------. ,---------. ,--------.
+ |client| |coap_gateway| |transport_manager| |transport| |resource|
+ `--+---' `-----+------' `--------+--------' `----+----' `---+----'
+ | | | | |
+ | -------------------> | | |
+ | | | | |
+ | | | | |
+ | | ------------------------>| | |
+ | | | | |
+ | | | | |
+ | | |----------------------->| |
+ | | | | |
+ | | | | |
+ | | | |------------------>|
+ | | | | |
+ | | | | |
+ | | | |<------------------|
+ | | | | |
+ | | | | |
+ | | |<-----------------------| |
+ | | | | |
+ | | | | |
+ | | <------------------------| | |
+ | | | | |
+ | | | | |
+ | <------------------- | | |
+ ,--+---. ,-----+------. ,--------+--------. ,----+----. ,---+----.
+ |client| |coap_gateway| |transport_manager| |transport| |resource|
+ `------' `------------' `-----------------' `---------' `--------'
+
+
+
+
+### Transport && Transport Manager
+
+Transport is a module that manages the life cycle and behaviour of CoAP messages\
+And the transport manager is to manage all transport which in this gateway
+
+
+
+
+### Resource
+
+The Resource is a behaviour that must implement GET/PUT/POST/DELETE method\
+Different Resources can have different implementations of this four method\
+Each gateway can only use one Resource module to process CoAP Request Message
+
+
+
+
+# Resource
+
+
+
+
+## MQTT Resource
+
+The MQTT Resource is a simple CoAP to MQTT adapter, the implementation of each method is as follows:
+
+- use uri path as topic
+- GET: subscribe the topic
+- PUT: publish message to this topic
+- POST: like PUT
+- DELETE: unsubscribe the topic
+
+
+
+
+## PubSub Resource
+
+The PubSub Resource like the MQTT Resource, but has a retained topic's message database\
+This Resource is shared, only can has one instance. The implementation:
+
+- use uri path as topic
+- GET:
+ - GET with observe = 0: subscribe the topic
+ - GET with observe = 1: unsubscribe the topic
+ - GET without observe: read lastest message from the message database, key is the topic
+- PUT:
+ insert message into the message database, key is the topic
+- POST:
+ like PUT, but will publish the message
+- DELETE:
+ delete message from the database, key is topic
+
+
+
+
+# Heartbeat
+
+At present, the CoAP gateway only supports UDP/DTLS connection, don't support UDP over TCP and UDP over WebSocket.
+Because UDP is connectionless, so the client needs to send heartbeat ping to the server interval. Otherwise, the server will close related resources
+Use ****POST with empty uri path**** as a heartbeat ping
+
+example:
```
-git clone http://github.com/obgm/libcoap
-cd libcoap
-./autogen.sh
-./configure --enable-documentation=no --enable-tests=no
-make
+coap-client -m post coap://127.0.0.1
```
-### Publish example:
+
+
+# Command
+
+Command is means the operation which outside the CoAP protocol, like authorization
+The Command format:
+
+1. use ****POST**** method
+2. uri path is empty
+3. query string is like ****action=comandX&argX=valuex&argY=valueY****
+
+example:
+1. connect:
```
-libcoap/examples/coap-client -m put -e 1234 "coap://127.0.0.1/mqtt/topic1?c=client1&u=tom&p=secret"
+coap-client -m post coap://127.0.0.1?action=connect&clientid=XXX&username=XXX&password=XXX
```
-- topic name is "topic1", NOT "/topic1"
-- client id is client1
-- username is tom
-- password is secret
-- payload is a text string "1234"
-
-A mqtt message with topic="topic1", payload="1234" has been published. Any mqtt client or coap client, who has subscribed this topic could receive this message immediately.
-
-### Subscribe example:
-
+2. disconnect:
```
-libcoap/examples/coap-client -m get -s 10 "coap://127.0.0.1/mqtt/topic1?c=client1&u=tom&p=secret"
-```
-- topic name is "topic1", NOT "/topic1"
-- client id is client1
-- username is tom
-- password is secret
-- subscribe time is 10 seconds
-
-And you will get following result if any mqtt client or coap client sent message with text "1234567" to "topic1":
-
-```
-v:1 t:CON c:GET i:31ae {} [ ]
-1234567v:1 t:CON c:GET i:31af {} [ Observe:1, Uri-Path:mqtt, Uri-Path:topic1, Uri-Query:c=client1, Uri-Query:u=tom, Uri-Query:p=secret ]
-```
-The output message is not well formatted which hide "1234567" at the head of the 2nd line.
-
-### Configure
-
-#### Common
-
-File: etc/emqx_coap.conf
-
-```properties
-
-## The UDP port that CoAP is listening on.
-##
-## Value: Port
-coap.port = 5683
-
-## Interval for keepalive, specified in seconds.
-##
-## Value: Duration
-## -s: seconds
-## -m: minutes
-## -h: hours
-coap.keepalive = 120s
-
-## Whether to enable statistics for CoAP clients.
-##
-## Value: on | off
-coap.enable_stats = off
-
+coap-client -m post coap://127.0.0.1?action=disconnect
```
-#### DTLS
+
-emqx_coap enable one-way authentication by default.
+# MQTT QOS <=> CoAP non/con
-If you want to disable it, comment these lines.
+CoAP gateway uses some options to control the conversion between MQTT qos and coap non/con:
-File: etc/emqx_coap.conf
+1.notify_type
+Control the type of notify messages when the observed object has changed.Can be:
-```properties
+- non
+- con
+- qos
+ in this value, MQTT QOS0 -> non, QOS1/QOS2 -> con
-## The DTLS port that CoAP is listening on.
-##
-## Value: Port
-coap.dtls.port = 5684
+2.subscribe_qos
+Control the qos of subscribe.Can be:
-## Private key file for DTLS
-##
-## Value: File
-coap.dtls.keyfile = {{ platform_etc_dir }}/certs/key.pem
+- qos0
+- qos1
+- qos2
+- coap
+ in this value, CoAP non -> qos0, con -> qos1
-## Server certificate for DTLS.
-##
-## Value: File
-coap.dtls.certfile = {{ platform_etc_dir }}/certs/cert.pem
-
-```
-
-##### Enable two-way autentication
-
-For two-way autentication:
-
-```properties
-
-## A server only does x509-path validation in mode verify_peer,
-## as it then sends a certificate request to the client (this
-## message is not sent if the verify option is verify_none).
-## You can then also want to specify option fail_if_no_peer_cert.
-## More information at: http://erlang.org/doc/man/ssl.html
-##
-## Value: verify_peer | verify_none
-## coap.dtls.verify = verify_peer
-
-## PEM-encoded CA certificates for DTLS
-##
-## Value: File
-## coap.dtls.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
-
-## Used together with {verify, verify_peer} by an SSL server. If set to true,
-## the server fails if the client does not have a certificate to send, that is,
-## sends an empty certificate.
-##
-## Value: true | false
-## coap.dtls.fail_if_no_peer_cert = false
-
-```
-
-### Load emqx-coap
-
-```bash
-./bin/emqx_ctl plugins load emqx_coap
-```
-
-CoAP Client Observe Operation (subscribe topic)
------------------------------------------------
-To subscribe any topic, issue following command:
-
-```
- GET coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password} with OBSERVE=0
-```
-
-- "mqtt" in the path is mandatory.
-- replace {topicname}, {clientid}, {username} and {password} with your true values.
-- {topicname} and {clientid} is mandatory.
-- if clientid is absent, a "bad_request" will be returned.
-- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
-- {username} and {password} are optional.
-- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
-- topic is subscribed with qos1.
-- if the subscription failed due to Authorization deny, the error code `forbidden` will be returned.
-
-CoAP Client Unobserve Operation (unsubscribe topic)
----------------------------------------------------
-To cancel observation, issue following command:
-
-```
- GET coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password} with OBSERVE=1
-```
-
-- "mqtt" in the path is mandatory.
-- replace {topicname}, {clientid}, {username} and {password} with your true values.
-- {topicname} and {clientid} is mandatory.
-- if clientid is absent, a "bad_request" will be returned.
-- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
-- {username} and {password} are optional.
-- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
-
-CoAP Client Notification Operation (subscribed Message)
--------------------------------------------------------
-Server will issue an observe-notification as a subscribed message.
-
-- Its payload is exactly the mqtt payload.
-- payload data type is "application/octet-stream".
-
-CoAP Client Publish Operation
------------------------------
-Issue a coap put command to publish messages. For example:
-
-```
- PUT coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password}
-```
-
-- "mqtt" in the path is mandatory.
-- replace {topicname}, {clientid}, {username} and {password} with your true values.
-- {topicname} and {clientid} is mandatory.
-- if clientid is absent, a "bad_request" will be returned.
-- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
-- {username} and {password} are optional.
-- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
-- payload could be any binary data.
-- payload data type is "application/octet-stream".
-- publish message will be sent with qos0.
-- if the publishing failed due to Authorization deny, the error code `forbidden` will be returned.
-
-CoAP Client Keep Alive
-----------------------
-Device should issue a get command periodically, serve as a ping to keep mqtt session online.
-
-```
- GET coap://localhost/mqtt/{any_topicname}?c={clientid}&u={username}&p={password}
-```
-
-- "mqtt" in the path is mandatory.
-- replace {any_topicname}, {clientid}, {username} and {password} with your true values.
-- {any_topicname} is optional, and should be percent-encoded to prevent special characters.
-- {clientid} is mandatory. If clientid is absent, a "bad_request" will be returned.
-- {username} and {password} are optional.
-- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
-- coap client should do keepalive work periodically to keep mqtt session online, especially those devices in a NAT network.
-
-
-CoAP Client NOTES
------------------
-emqx-coap gateway does not accept POST and DELETE requests.
-
-Topics in URI should be percent-encoded, but corresponding uri_path option has percent-encoding converted. Please refer to RFC 7252 section 6.4, "Decomposing URIs into Options":
-
-> Note that these rules completely resolve any percent-encoding.
-
-That implies coap client is responsible to convert any percert-encoding into true character while assembling coap packet.
-
-
-ClientId, Username, Password and Topic
---------------------------------------
-ClientId/username/password/topic in the coap URI are the concepts in mqtt. That is to say, emqx-coap is trying to fit coap message into mqtt system, by borrowing the client/username/password/topic from mqtt.
-
-The Auth/Authorization/Hook features in mqtt also applies on coap stuff. For example:
-- If username/password is not authorized, coap client will get an unauthorized error.
-- If username or clientid is not allowed to published specific topic, coap message will be dropped in fact, although coap client will get an acknoledgement from emqx-coap.
-- If a coap message is published, a 'message.publish' hook is able to capture this message as well.
-
-well-known locations
---------------------
-Discovery always return ","
-
-For example
-```
-libcoap/examples/coap-client -m get "coap://127.0.0.1/.well-known/core"
-```
+3.publish_qos
+like subscribe_qos, but control the qos of the publish MQTT message
License
-------
@@ -253,4 +188,3 @@ Author
------
EMQ X Team.
-
diff --git a/apps/emqx_gateway/src/coap/emqx_coap.app.src b/apps/emqx_gateway/src/coap/emqx_coap.app.src
deleted file mode 100644
index bb6d0431f..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap.app.src
+++ /dev/null
@@ -1,14 +0,0 @@
-{application, emqx_coap,
- [{description, "EMQ X CoAP Gateway"},
- {vsn, "5.0.0"}, % strict semver, bump manually!
- {modules, []},
- {registered, []},
- {applications, [kernel,stdlib,gen_coap]},
- {mod, {emqx_coap_app, []}},
- {env, []},
- {licenses, ["Apache-2.0"]},
- {maintainers, ["EMQ X Team "]},
- {links, [{"Homepage", "https://emqx.io/"},
- {"Github", "https://github.com/emqx/emqx-coap"}
- ]}
- ]}.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_app.erl b/apps/emqx_gateway/src/coap/emqx_coap_app.erl
deleted file mode 100644
index b73c92269..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap_app.erl
+++ /dev/null
@@ -1,40 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_coap_app).
-
--behaviour(application).
-
--emqx_plugin(protocol).
-
--include("src/coap/include/emqx_coap.hrl").
-
--export([ start/2
- , stop/1
- ]).
-
-start(_Type, _Args) ->
- {ok, Sup} = emqx_coap_sup:start_link(),
- coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
- coap_server_registry:add_handler([<<"ps">>], emqx_coap_pubsub_resource, undefined),
- _ = emqx_coap_pubsub_topics:start_link(),
- emqx_coap_server:start(application:get_all_env(?APP)),
- {ok,Sup}.
-
-stop(_State) ->
- coap_server_registry:remove_handler([<<"mqtt">>], emqx_coap_resource, undefined),
- coap_server_registry:remove_handler([<<"ps">>], emqx_coap_pubsub_resource, undefined),
- emqx_coap_server:stop(application:get_all_env(?APP)).
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl
new file mode 100644
index 000000000..d5b9b7293
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl
@@ -0,0 +1,387 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_channel).
+
+-behavior(emqx_gateway_channel).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+%% API
+-export([]).
+
+-export([ info/1
+ , info/2
+ , stats/1
+ , auth_publish/2
+ , auth_subscribe/2
+ , reply/4
+ , ack/4
+ , transfer_result/3]).
+
+-export([ init/2
+ , handle_in/2
+ , handle_deliver/2
+ , handle_timeout/3
+ , terminate/2
+ ]).
+
+-export([ handle_call/2
+ , handle_cast/2
+ , handle_info/2
+ ]).
+
+-export_type([channel/0]).
+
+-record(channel, {
+ %% Context
+ ctx :: emqx_gateway_ctx:context(),
+ %% Connection Info
+ conninfo :: emqx_types:conninfo(),
+ %% Client Info
+ clientinfo :: emqx_types:clientinfo(),
+ %% Session
+ session :: emqx_coap_session:session() | undefined,
+ %% Keepalive
+ keepalive :: emqx_keepalive:keepalive() | undefined,
+ %% Timer
+ timers :: #{atom() => disable | undefined | reference()},
+ config :: hocon:config()
+ }).
+
+-type channel() :: #channel{}.
+-define(DISCONNECT_WAIT_TIME, timer:seconds(10)).
+-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+info(Channel) ->
+ maps:from_list(info(?INFO_KEYS, Channel)).
+
+info(Keys, Channel) when is_list(Keys) ->
+ [{Key, info(Key, Channel)} || Key <- Keys];
+
+info(conninfo, #channel{conninfo = ConnInfo}) ->
+ ConnInfo;
+info(conn_state, _) ->
+ connected;
+info(clientinfo, #channel{clientinfo = ClientInfo}) ->
+ ClientInfo;
+info(session, #channel{session = Session}) ->
+ emqx_misc:maybe_apply(fun emqx_session:info/1, Session);
+info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
+ ClientId;
+info(ctx, #channel{ctx = Ctx}) ->
+ Ctx.
+
+stats(_) ->
+ [].
+
+init(ConnInfo = #{peername := {PeerHost, _},
+ sockname := {_, SockPort}},
+ #{ctx := Ctx} = Config) ->
+ Peercert = maps:get(peercert, ConnInfo, undefined),
+ Mountpoint = maps:get(mountpoint, Config, undefined),
+ ClientInfo = set_peercert_infos(
+ Peercert,
+ #{ zone => default
+ , protocol => 'mqtt-coap'
+ , peerhost => PeerHost
+ , sockport => SockPort
+ , clientid => emqx_guid:to_base62(emqx_guid:gen())
+ , username => undefined
+ , is_bridge => false
+ , is_superuser => false
+ , mountpoint => Mountpoint
+ }
+ ),
+
+ #channel{ ctx = Ctx
+ , conninfo = ConnInfo
+ , clientinfo = ClientInfo
+ , timers = #{}
+ , session = emqx_coap_session:new()
+ , config = Config#{clientinfo => ClientInfo,
+ ctx => Ctx}
+ , keepalive = emqx_keepalive:init(maps:get(heartbeat, Config))
+ }.
+
+auth_publish(Topic,
+ #{ctx := Ctx,
+ clientinfo := ClientInfo}) ->
+ emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic).
+
+auth_subscribe(Topic,
+ #{ctx := Ctx,
+ clientinfo := ClientInfo}) ->
+ emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic).
+
+transfer_result(Result, From, Value) ->
+ ?TRANSFER_RESULT(Result, [out], From, Value).
+
+%%--------------------------------------------------------------------
+%% Handle incoming packet
+%%--------------------------------------------------------------------
+%% treat post to root path as a heartbeat
+%% treat post to root path with query string as a command
+handle_in(#coap_message{method = post,
+ options = Options} = Msg, ChannelT) ->
+ Channel = ensure_keepalive_timer(ChannelT),
+ case maps:get(uri_path, Options, <<>>) of
+ <<>> ->
+ handle_command(Msg, Channel);
+ _ ->
+ call_session(Channel, received, [Msg])
+ end;
+
+handle_in(Msg, Channel) ->
+ call_session(ensure_keepalive_timer(Channel), received, [Msg]).
+
+%%--------------------------------------------------------------------
+%% Handle Delivers from broker to client
+%%--------------------------------------------------------------------
+handle_deliver(Delivers, Channel) ->
+ call_session(Channel, deliver, [Delivers]).
+
+%%--------------------------------------------------------------------
+%% Handle timeout
+%%--------------------------------------------------------------------
+handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) ->
+ case emqx_keepalive:check(NewVal, KeepAlive) of
+ {ok, NewKeepAlive} ->
+ Channel2 = ensure_keepalive_timer(Channel, fun make_timer/4),
+ {ok, Channel2#channel{keepalive = NewKeepAlive}};
+ {error, timeout} ->
+ {shutdown, timeout, Channel}
+ end;
+
+handle_timeout(_, {transport, Msg}, Channel) ->
+ call_session(Channel, timeout, [Msg]);
+
+handle_timeout(_, disconnect, Channel) ->
+ {shutdown, normal, Channel};
+
+handle_timeout(_, _, Channel) ->
+ {ok, Channel}.
+
+%%--------------------------------------------------------------------
+%% Handle call
+%%--------------------------------------------------------------------
+handle_call(Req, Channel) ->
+ ?LOG(error, "Unexpected call: ~p", [Req]),
+ {reply, ignored, Channel}.
+
+%%--------------------------------------------------------------------
+%% Handle Cast
+%%--------------------------------------------------------------------
+handle_cast(Req, Channel) ->
+ ?LOG(error, "Unexpected cast: ~p", [Req]),
+ {ok, Channel}.
+
+%%--------------------------------------------------------------------
+%% Handle Info
+%%--------------------------------------------------------------------
+handle_info(Info, Channel) ->
+ ?LOG(error, "Unexpected info: ~p", [Info]),
+ {ok, Channel}.
+
+%%--------------------------------------------------------------------
+%% Terminate
+%%--------------------------------------------------------------------
+terminate(_Reason, _Channel) ->
+ ok.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+set_peercert_infos(NoSSL, ClientInfo)
+ when NoSSL =:= nossl;
+ NoSSL =:= undefined ->
+ ClientInfo;
+set_peercert_infos(Peercert, ClientInfo) ->
+ {DN, CN} = {esockd_peercert:subject(Peercert),
+ esockd_peercert:common_name(Peercert)},
+ ClientInfo#{dn => DN, cn => CN}.
+
+ensure_timer(Name, Time, Msg, #channel{timers = Timers} = Channel) ->
+ case maps:get(Name, Timers, undefined) of
+ undefined ->
+ make_timer(Name, Time, Msg, Channel);
+ _ ->
+ Channel
+ end.
+
+make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) ->
+ TRef = emqx_misc:start_timer(Time, Msg),
+ Channel#channel{timers = Timers#{Name => TRef}}.
+
+ensure_keepalive_timer(Channel) ->
+ ensure_keepalive_timer(Channel, fun ensure_timer/4).
+
+ensure_keepalive_timer(#channel{config = Cfg} = Channel, Fun) ->
+ Interval = maps:get(heartbeat, Cfg),
+ Fun(keepalive, Interval, keepalive, Channel).
+
+handle_command(#coap_message{options = Options} = Msg, Channel) ->
+ case maps:get(uri_query, Options, []) of
+ [] ->
+ %% heartbeat
+ ack(Channel, {ok, valid}, <<>>, Msg);
+ QueryPairs ->
+ Queries = lists:foldl(fun(Pair, Acc) ->
+ [{K, V}] = cow_qs:parse_qs(Pair),
+ Acc#{K => V}
+ end,
+ #{},
+ QueryPairs),
+ case maps:get(<<"action">>, Queries, undefined) of
+ undefined ->
+ ack(Channel, {error, bad_request}, <<"command without actions">>, Msg);
+ Action ->
+ handle_command(Action, Queries, Msg, Channel)
+ end
+ end.
+
+handle_command(<<"connect">>, Queries, Msg, Channel) ->
+ case emqx_misc:pipeline(
+ [ fun run_conn_hooks/2
+ , fun enrich_clientinfo/2
+ , fun set_log_meta/2
+ , fun auth_connect/2
+ ],
+ {Queries, Msg},
+ Channel) of
+ {ok, _Input, NChannel} ->
+ process_connect(ensure_connected(NChannel), Msg);
+ {error, ReasonCode, NChannel} ->
+ ErrMsg = io_lib:format("Login Failed: ~s", [ReasonCode]),
+ ack(NChannel, {error, bad_request}, ErrMsg, Msg)
+ end;
+
+handle_command(<<"disconnect">>, _, Msg, Channel) ->
+ Channel2 = ensure_timer(disconnect, ?DISCONNECT_WAIT_TIME, disconnect, Channel),
+ ack(Channel2, {ok, deleted}, <<>>, Msg);
+
+handle_command(_, _, Msg, Channel) ->
+ ack(Channel, {error, bad_request}, <<"invalid action">>, Msg).
+
+run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
+ conninfo = ConnInfo}) ->
+ ConnProps = #{},
+ case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
+ Error = {error, _Reason} -> Error;
+ _NConnProps ->
+ {ok, Input, Channel}
+ end.
+
+enrich_clientinfo({Queries, Msg},
+ Channel = #channel{clientinfo = ClientInfo0,
+ config = Cfg}) ->
+ case Queries of
+ #{<<"username">> := UserName,
+ <<"password">> := Password,
+ <<"clientid">> := ClientId} ->
+ ClientInfo = ClientInfo0#{username => UserName,
+ password => Password,
+ clientid => ClientId},
+ {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
+ {ok, Channel#channel{clientinfo = NClientInfo,
+ config = Cfg#{clientinfo := NClientInfo}}};
+ _ ->
+ {error, "invalid queries", Channel}
+ end.
+
+set_log_meta(_Input, #channel{clientinfo = #{clientid := ClientId}}) ->
+ emqx_logger:set_metadata_clientid(ClientId),
+ ok.
+
+auth_connect(_Input, Channel = #channel{ctx = Ctx,
+ clientinfo = ClientInfo}) ->
+ #{clientid := ClientId,
+ username := Username} = ClientInfo,
+ case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of
+ {ok, NClientInfo} ->
+ {ok, Channel#channel{clientinfo = NClientInfo}};
+ {error, Reason} ->
+ ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
+ [ClientId, Username, Reason]),
+ {error, Reason}
+ end.
+
+fix_mountpoint(_Packet, #{mountpoint := undefined}) -> ok;
+fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
+ %% TODO: Enrich the varibale replacement????
+ %% i.e: ${ClientInfo.auth_result.productKey}
+ Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
+ {ok, ClientInfo#{mountpoint := Mountpoint1}}.
+
+ensure_connected(Channel = #channel{ctx = Ctx,
+ conninfo = ConnInfo,
+ clientinfo = ClientInfo}) ->
+ NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
+ ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
+ Channel#channel{conninfo = NConnInfo}.
+
+process_connect(Channel = #channel{ctx = Ctx,
+ session = Session,
+ conninfo = ConnInfo,
+ clientinfo = ClientInfo},
+ Msg) ->
+ SessFun = fun(_,_) -> Session end,
+ case emqx_gateway_ctx:open_session(
+ Ctx,
+ true,
+ ClientInfo,
+ ConnInfo,
+ SessFun
+ ) of
+ {ok, _Sess} ->
+ ack(Channel, {ok, created}, <<"connected">>, Msg);
+ {error, Reason} ->
+ ?LOG(error, "Failed to open session du to ~p", [Reason]),
+ ack(Channel, {error, bad_request}, <<>>, Msg)
+ end.
+
+run_hooks(Ctx, Name, Args) ->
+ emqx_gateway_ctx:metrics_inc(Ctx, Name),
+ emqx_hooks:run(Name, Args).
+
+run_hooks(Ctx, Name, Args, Acc) ->
+ emqx_gateway_ctx:metrics_inc(Ctx, Name),
+ emqx_hooks:run_fold(Name, Args, Acc).
+
+reply(Channel, Method, Payload, Req) ->
+ call_session(Channel, reply, [Req, Method, Payload]).
+
+ack(Channel, Method, Payload, Req) ->
+ call_session(Channel, piggyback, [Req, Method, Payload]).
+
+call_session(#channel{session = Session,
+ config = Cfg} = Channel, F, A) ->
+ case erlang:apply(emqx_coap_session, F, [Session, Cfg | A]) of
+ #{out := Out,
+ session := Session2} ->
+ {ok, {outgoing, Out}, Channel#channel{session = Session2}};
+ #{out := Out} ->
+ {ok, {outgoing, Out}, Channel};
+ #{session := Session2} ->
+ {ok, Channel#channel{session = Session2}};
+ _ ->
+ {ok, Channel}
+ end.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl
new file mode 100644
index 000000000..039190646
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl
@@ -0,0 +1,423 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_frame).
+
+-behavior(emqx_gateway_frame).
+
+%% emqx_gateway_frame API
+-export([ initial_parse_state/1
+ , serialize_opts/0
+ , serialize_pkt/2
+ , parse/2
+ , format/1
+ , type/1
+ , is_message/1]).
+
+%% API
+-export([]).
+
+-include("include/emqx_coap.hrl").
+-include("apps/emqx/include/types.hrl").
+
+-define(VERSION, 1).
+
+-define(OPTION_IF_MATCH, 1).
+-define(OPTION_URI_HOST, 3).
+-define(OPTION_ETAG, 4).
+-define(OPTION_IF_NONE_MATCH, 5).
+-define(OPTION_OBSERVE, 6). % draft-ietf-core-observe-16
+-define(OPTION_URI_PORT, 7).
+-define(OPTION_LOCATION_PATH, 8).
+-define(OPTION_URI_PATH, 11).
+-define(OPTION_CONTENT_FORMAT, 12).
+-define(OPTION_MAX_AGE, 14).
+-define(OPTION_URI_QUERY, 15).
+-define(OPTION_ACCEPT, 17).
+-define(OPTION_LOCATION_QUERY, 20).
+-define(OPTION_BLOCK2, 23). % draft-ietf-core-block-17
+-define(OPTION_BLOCK1, 27).
+-define(OPTION_PROXY_URI, 35).
+-define(OPTION_PROXY_SCHEME, 39).
+-define(OPTION_SIZE1, 60).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+initial_parse_state(_) ->
+ #{}.
+
+serialize_opts() ->
+ #{}.
+
+%%%===================================================================
+%%% serialize_pkt
+%%%===================================================================
+%% empty message
+serialize_pkt(#coap_message{type = Type, method = undefined, id = MsgId}, _Opts) ->
+ <>;
+
+serialize_pkt(#coap_message{ type = Type
+ , method = Method
+ , id = MsgId
+ , token = Token
+ , options = Options
+ , payload = Payload
+ },
+ _Opts) ->
+ TKL = byte_size(Token),
+ {Class, Code} = method_to_class_code(Method),
+ Head = <>,
+ FlatOpts = flatten_options(Options),
+ encode_option_list(FlatOpts, 0, Head, Payload).
+
+-spec encode_type(message_type()) -> 0 .. 3.
+encode_type(con) -> 0;
+encode_type(non) -> 1;
+encode_type(ack) -> 2;
+encode_type(reset) -> 3.
+
+flatten_options(Opts) ->
+ flatten_options(maps:to_list(Opts), []).
+
+flatten_options([{_OptId, undefined} | T], Acc) ->
+ flatten_options(T, Acc);
+
+flatten_options([{OptId, OptVal} | T], Acc) ->
+ flatten_options(T,
+ case is_repeatable_option(OptId) of
+ false ->
+ [encode_option(OptId, OptVal) | Acc];
+ _ ->
+ lists:foldl(fun(undefined, InnerAcc) ->
+ InnerAcc;
+ (E, InnerAcc) ->
+ [encode_option(OptId, E) | InnerAcc]
+ end, Acc, OptVal)
+ end);
+
+flatten_options([], Acc) ->
+ %% sort by option id for calculate the deltas
+ lists:keysort(1, Acc).
+
+encode_option_list([{OptNum, OptVal} | OptionList], LastNum, Acc, Payload) ->
+ NumDiff = OptNum - LastNum,
+ {Delta, ExtNum} = if
+ NumDiff >= 269 ->
+ {14, <<(NumDiff - 269):16>>};
+ OptNum - LastNum >= 13 ->
+ {13, <<(NumDiff - 13)>>};
+ true ->
+ {NumDiff, <<>>}
+ end,
+ Binaryize = byte_size(OptVal),
+ {Len, ExtLen} = if
+ Binaryize >= 269 ->
+ {14, <<(Binaryize - 269):16>>};
+ Binaryize >= 13 ->
+ {13, <<(Binaryize - 13)>>};
+ true ->
+ {Binaryize, <<>>}
+ end,
+ Acc2 = <>,
+ encode_option_list(OptionList, OptNum, Acc2, Payload);
+
+encode_option_list([], _LastNum, Acc, <<>>) ->
+ Acc;
+encode_option_list([], _, Acc, Payload) ->
+ <>.
+
+%% RFC 7252
+encode_option(if_match, OptVal) -> {?OPTION_IF_MATCH, OptVal};
+encode_option(uri_host, OptVal) -> {?OPTION_URI_HOST, OptVal};
+encode_option(etag, OptVal) -> {?OPTION_ETAG, OptVal};
+encode_option(if_none_match, true) -> {?OPTION_IF_NONE_MATCH, <<>>};
+encode_option(uri_port, OptVal) -> {?OPTION_URI_PORT, binary:encode_unsigned(OptVal)};
+encode_option(location_path, OptVal) -> {?OPTION_LOCATION_PATH, OptVal};
+encode_option(uri_path, OptVal) -> {?OPTION_URI_PATH, OptVal};
+encode_option(content_format, OptVal) when is_integer(OptVal) ->
+ {?OPTION_CONTENT_FORMAT, binary:encode_unsigned(OptVal)};
+encode_option(content_format, OptVal) ->
+ Num = content_format_to_code(OptVal),
+ {?OPTION_CONTENT_FORMAT, binary:encode_unsigned(Num)};
+encode_option(max_age, OptVal) -> {?OPTION_MAX_AGE, binary:encode_unsigned(OptVal)};
+encode_option(uri_query, OptVal) -> {?OPTION_URI_QUERY, OptVal};
+encode_option('accept', OptVal) -> {?OPTION_ACCEPT, binary:encode_unsigned(OptVal)};
+encode_option(location_query, OptVal) -> {?OPTION_LOCATION_QUERY, OptVal};
+encode_option(proxy_uri, OptVal) -> {?OPTION_PROXY_URI, OptVal};
+encode_option(proxy_scheme, OptVal) -> {?OPTION_PROXY_SCHEME, OptVal};
+encode_option(size1, OptVal) -> {?OPTION_SIZE1, binary:encode_unsigned(OptVal)};
+%% draft-ietf-ore-observe-16
+encode_option(observe, OptVal) -> {?OPTION_OBSERVE, binary:encode_unsigned(OptVal)};
+%% draft-ietf-ore-block-17
+encode_option(block2, OptVal) -> {?OPTION_BLOCK2, encode_block(OptVal)};
+encode_option(block1, OptVal) -> {?OPTION_BLOCK1, encode_block(OptVal)};
+%% unknown opton
+encode_option(Option, Value) ->
+ erlang:throw({bad_option, Option, Value}).
+
+encode_block({Num, More, Size}) ->
+ encode_block1(Num,
+ if More -> 1; true -> 0 end,
+ trunc(math:log2(Size))-4).
+
+encode_block1(Num, M, SizEx) when Num < 16 ->
+ <>;
+encode_block1(Num, M, SizEx) when Num < 4096 ->
+ <>;
+encode_block1(Num, M, SizEx) ->
+ <>.
+
+-spec content_format_to_code(binary()) -> non_neg_integer().
+content_format_to_code(<<"text/plain">>) -> 0;
+content_format_to_code(<<"application/link-format">>) -> 40;
+content_format_to_code(<<"application/xml">>) ->41;
+content_format_to_code(<<"application/octet-stream">>) -> 42;
+content_format_to_code(<<"application/exi">>) -> 47;
+content_format_to_code(<<"application/json">>) -> 50;
+content_format_to_code(<<"application/cbor">>) -> 60;
+content_format_to_code(_) -> 42. %% use octet-stream as default
+
+method_to_class_code(get) -> {0, 01};
+method_to_class_code(post) -> {0, 02};
+method_to_class_code(put) -> {0, 03};
+method_to_class_code(delete) -> {0, 04};
+method_to_class_code({ok, created}) -> {2, 01};
+method_to_class_code({ok, deleted}) -> {2, 02};
+method_to_class_code({ok, valid}) -> {2, 03};
+method_to_class_code({ok, changed}) -> {2, 04};
+method_to_class_code({ok, content}) -> {2, 05};
+method_to_class_code({ok, nocontent}) -> {2, 07};
+method_to_class_code({ok, continue}) -> {2, 31};
+method_to_class_code({error, bad_request}) -> {4, 00};
+method_to_class_code({error, unauthorized}) -> {4, 01};
+method_to_class_code({error, bad_option}) -> {4, 02};
+method_to_class_code({error, forbidden}) -> {4, 03};
+method_to_class_code({error, not_found}) -> {4, 04};
+method_to_class_code({error, method_not_allowed}) -> {4, 05};
+method_to_class_code({error, not_acceptable}) -> {4, 06};
+method_to_class_code({error, request_entity_incomplete}) -> {4, 08};
+method_to_class_code({error, precondition_failed}) -> {4, 12};
+method_to_class_code({error, request_entity_too_large}) -> {4, 13};
+method_to_class_code({error, unsupported_content_format}) -> {4, 15};
+method_to_class_code({error, internal_server_error}) -> {5, 00};
+method_to_class_code({error, not_implemented}) -> {5, 01};
+method_to_class_code({error, bad_gateway}) -> {5, 02};
+method_to_class_code({error, service_unavailable}) -> {5, 03};
+method_to_class_code({error, gateway_timeout}) -> {5, 04};
+method_to_class_code({error, proxying_not_supported}) -> {5, 05};
+method_to_class_code(Method) ->
+ erlang:throw({bad_method, Method}).
+
+%%%===================================================================
+%%% parse
+%%%===================================================================
+parse(<>, ParseState) ->
+ {ok,
+ #coap_message{ type = decode_type(Type)
+ , id = MsgId},
+ <<>>,
+ ParseState};
+
+parse(<>,
+ ParseState) ->
+ {Options, Payload} = decode_option_list(Tail),
+ Options2 = maps:fold(fun(K, V, Acc) ->
+ case is_repeatable_option(K) of
+ true ->
+ Acc#{K => lists:reverse(V)};
+ _ ->
+ Acc#{K => V}
+ end
+ end,
+ #{},
+ Options),
+ {ok,
+ #coap_message{ type = decode_type(Type)
+ , method = class_code_to_method({Class, Code})
+ , id = MsgId
+ , token = Token
+ , options = Options2
+ , payload = Payload
+ },
+ <<>>,
+ ParseState}.
+
+-spec decode_type(X) -> message_type()
+ when X :: 0 .. 3.
+decode_type(0) -> con;
+decode_type(1) -> non;
+decode_type(2) -> ack;
+decode_type(3) -> reset.
+
+-spec decode_option_list(binary()) -> {message_options(), binary()}.
+decode_option_list(Bin) ->
+ decode_option_list(Bin, 0, #{}).
+
+decode_option_list(<<>>, _OptNum, OptMap) ->
+ {OptMap, <<>>};
+
+decode_option_list(<<16#FF, Payload/binary>>, _OptNum, OptMap) ->
+ {OptMap, Payload};
+
+decode_option_list(<>, OptNum, OptMap) ->
+ case Delta of
+ Any when Any < 13 ->
+ decode_option_len(Bin, OptNum + Delta, Len, OptMap);
+ 13 ->
+ <> = Bin,
+ decode_option_len(NewBin, OptNum + ExtOptNum + 13, Len, OptMap);
+ 14 ->
+ <> = Bin,
+ decode_option_len(NewBin, OptNum + ExtOptNum + 269, Len, OptMap)
+ end.
+
+decode_option_len(<>, OptNum, Len, OptMap) ->
+ case Len of
+ Any when Any < 13 ->
+ decode_option_value(Bin, OptNum, Len, OptMap);
+ 13 ->
+ <> = Bin,
+ decode_option_value(NewBin, OptNum, ExtOptLen + 13, OptMap);
+ 14 ->
+ <> = Bin,
+ decode_option_value(NewBin, OptNum, ExtOptLen + 269, OptMap)
+ end.
+
+decode_option_value(<>, OptNum, OptLen, OptMap) ->
+ case Bin of
+ <> ->
+ decode_option_list(NewBin, OptNum, append_option(OptNum, OptVal, OptMap));
+ <<>> ->
+ decode_option_list(<<>>, OptNum, append_option(OptNum, <<>>, OptMap))
+ end.
+
+append_option(OptNum, RawOptVal, OptMap) ->
+ {OptId, OptVal} = decode_option(OptNum, RawOptVal),
+ case is_repeatable_option(OptId) of
+ false ->
+ OptMap#{OptId => OptVal};
+ _ ->
+ case maps:get(OptId, OptMap, undefined) of
+ undefined ->
+ OptMap#{OptId => [OptVal]};
+ OptVals ->
+ OptMap#{OptId => [OptVal | OptVals]}
+ end
+ end.
+
+%% RFC 7252
+decode_option(?OPTION_IF_MATCH, OptVal) -> {if_match, OptVal};
+decode_option(?OPTION_URI_HOST, OptVal) -> {uri_host, OptVal};
+decode_option(?OPTION_ETAG, OptVal) -> {etag, OptVal};
+decode_option(?OPTION_IF_NONE_MATCH, <<>>) -> {if_none_match, true};
+decode_option(?OPTION_URI_PORT, OptVal) -> {uri_port, binary:decode_unsigned(OptVal)};
+decode_option(?OPTION_LOCATION_PATH, OptVal) -> {location_path, OptVal};
+decode_option(?OPTION_URI_PATH, OptVal) -> {uri_path, OptVal};
+decode_option(?OPTION_CONTENT_FORMAT, OptVal) ->
+ Num = binary:decode_unsigned(OptVal),
+ {content_format, content_code_to_format(Num)};
+decode_option(?OPTION_MAX_AGE, OptVal) -> {max_age, binary:decode_unsigned(OptVal)};
+decode_option(?OPTION_URI_QUERY, OptVal) -> {uri_query, OptVal};
+decode_option(?OPTION_ACCEPT, OptVal) -> {'accept', binary:decode_unsigned(OptVal)};
+decode_option(?OPTION_LOCATION_QUERY, OptVal) -> {location_query, OptVal};
+decode_option(?OPTION_PROXY_URI, OptVal) -> {proxy_uri, OptVal};
+decode_option(?OPTION_PROXY_SCHEME, OptVal) -> {proxy_scheme, OptVal};
+decode_option(?OPTION_SIZE1, OptVal) -> {size1, binary:decode_unsigned(OptVal)};
+%% draft-ietf-core-observe-16
+decode_option(?OPTION_OBSERVE, OptVal) -> {observe, binary:decode_unsigned(OptVal)};
+%% draft-ietf-core-block-17
+decode_option(?OPTION_BLOCK2, OptVal) -> {block2, decode_block(OptVal)};
+decode_option(?OPTION_BLOCK1, OptVal) -> {block1, decode_block(OptVal)};
+%% unknown option
+decode_option(OptNum, OptVal) -> {OptNum, OptVal}.
+
+decode_block(<>) -> decode_block1(Num, M, SizEx);
+decode_block(<>) -> decode_block1(Num, M, SizEx);
+decode_block(<>) -> decode_block1(Num, M, SizEx).
+
+decode_block1(Num, M, SizEx) ->
+ {Num, M =/= 0, trunc(math:pow(2, SizEx + 4))}.
+
+-spec content_code_to_format(non_neg_integer()) -> binary().
+content_code_to_format(0) -> <<"text/plain">>;
+content_code_to_format(40) -> <<"application/link-format">>;
+content_code_to_format(41) -> <<"application/xml">>;
+content_code_to_format(42) -> <<"application/octet-stream">>;
+content_code_to_format(47) -> <<"application/exi">>;
+content_code_to_format(50) -> <<"application/json">>;
+content_code_to_format(60) -> <<"application/cbor">>;
+content_code_to_format(_) -> <<"application/octet-stream">>. %% use octet as default
+
+%% RFC 7252
+%% atom indicate a request
+class_code_to_method({0, 01}) -> get;
+class_code_to_method({0, 02}) -> post;
+class_code_to_method({0, 03}) -> put;
+class_code_to_method({0, 04}) -> delete;
+
+%% success is a tuple {ok, ...}
+class_code_to_method({2, 01}) -> {ok, created};
+class_code_to_method({2, 02}) -> {ok, deleted};
+class_code_to_method({2, 03}) -> {ok, valid};
+class_code_to_method({2, 04}) -> {ok, changed};
+class_code_to_method({2, 05}) -> {ok, content};
+class_code_to_method({2, 07}) -> {ok, nocontent};
+class_code_to_method({2, 31}) -> {ok, continue}; % block
+
+%% error is a tuple {error, ...}
+class_code_to_method({4, 00}) -> {error, bad_request};
+class_code_to_method({4, 01}) -> {error, unauthorized};
+class_code_to_method({4, 02}) -> {error, bad_option};
+class_code_to_method({4, 03}) -> {error, forbidden};
+class_code_to_method({4, 04}) -> {error, not_found};
+class_code_to_method({4, 05}) -> {error, method_not_allowed};
+class_code_to_method({4, 06}) -> {error, not_acceptable};
+class_code_to_method({4, 08}) -> {error, request_entity_incomplete}; % block
+class_code_to_method({4, 12}) -> {error, precondition_failed};
+class_code_to_method({4, 13}) -> {error, request_entity_too_large};
+class_code_to_method({4, 15}) -> {error, unsupported_content_format};
+class_code_to_method({5, 00}) -> {error, internal_server_error};
+class_code_to_method({5, 01}) -> {error, not_implemented};
+class_code_to_method({5, 02}) -> {error, bad_gateway};
+class_code_to_method({5, 03}) -> {error, service_unavailable};
+class_code_to_method({5, 04}) -> {error, gateway_timeout};
+class_code_to_method({5, 05}) -> {error, proxying_not_supported};
+class_code_to_method(_) -> undefined.
+
+format(Msg) ->
+ io_lib:format("~p", [Msg]).
+
+type(_) ->
+ coap.
+
+is_message(#coap_message{}) ->
+ true;
+is_message(_) ->
+ false.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec is_repeatable_option(message_option_name()) -> boolean().
+is_repeatable_option(if_match) -> true;
+is_repeatable_option(etag) -> true;
+is_repeatable_option(location_path) -> true;
+is_repeatable_option(uri_path) -> true;
+is_repeatable_option(uri_query) -> true;
+is_repeatable_option(location_query) -> true;
+is_repeatable_option(_) -> false.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl
new file mode 100644
index 000000000..62cdd3bf2
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl
@@ -0,0 +1,156 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_impl).
+
+-include_lib("emqx_gateway/include/emqx_gateway.hrl").
+
+-behavior(emqx_gateway_impl).
+
+%% APIs
+-export([ load/0
+ , unload/0
+ ]).
+
+-export([ init/1
+ , on_insta_create/3
+ , on_insta_update/4
+ , on_insta_destroy/3
+ ]).
+
+-dialyzer({nowarn_function, [load/0]}).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+load() ->
+ RegistryOptions = [ {cbkmod, ?MODULE}
+ ],
+ Options = [],
+ emqx_gateway_registry:load(coap, RegistryOptions, Options).
+
+unload() ->
+ emqx_gateway_registry:unload(coap).
+
+init([]) ->
+ GwState = #{},
+ {ok, GwState}.
+
+%%--------------------------------------------------------------------
+%% emqx_gateway_registry callbacks
+%%--------------------------------------------------------------------
+
+on_insta_create(_Insta = #{id := InstaId,
+ rawconf := #{resource := Resource} = RawConf
+ }, Ctx, _GwState) ->
+ ResourceMod = get_resource_mod(Resource),
+ Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
+ ListenerPids = lists:map(fun(Lis) ->
+ start_listener(InstaId, Ctx, ResourceMod, Lis)
+ end, Listeners),
+
+ {ok, ResCtx} = ResourceMod:init(RawConf),
+ {ok, ListenerPids, #{ctx => Ctx,
+ res_ctx => ResCtx}}.
+
+on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
+ InstaId = maps:get(id, NewInsta),
+ try
+ %% XXX: 1. How hot-upgrade the changes ???
+ %% XXX: 2. Check the New confs first before destroy old instance ???
+ on_insta_destroy(OldInsta, GwInstaState, GwState),
+ on_insta_create(NewInsta, Ctx, GwState)
+ catch
+ Class : Reason : Stk ->
+ logger:error("Failed to update coap instance ~s; "
+ "reason: {~0p, ~0p} stacktrace: ~0p",
+ [InstaId, Class, Reason, Stk]),
+ {error, {Class, Reason}}
+ end.
+
+on_insta_destroy(_Insta = #{ id := InstaId,
+ rawconf := #{resource := Resource} = RawConf
+ },
+ #{res_ctx := ResCtx} = _GwInstaState,
+ _GWState) ->
+ ResourceMod = get_resource_mod(Resource),
+ ok = ResourceMod:stop(ResCtx),
+ Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
+ lists:foreach(fun(Lis) ->
+ stop_listener(InstaId, Lis)
+ end, Listeners).
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+start_listener(InstaId, Ctx, ResourceMod, {Type, ListenOn, SocketOpts, Cfg}) ->
+ ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
+ Cfg2 = Cfg#{resource => ResourceMod},
+ case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg2) of
+ {ok, Pid} ->
+ io:format("Start coap ~s:~s listener on ~s successfully.~n",
+ [InstaId, Type, ListenOnStr]),
+ Pid;
+ {error, Reason} ->
+ io:format(standard_error,
+ "Failed to start coap ~s:~s listener on ~s: ~0p~n",
+ [InstaId, Type, ListenOnStr, Reason]),
+ throw({badconf, Reason})
+ end.
+
+start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
+ Name = name(InstaId, Type),
+ NCfg = Cfg#{
+ ctx => Ctx,
+ frame_mod => emqx_coap_frame,
+ chann_mod => emqx_coap_channel
+ },
+ MFA = {emqx_gateway_conn, start_link, [NCfg]},
+ do_start_listener(Type, Name, ListenOn, SocketOpts, MFA).
+
+do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
+ esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
+
+do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
+ esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
+
+name(InstaId, Type) ->
+ list_to_atom(lists:concat([InstaId, ":", Type])).
+
+stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
+ StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
+ ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
+ case StopRet of
+ ok -> io:format("Stop coap ~s:~s listener on ~s successfully.~n",
+ [InstaId, Type, ListenOnStr]);
+ {error, Reason} ->
+ io:format(standard_error,
+ "Failed to stop coap ~s:~s listener on ~s: ~0p~n",
+ [InstaId, Type, ListenOnStr, Reason]
+ )
+ end,
+ StopRet.
+
+stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
+ Name = name(InstaId, Type),
+ esockd:close(Name, ListenOn).
+
+get_resource_mod(mqtt) ->
+ emqx_coap_mqtt_resource;
+get_resource_mod(pubsub) ->
+ emqx_coap_pubsub_resource.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_message.erl b/apps/emqx_gateway/src/coap/emqx_coap_message.erl
new file mode 100644
index 000000000..52a03c418
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_message.erl
@@ -0,0 +1,146 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Copyright (c) 2015 Petr Gotthard
+
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% convenience functions for message construction
+-module(emqx_coap_message).
+
+-export([request/2, request/3, request/4, ack/1, response/1, response/2, response/3]).
+-export([set/3, set_payload/2, get_content/1, set_content/2, set_content/3, get_option/2]).
+
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+request(Type, Method) ->
+ request(Type, Method, <<>>, []).
+
+request(Type, Method, Payload) ->
+ request(Type, Method, Payload, []).
+
+request(Type, Method, Payload, Options) when is_binary(Payload) ->
+ #coap_message{type = Type, method = Method, payload = Payload, options = Options};
+
+request(Type, Method, Content=#coap_content{}, Options) ->
+ set_content(Content,
+ #coap_message{type = Type, method = Method, options = Options}).
+
+ack(Request = #coap_message{}) ->
+ #coap_message{type = ack,
+ id = Request#coap_message.id}.
+
+response(#coap_message{type = Type,
+ id = Id,
+ token = Token}) ->
+ #coap_message{type = Type,
+ id = Id,
+ token = Token}.
+
+response(Method, Request) ->
+ set_method(Method, response(Request)).
+
+response(Method, Payload, Request) ->
+ set_method(Method,
+ set_payload(Payload,
+ response(Request))).
+
+%% omit option for its default value
+set(max_age, ?DEFAULT_MAX_AGE, Msg) -> Msg;
+
+%% set non-default value
+set(Option, Value, Msg = #coap_message{options = Options}) ->
+ Msg#coap_message{options = Options#{Option => Value}}.
+
+get_option(Option, #coap_message{options = Options}) ->
+ maps:get(Option, Options, undefined).
+
+set_method(Method, Msg) ->
+ Msg#coap_message{method = Method}.
+
+set_payload(Payload = #coap_content{}, Msg) ->
+ set_content(Payload, undefined, Msg);
+
+set_payload(Payload, Msg) when is_binary(Payload) ->
+ Msg#coap_message{payload = Payload};
+
+set_payload(Payload, Msg) when is_list(Payload) ->
+ Msg#coap_message{payload = list_to_binary(Payload)}.
+
+get_content(#coap_message{options = Options, payload = Payload}) ->
+ #coap_content{etag = maps:get(etag, Options, undefined),
+ max_age = maps:get(max_age, Options, ?DEFAULT_MAX_AGE),
+ format = maps:get(content_format, Options, undefined),
+ location_path = maps:get(location_path, Options, []),
+ payload = Payload}.
+
+set_content(Content, Msg) ->
+ set_content(Content, undefined, Msg).
+
+%% segmentation not requested and not required
+set_content(#coap_content{etag = ETag,
+ max_age = MaxAge,
+ format = Format,
+ location_path = LocPath,
+ payload = Payload},
+ undefined,
+ Msg)
+ when byte_size(Payload) =< ?MAX_BLOCK_SIZE ->
+ #coap_message{options = Options} = Msg2 = set_payload(Payload, Msg),
+ Options2 = Options#{etag => [ETag],
+ max_age => MaxAge,
+ content_format => Format,
+ location_path => LocPath},
+ Msg2#coap_message{options = Options2};
+
+%% segmentation not requested, but required (late negotiation)
+set_content(Content, undefined, Msg) ->
+ set_content(Content, {0, true, ?MAX_BLOCK_SIZE}, Msg);
+
+%% segmentation requested (early negotiation)
+set_content(#coap_content{etag = ETag,
+ max_age = MaxAge,
+ format = Format,
+ payload = Payload},
+ Block,
+ Msg) ->
+ #coap_message{options = Options} = Msg2 = set_payload_block(Payload, Block, Msg),
+ Options2 = Options#{etag => [ETag],
+ max => MaxAge,
+ content_format => Format},
+ Msg2#coap_message{options = Options2}.
+
+set_payload_block(Content, Block, Msg = #coap_message{method = Method}) when is_atom(Method) ->
+ set_payload_block(Content, block1, Block, Msg);
+
+set_payload_block(Content, Block, Msg = #coap_message{}) ->
+ set_payload_block(Content, block2, Block, Msg).
+
+set_payload_block(Content, BlockId, {Num, _, Size}, Msg) ->
+ ContentSize = erlang:byte_size(Content),
+ OffsetBegin = Size * Num,
+ OffsetEnd = OffsetBegin + Size,
+ case ContentSize > OffsetEnd of
+ true ->
+ set(BlockId, {Num, true, Size},
+ set_payload(binary:part(Content, OffsetBegin, Size), Msg));
+ _ ->
+ set(BlockId, {Num, false, Size},
+ set_payload(binary:part(Content, OffsetBegin, ContentSize - OffsetBegin), Msg))
+ end.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_mqtt_adapter.erl b/apps/emqx_gateway/src/coap/emqx_coap_mqtt_adapter.erl
deleted file mode 100644
index 0734d2e71..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap_mqtt_adapter.erl
+++ /dev/null
@@ -1,387 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_coap_mqtt_adapter).
-
--behaviour(gen_server).
-
--include("src/coap/include/emqx_coap.hrl").
-
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
-
--logger_header("[CoAP-Adpter]").
-
-%% API.
--export([ subscribe/2
- , unsubscribe/2
- , publish/3
- ]).
-
--export([ client_pid/4
- , stop/1
- ]).
-
--export([ call/2
- , call/3
- ]).
-
-%% gen_server.
--export([ init/1
- , handle_call/3
- , handle_cast/2
- , handle_info/2
- , terminate/2
- , code_change/3
- ]).
-
--record(state, {peername, clientid, username, password, sub_topics = [], connected_at}).
-
--define(ALIVE_INTERVAL, 20000).
-
--define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-
--define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}).
-
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
-
-client_pid(undefined, _Username, _Password, _Channel) ->
- {error, bad_request};
-client_pid(ClientId, Username, Password, Channel) ->
- % check authority
- case start(ClientId, Username, Password, Channel) of
- {ok, Pid1} -> {ok, Pid1};
- {error, {already_started, Pid2}} -> {ok, Pid2};
- {error, auth_failure} -> {error, auth_failure};
- Other -> {error, Other}
- end.
-
-start(ClientId, Username, Password, Channel) ->
- % DO NOT use start_link, since multiple coap_reponsder may have relation with one mqtt adapter,
- % one coap_responder crashes should not make mqtt adapter crash too
- % And coap_responder is not a system process
- % it is dangerous to link mqtt adapter to coap_responder
- gen_server:start({via, emqx_coap_registry, {ClientId, Username, Password}},
- ?MODULE, {ClientId, Username, Password, Channel}, []).
-
-stop(Pid) ->
- gen_server:stop(Pid).
-
-subscribe(Pid, Topic) ->
- gen_server:call(Pid, {subscribe, Topic, self()}).
-
-unsubscribe(Pid, Topic) ->
- gen_server:call(Pid, {unsubscribe, Topic, self()}).
-
-publish(Pid, Topic, Payload) ->
- gen_server:call(Pid, {publish, Topic, Payload}).
-
-%% For emqx_management plugin
-call(Pid, Msg) ->
- call(Pid, Msg, infinity).
-
-call(Pid, Msg, _) ->
- Pid ! Msg, ok.
-
-%%--------------------------------------------------------------------
-%% gen_server Callbacks
-%%--------------------------------------------------------------------
-
-init({ClientId, Username, Password, Channel}) ->
- ?LOG(debug, "try to start adapter ClientId=~p, Username=~p, "
- "Channel=~0p", [ClientId, Username, Channel]),
- State0 = #state{peername = Channel,
- clientid = ClientId,
- username = Username,
- password = Password},
- _ = run_hooks('client.connect', [conninfo(State0)], undefined),
- case emqx_access_control:authenticate(clientinfo(State0)) of
- ok ->
- ok = emqx_cm:discard_session(ClientId),
-
- _ = run_hooks('client.connack', [conninfo(State0), success], undefined),
-
- State = State0#state{connected_at = erlang:system_time(millisecond)},
-
- run_hooks('client.connected', [clientinfo(State), conninfo(State)]),
-
- Self = self(),
- erlang:send_after(?ALIVE_INTERVAL, Self, check_alive),
- _ = emqx_cm_locker:trans(ClientId, fun(_) ->
- emqx_cm:register_channel(ClientId, Self, conninfo(State))
- end),
- emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
- {ok, State};
- {error, Reason} ->
- ?LOG(debug, "authentication faild: ~p", [Reason]),
- _ = run_hooks('client.connack', [conninfo(State0), not_authorized], undefined),
- {stop, {shutdown, Reason}}
- end.
-
-handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
- NewTopics = proplists:delete(Topic, TopicList),
- IsWild = emqx_topic:wildcard(Topic),
- {reply, chann_subscribe(Topic, State), State#state{sub_topics =
- [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
-
-handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
- NewTopics = proplists:delete(Topic, TopicList),
- chann_unsubscribe(Topic, State),
- {reply, ok, State#state{sub_topics = NewTopics}, hibernate};
-
-handle_call({publish, Topic, Payload}, _From, State) ->
- {reply, chann_publish(Topic, Payload, State), State};
-
-handle_call(info, _From, State) ->
- {reply, info(State), State};
-
-handle_call(stats, _From, State) ->
- {reply, stats(State), State, hibernate};
-
-handle_call(kick, _From, State) ->
- {stop, {shutdown, kick}, ok, State};
-
-handle_call({set_rate_limit, _Rl}, _From, State) ->
- ?LOG(error, "set_rate_limit is not support", []),
- {reply, ok, State};
-
-handle_call(get_rate_limit, _From, State) ->
- ?LOG(error, "get_rate_limit is not support", []),
- {reply, ok, State};
-
-handle_call(Request, _From, State) ->
- ?LOG(error, "adapter unexpected call ~p", [Request]),
- {reply, ignored, State, hibernate}.
-
-handle_cast(Msg, State) ->
- ?LOG(error, "broker_api unexpected cast ~p", [Msg]),
- {noreply, State, hibernate}.
-
-handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}},
- State = #state{sub_topics = Subscribers}) ->
- deliver([{Topic, Payload}], Subscribers),
- {noreply, State, hibernate};
-
-handle_info(check_alive, State = #state{sub_topics = []}) ->
- {stop, {shutdown, check_alive}, State};
-handle_info(check_alive, State) ->
- erlang:send_after(?ALIVE_INTERVAL, self(), check_alive),
- {noreply, State, hibernate};
-
-handle_info({shutdown, Error}, State) ->
- {stop, {shutdown, Error}, State};
-
-handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
- ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
- {stop, {shutdown, conflict}, State};
-
-handle_info(discard, State) ->
- ?LOG(warning, "the connection is discarded. " ++
- "possibly there is another client with the same clientid", []),
- {stop, {shutdown, discarded}, State};
-
-handle_info(kick, State) ->
- ?LOG(info, "Kicked", []),
- {stop, {shutdown, kick}, State};
-
-handle_info(Info, State) ->
- ?LOG(error, "adapter unexpected info ~p", [Info]),
- {noreply, State, hibernate}.
-
-terminate(Reason, State = #state{clientid = ClientId, sub_topics = SubTopics}) ->
- ?LOG(debug, "unsubscribe ~p while exiting for ~p", [SubTopics, Reason]),
- [chann_unsubscribe(Topic, State) || {Topic, _} <- SubTopics],
- emqx_cm:unregister_channel(ClientId),
-
- ConnInfo0 = conninfo(State),
- ConnInfo = ConnInfo0#{disconnected_at => erlang:system_time(millisecond)},
- run_hooks('client.disconnected', [clientinfo(State), Reason, ConnInfo]).
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%% Channel adapter functions
-
-chann_subscribe(Topic, State = #state{clientid = ClientId}) ->
- ?LOG(debug, "subscribe Topic=~p", [Topic]),
- case emqx_access_control:authorize(clientinfo(State), subscribe, Topic) of
- allow ->
- emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS),
- emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]),
- ok;
- deny ->
- ?LOG(warning, "subscribe to ~p by clientid ~p failed due to authz check.",
- [Topic, ClientId]),
- {error, forbidden}
- end.
-
-chann_unsubscribe(Topic, State) ->
- ?LOG(debug, "unsubscribe Topic=~p", [Topic]),
- Opts = #{rh => 0, rap => 0, nl => 0, qos => 0},
- emqx_broker:unsubscribe(Topic),
- emqx_hooks:run('session.unsubscribed', [clientinfo(State), Topic, Opts]).
-
-chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
- ?LOG(debug, "publish Topic=~p, Payload=~p", [Topic, Payload]),
- case emqx_access_control:authorize(clientinfo(State), publish, Topic) of
- allow ->
- _ = emqx_broker:publish(
- emqx_message:set_flag(retain, false,
- emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
- ok;
- deny ->
- ?LOG(warning, "publish to ~p by clientid ~p failed due to authz check.",
- [Topic, ClientId]),
- {error, forbidden}
- end.
-
-
-%%--------------------------------------------------------------------
-%% Deliver
-
-deliver([], _) -> ok;
-deliver([Pub | More], Subscribers) ->
- ok = do_deliver(Pub, Subscribers),
- deliver(More, Subscribers).
-
-do_deliver({Topic, Payload}, Subscribers) ->
- %% handle PUBLISH packet from broker
- ?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
- deliver_to_coap(Topic, Payload, Subscribers),
- ok.
-
-deliver_to_coap(_TopicName, _Payload, []) ->
- ok;
-deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) ->
- Matched = case IsWild of
- true -> emqx_topic:match(TopicName, TopicFilter);
- false -> TopicName =:= TopicFilter
- end,
- %?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p",
- % [Matched, CoapPid, TopicName, Payload, T]),
- Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
- deliver_to_coap(TopicName, Payload, T).
-
-%%--------------------------------------------------------------------
-%% Helper funcs
-
--compile({inline, [run_hooks/2, run_hooks/3]}).
-run_hooks(Name, Args) ->
- ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
-
-run_hooks(Name, Args, Acc) ->
- ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
-
-%%--------------------------------------------------------------------
-%% Info & Stats
-
-info(State) ->
- ChannInfo = chann_info(State),
- ChannInfo#{sockinfo => sockinfo(State)}.
-
-%% copies from emqx_connection:info/1
-sockinfo(#state{peername = Peername}) ->
- #{socktype => udp,
- peername => Peername,
- sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock?
- sockstate => running,
- active_n => 1
- }.
-
-%% copies from emqx_channel:info/1
-chann_info(State) ->
- #{conninfo => conninfo(State),
- conn_state => connected,
- clientinfo => clientinfo(State),
- session => maps:from_list(session_info(State)),
- will_msg => undefined
- }.
-
-conninfo(#state{peername = Peername,
- clientid = ClientId,
- connected_at = ConnectedAt}) ->
- #{socktype => udp,
- sockname => {{127, 0, 0, 1}, 5683},
- peername => Peername,
- peercert => nossl, %% TODO: dtls
- conn_mod => ?MODULE,
- proto_name => <<"CoAP">>,
- proto_ver => 1,
- clean_start => true,
- clientid => ClientId,
- username => undefined,
- conn_props => undefined,
- connected => true,
- connected_at => ConnectedAt,
- keepalive => 0,
- receive_maximum => 0,
- expiry_interval => 0
- }.
-
-%% copies from emqx_session:info/1
-session_info(#state{sub_topics = SubTopics, connected_at = ConnectedAt}) ->
- Subs = lists:foldl(
- fun({Topic, _}, Acc) ->
- Acc#{Topic => ?SUBOPTS}
- end, #{}, SubTopics),
- [{subscriptions, Subs},
- {upgrade_qos, false},
- {retry_interval, 0},
- {await_rel_timeout, 0},
- {created_at, ConnectedAt}
- ].
-
-%% The stats keys copied from emqx_connection:stats/1
-stats(#state{sub_topics = SubTopics}) ->
- SockStats = [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0}, {send_pend, 0}],
- ConnStats = emqx_pd:get_counters(?CONN_STATS),
- ChanStats = [{subscriptions_cnt, length(SubTopics)},
- {subscriptions_max, length(SubTopics)},
- {inflight_cnt, 0},
- {inflight_max, 0},
- {mqueue_len, 0},
- {mqueue_max, 0},
- {mqueue_dropped, 0},
- {next_pkt_id, 0},
- {awaiting_rel_cnt, 0},
- {awaiting_rel_max, 0}
- ],
- ProcStats = emqx_misc:proc_stats(),
- lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
-
-clientinfo(#state{peername = {PeerHost, _},
- clientid = ClientId,
- username = Username,
- password = Password}) ->
- #{zone => default,
- listener => mqtt_tcp, %% FIXME: this won't work
- protocol => coap,
- peerhost => PeerHost,
- sockport => 5683, %% FIXME:
- clientid => ClientId,
- username => Username,
- password => Password,
- peercert => nossl,
- is_bridge => false,
- is_superuser => false,
- mountpoint => undefined,
- ws_cookie => undefined
- }.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl
new file mode 100644
index 000000000..199ad0658
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl
@@ -0,0 +1,92 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_observe_res).
+
+%% API
+-export([ new/0, insert/3, remove/2
+ , res_changed/2, foreach/2]).
+-export_type([manager/0]).
+
+-define(MAX_SEQ_ID, 16777215).
+
+-type topic() :: binary().
+-type token() :: binary().
+-type seq_id() :: 0 .. ?MAX_SEQ_ID.
+-type res() :: #{ token := token()
+ , seq_id := seq_id()
+ }.
+
+-type manager() :: #{topic => res()}.
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+-spec new() -> manager().
+new() ->
+ #{}.
+
+-spec insert(manager(), topic(), token()) -> manager().
+insert(Manager, Topic, Token) ->
+ case maps:get(Topic, Manager, undefined) of
+ undefined ->
+ Manager#{Topic => new_res(Token)};
+ _ ->
+ Manager
+ end.
+
+-spec remove(manager(), topic()) -> manager().
+remove(Manager, Topic) ->
+ maps:remove(Topic, Manager).
+
+-spec res_changed(manager(), topic()) -> undefined | {token(), seq_id(), manager()}.
+res_changed(Manager, Topic) ->
+ case maps:get(Topic, Manager, undefined) of
+ undefined ->
+ undefined;
+ Res ->
+ #{token := Token,
+ seq_id := SeqId} = Res2 = res_changed(Res),
+ {Token, SeqId, Manager#{Topic := Res2}}
+ end.
+
+foreach(F, Manager) ->
+ maps:fold(fun(K, V, _) ->
+ F(K, V)
+ end,
+ ok,
+ Manager),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+-spec new_res(token()) -> res().
+new_res(Token) ->
+ #{token => Token,
+ seq_id => 0}.
+
+-spec res_changed(res()) -> res().
+res_changed(#{seq_id := SeqId} = Res) ->
+ NewSeqId = SeqId + 1,
+ NewSeqId2 =
+ case NewSeqId > ?MAX_SEQ_ID of
+ true ->
+ 1;
+ _ ->
+ NewSeqId
+ end,
+ Res#{seq_id := NewSeqId2}.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_pubsub_resource.erl b/apps/emqx_gateway/src/coap/emqx_coap_pubsub_resource.erl
deleted file mode 100644
index b7eb65f55..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap_pubsub_resource.erl
+++ /dev/null
@@ -1,322 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_coap_pubsub_resource).
-
-% -behaviour(coap_resource).
-
--include("src/coap/include/emqx_coap.hrl").
--include_lib("gen_coap/include/coap.hrl").
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("emqx/include/logger.hrl").
-
--logger_header("[CoAP-PS-RES]").
-
--export([ coap_discover/2
- , coap_get/5
- , coap_post/4
- , coap_put/4
- , coap_delete/3
- , coap_observe/5
- , coap_unobserve/1
- , handle_info/2
- , coap_ack/2
- ]).
-
--ifdef(TEST).
--export([topic/1]).
--endif.
-
--define(PS_PREFIX, [<<"ps">>]).
-
-%%--------------------------------------------------------------------
-%% Resource Callbacks
-%%--------------------------------------------------------------------
-coap_discover(_Prefix, _Args) ->
- [{absolute, [<<"ps">>], []}].
-
-coap_get(ChId, ?PS_PREFIX, TopicPath, Query, Content=#coap_content{format = Format}) when TopicPath =/= [] ->
- Topic = topic(TopicPath),
- ?LOG(debug, "coap_get() Topic=~p, Query=~p~n", [Topic, Query]),
- #coap_mqtt_auth{clientid = Clientid, username = Usr, password = Passwd} = get_auth(Query),
- case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of
- {ok, Pid} ->
- put(mqtt_client_pid, Pid),
- case Format of
- <<"application/link-format">> ->
- Content;
- _Other ->
- %% READ the topic info
- read_last_publish_message(emqx_topic:wildcard(Topic), Topic, Content)
- end;
- {error, auth_failure} ->
- put(mqtt_client_pid, undefined),
- {error, unauthorized};
- {error, bad_request} ->
- put(mqtt_client_pid, undefined),
- {error, bad_request};
- {error, _Other} ->
- put(mqtt_client_pid, undefined),
- {error, internal_server_error}
- end;
-coap_get(ChId, Prefix, TopicPath, Query, _Content) ->
- ?LOG(error, "ignore bad get request ChId=~p, Prefix=~p, TopicPath=~p, Query=~p", [ChId, Prefix, TopicPath, Query]),
- {error, bad_request}.
-
-coap_post(_ChId, ?PS_PREFIX, TopicPath, #coap_content{format = Format, payload = Payload, max_age = MaxAge}) when TopicPath =/= [] ->
- Topic = topic(TopicPath),
- ?LOG(debug, "coap_post() Topic=~p, MaxAge=~p, Format=~p~n", [Topic, MaxAge, Format]),
- case Format of
- %% We treat ct of "application/link-format" as CREATE message
- <<"application/link-format">> ->
- handle_received_create(Topic, MaxAge, Payload);
- %% We treat ct of other values as PUBLISH message
- Other ->
- ?LOG(debug, "coap_post() receive payload format=~p, will process as PUBLISH~n", [Format]),
- handle_received_publish(Topic, MaxAge, Other, Payload)
- end;
-
-coap_post(_ChId, _Prefix, _TopicPath, _Content) ->
- {error, method_not_allowed}.
-
-coap_put(_ChId, ?PS_PREFIX, TopicPath, #coap_content{max_age = MaxAge, format = Format, payload = Payload}) when TopicPath =/= [] ->
- Topic = topic(TopicPath),
- ?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]),
- handle_received_publish(Topic, MaxAge, Format, Payload);
-
-coap_put(_ChId, Prefix, TopicPath, Content) ->
- ?LOG(error, "put has error, Prefix=~p, TopicPath=~p, Content=~p", [Prefix, TopicPath, Content]),
- {error, bad_request}.
-
-coap_delete(_ChId, ?PS_PREFIX, TopicPath) ->
- delete_topic_info(topic(TopicPath));
-
-coap_delete(_ChId, _Prefix, _TopicPath) ->
- {error, method_not_allowed}.
-
-coap_observe(ChId, ?PS_PREFIX, TopicPath, Ack, Content) when TopicPath =/= [] ->
- Topic = topic(TopicPath),
- ?LOG(debug, "observe Topic=~p, Ack=~p,Content=~p", [Topic, Ack, Content]),
- Pid = get(mqtt_client_pid),
- case emqx_coap_mqtt_adapter:subscribe(Pid, Topic) of
- ok ->
- Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of
- true -> nocontent;
- false-> content
- end,
- {ok, {state, ChId, ?PS_PREFIX, [Topic]}, Code, Content};
- {error, Code} ->
- {error, Code}
- end;
-
-coap_observe(ChId, Prefix, TopicPath, Ack, _Content) ->
- ?LOG(error, "unknown observe request ChId=~p, Prefix=~p, TopicPath=~p, Ack=~p", [ChId, Prefix, TopicPath, Ack]),
- {error, bad_request}.
-
-coap_unobserve({state, _ChId, ?PS_PREFIX, TopicPath}) when TopicPath =/= [] ->
- Topic = topic(TopicPath),
- ?LOG(debug, "unobserve ~p", [Topic]),
- Pid = get(mqtt_client_pid),
- emqx_coap_mqtt_adapter:unsubscribe(Pid, Topic),
- ok;
-coap_unobserve({state, ChId, Prefix, TopicPath}) ->
- ?LOG(error, "ignore unknown unobserve request ChId=~p, Prefix=~p, TopicPath=~p", [ChId, Prefix, TopicPath]),
- ok.
-
-handle_info({dispatch, Topic, Payload}, State) ->
- ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
- {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
- ?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
- {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
-handle_info(Message, State) ->
- ?LOG(error, "Unknown Message ~p", [Message]),
- {noreply, State}.
-
-coap_ack(_Ref, State) -> {ok, State}.
-
-
-%%--------------------------------------------------------------------
-%% Internal Functions
-%%--------------------------------------------------------------------
-get_auth(Query) ->
- get_auth(Query, #coap_mqtt_auth{}).
-
-get_auth([], Auth=#coap_mqtt_auth{}) ->
- Auth;
-get_auth([<<$c, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
- get_auth(T, Auth#coap_mqtt_auth{clientid = Rest});
-get_auth([<<$u, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
- get_auth(T, Auth#coap_mqtt_auth{username = Rest});
-get_auth([<<$p, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
- get_auth(T, Auth#coap_mqtt_auth{password = Rest});
-get_auth([Param|T], Auth=#coap_mqtt_auth{}) ->
- ?LOG(error, "ignore unknown parameter ~p", [Param]),
- get_auth(T, Auth).
-
-add_topic_info(publish, Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>> ->
- case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
- [{_, StoredMaxAge, StoredCT, _, _}] ->
- ?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]),
- %% check whether the ct value stored matches the ct option in this POST message
- case Format =:= StoredCT of
- true ->
- {ok, Ret} =
- case StoredMaxAge =:= MaxAge of
- true ->
- emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload);
- false ->
- emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload)
- end,
- {changed, Ret};
- false ->
- ?LOG(debug, "ct values of topic=~p do not match, stored ct=~p, new ct=~p, ignore the PUBLISH", [Topic, StoredCT, Format]),
- {changed, false}
- end;
- [] ->
- ?LOG(debug, "publish topic=~p will be created", [Topic]),
- {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload),
- {created, Ret}
- end;
-
-add_topic_info(create, Topic, MaxAge, Format, _Payload) when is_binary(Topic), Topic =/= <<>> ->
- case emqx_coap_pubsub_topics:is_topic_existed(Topic) of
- true ->
- %% Whether we should support CREATE to an existed topic is TBD!!
- ?LOG(debug, "create topic=~p already exists, need reset the topic info", [Topic]),
- {ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Format, <<>>);
- false ->
- ?LOG(debug, "create topic=~p will be created", [Topic]),
- {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, <<>>)
- end,
- {created, Ret};
-
-add_topic_info(_, Topic, _MaxAge, _Format, _Payload) ->
- ?LOG(debug, "create topic=~p info failed", [Topic]),
- {badarg, false}.
-
-concatenate_location_path(List = [TopicPart1, TopicPart2, TopicPart3]) when is_binary(TopicPart1), is_binary(TopicPart2), is_binary(TopicPart3) ->
- list_to_binary(lists:foldl( fun (Element, AccIn) when Element =/= <<>> ->
- AccIn ++ "/" ++ binary_to_list(Element);
- (_Element, AccIn) ->
- AccIn
- end, [], List)).
-
-format_string_to_int(<<"application/octet-stream">>) ->
- <<"42">>;
-format_string_to_int(<<"application/exi">>) ->
- <<"47">>;
-format_string_to_int(<<"application/json">>) ->
- <<"50">>.
-
-handle_received_publish(Topic, MaxAge, Format, Payload) ->
- case add_topic_info(publish, Topic, MaxAge, format_string_to_int(Format), Payload) of
- {Ret, true} ->
- Pid = get(mqtt_client_pid),
- case emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload) of
- ok ->
- {ok, Ret, case Ret of
- changed -> #coap_content{};
- created ->
- #coap_content{location_path = [
- concatenate_location_path([<<"ps">>, Topic, <<>>])]}
- end};
- {error, Code} ->
- {error, Code}
- end;
- {_, false} ->
- ?LOG(debug, "add_topic_info failed, will return bad_request", []),
- {error, bad_request}
- end.
-
-handle_received_create(TopicPrefix, MaxAge, Payload) ->
- case core_link:decode(Payload) of
- [{rootless, [Topic], [{ct, CT}]}] when is_binary(Topic), Topic =/= <<>> ->
- TrueTopic = emqx_http_lib:uri_decode(Topic),
- ?LOG(debug, "decoded link-format payload, the Topic=~p, CT=~p~n", [TrueTopic, CT]),
- LocPath = concatenate_location_path([<<"ps">>, TopicPrefix, TrueTopic]),
- FullTopic = binary:part(LocPath, 4, byte_size(LocPath)-4),
- ?LOG(debug, "the location path is ~p, the full topic is ~p~n", [LocPath, FullTopic]),
- case add_topic_info(create, FullTopic, MaxAge, CT, <<>>) of
- {_, true} ->
- ?LOG(debug, "create topic info successfully, will return LocPath=~p", [LocPath]),
- {ok, created, #coap_content{location_path = [LocPath]}};
- {_, false} ->
- ?LOG(debug, "create topic info failed, will return bad_request", []),
- {error, bad_request}
- end;
- Other ->
- ?LOG(debug, "post with bad payload of link-format ~p, will return bad_request", [Other]),
- {error, bad_request}
- end.
-
-%% When topic is timeout, server should return nocontent here,
-%% but gen_coap only receive return value of #coap_content from coap_get, so temporarily we can't give the Code 2.07 {ok, nocontent} out.TBC!!!
-return_resource(Topic, Payload, MaxAge, TimeStamp, Content) ->
- TimeElapsed = trunc((erlang:system_time(millisecond) - TimeStamp) / 1000),
- case TimeElapsed < MaxAge of
- true ->
- LeftTime = (MaxAge - TimeElapsed),
- ?LOG(debug, "topic=~p has max age left time is ~p", [Topic, LeftTime]),
- Content#coap_content{max_age = LeftTime, payload = Payload};
- false ->
- ?LOG(debug, "topic=~p has been timeout, will return empty content", [Topic]),
- #coap_content{}
- end.
-
-read_last_publish_message(false, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)->
- ?LOG(debug, "the QueryFormat=~p", [QueryFormat]),
- case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
- [] ->
- {error, not_found};
- [{_, MaxAge, CT, Payload, TimeStamp}] ->
- case CT =:= format_string_to_int(QueryFormat) of
- true ->
- return_resource(Topic, Payload, MaxAge, TimeStamp, Content);
- false ->
- ?LOG(debug, "format value does not match, the queried format=~p, the stored format=~p", [QueryFormat, CT]),
- {error, bad_request}
- end
- end;
-
-read_last_publish_message(false, Topic, Content) ->
- case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
- [] ->
- {error, not_found};
- [{_, MaxAge, _, Payload, TimeStamp}] ->
- return_resource(Topic, Payload, MaxAge, TimeStamp, Content)
- end;
-
-read_last_publish_message(true, Topic, _Content) ->
- ?LOG(debug, "the topic=~p is illegal wildcard topic", [Topic]),
- {error, bad_request}.
-
-delete_topic_info(Topic) ->
- case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
- [] ->
- {error, not_found};
- [{_, _, _, _, _}] ->
- emqx_coap_pubsub_topics:delete_sub_topics(Topic)
- end.
-
-topic(Topic) when is_binary(Topic) -> Topic;
-topic([]) -> <<>>;
-topic([Path | TopicPath]) ->
- case topic(TopicPath) of
- <<>> -> Path;
- RemTopic ->
- <>
- end.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_registry.erl b/apps/emqx_gateway/src/coap/emqx_coap_registry.erl
deleted file mode 100644
index 18faa6673..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap_registry.erl
+++ /dev/null
@@ -1,154 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_coap_registry).
-
--author("Feng Lee ").
-
--include("src/coap/include/emqx_coap.hrl").
--include_lib("emqx/include/logger.hrl").
-
--logger_header("[CoAP-Registry]").
-
--behaviour(gen_server).
-
-%% API.
--export([ start_link/0
- , register_name/2
- , unregister_name/1
- , whereis_name/1
- , send/2
- , stop/0
- ]).
-
-%% gen_server.
--export([ init/1
- , handle_call/3
- , handle_cast/2
- , handle_info/2
- , terminate/2
- , code_change/3
- ]).
-
--record(state, {}).
-
--define(RESPONSE_TAB, coap_response_process).
--define(RESPONSE_REF_TAB, coap_response_process_ref).
-
-%% ------------------------------------------------------------------
-%% API Function Definitions
-%% ------------------------------------------------------------------
-
-
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-register_name(Name, Pid) ->
- gen_server:call(?MODULE, {register_name, Name, Pid}).
-
-unregister_name(Name) ->
- gen_server:call(?MODULE, {unregister_name, Name}).
-
-whereis_name(Name) ->
- case ets:lookup(?RESPONSE_TAB, Name) of
- [] -> undefined;
- [{Name, Pid, _MRef}] -> Pid
- end.
-
-send(Name, Msg) ->
- case whereis_name(Name) of
- undefined ->
- exit({badarg, {Name, Msg}});
- Pid when is_pid(Pid) ->
- Pid ! Msg,
- Pid
- end.
-
-stop() ->
- gen_server:stop(?MODULE).
-
-
-%% ------------------------------------------------------------------
-%% gen_server Function Definitions
-%% ------------------------------------------------------------------
-
-init([]) ->
- _ = ets:new(?RESPONSE_TAB, [set, named_table, protected]),
- _ = ets:new(?RESPONSE_REF_TAB, [set, named_table, protected]),
- {ok, #state{}}.
-
-handle_call({register_name, Name, Pid}, _From, State) ->
- case ets:member(?RESPONSE_TAB, Name) of
- false ->
- MRef = monitor_client(Pid),
- ets:insert(?RESPONSE_TAB, {Name, Pid, MRef}),
- ets:insert(?RESPONSE_REF_TAB, {MRef, Name, Pid}),
- {reply, yes, State};
- true -> {reply, no, State}
- end;
-
-handle_call({unregister_name, Name}, _From, State) ->
- case ets:lookup(?RESPONSE_TAB, Name) of
- [] ->
- ok;
- [{Name, _Pid, MRef}] ->
- erase_monitor(MRef),
- ets:delete(?RESPONSE_TAB, Name),
- ets:delete(?RESPONSE_REF_TAB, MRef)
- end,
- {reply, ok, State};
-
-handle_call(_Request, _From, State) ->
- {reply, ignored, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-
-handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
- case ets:lookup(?RESPONSE_REF_TAB, MRef) of
- [{MRef, Name, _Pid}] ->
- ets:delete(?RESPONSE_TAB, Name),
- ets:delete(?RESPONSE_REF_TAB, MRef),
- erase_monitor(MRef);
- [] ->
- ?LOG(error, "MRef of client ~p not found", [DownPid])
- end,
- {noreply, State};
-
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ets:delete(?RESPONSE_TAB),
- ets:delete(?RESPONSE_REF_TAB),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-monitor_client(Pid) ->
- erlang:monitor(process, Pid).
-
-erase_monitor(MRef) ->
- catch erlang:demonitor(MRef, [flush]).
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_resource.erl b/apps/emqx_gateway/src/coap/emqx_coap_resource.erl
index 26bcd6fcd..93fe82aba 100644
--- a/apps/emqx_gateway/src/coap/emqx_coap_resource.erl
+++ b/apps/emqx_gateway/src/coap/emqx_coap_resource.erl
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -16,122 +16,22 @@
-module(emqx_coap_resource).
-% -behaviour(coap_resource).
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
--include("src/coap/include/emqx_coap.hrl").
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/logger.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("gen_coap/include/coap.hrl").
+-type context() :: any().
+-type topic() :: binary().
+-type token() :: token().
--logger_header("[CoAP-RES]").
+-type register() :: {topic(), token()}
+ | topic()
+ | undefined.
--export([ coap_discover/2
- , coap_get/5
- , coap_post/4
- , coap_put/4
- , coap_delete/3
- , coap_observe/5
- , coap_unobserve/1
- , handle_info/2
- , coap_ack/2
- ]).
-
--ifdef(TEST).
--export([topic/1]).
--endif.
-
--define(MQTT_PREFIX, [<<"mqtt">>]).
-
-% resource operations
-coap_discover(_Prefix, _Args) ->
- [{absolute, [<<"mqtt">>], []}].
-
-coap_get(ChId, ?MQTT_PREFIX, Path, Query, _Content) ->
- ?LOG(debug, "coap_get() Path=~p, Query=~p~n", [Path, Query]),
- #coap_mqtt_auth{clientid = Clientid, username = Usr, password = Passwd} = get_auth(Query),
- case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of
- {ok, Pid} ->
- put(mqtt_client_pid, Pid),
- #coap_content{};
- {error, auth_failure} ->
- put(mqtt_client_pid, undefined),
- {error, unauthorized};
- {error, bad_request} ->
- put(mqtt_client_pid, undefined),
- {error, bad_request};
- {error, _Other} ->
- put(mqtt_client_pid, undefined),
- {error, internal_server_error}
- end;
-coap_get(ChId, Prefix, Path, Query, _Content) ->
- ?LOG(error, "ignore bad get request ChId=~p, Prefix=~p, Path=~p, Query=~p", [ChId, Prefix, Path, Query]),
- {error, bad_request}.
-
-coap_post(_ChId, _Prefix, _Topic, _Content) ->
- {error, method_not_allowed}.
-
-coap_put(_ChId, ?MQTT_PREFIX, Topic, #coap_content{payload = Payload}) when Topic =/= [] ->
- ?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]),
- Pid = get(mqtt_client_pid),
- emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload);
-coap_put(_ChId, Prefix, Topic, Content) ->
- ?LOG(error, "put has error, Prefix=~p, Topic=~p, Content=~p", [Prefix, Topic, Content]),
- {error, bad_request}.
-
-coap_delete(_ChId, _Prefix, _Topic) ->
- {error, method_not_allowed}.
-
-coap_observe(ChId, ?MQTT_PREFIX, Topic, Ack, Content) when Topic =/= [] ->
- TrueTopic = topic(Topic),
- ?LOG(debug, "observe Topic=~p, Ack=~p", [TrueTopic, Ack]),
- Pid = get(mqtt_client_pid),
- case emqx_coap_mqtt_adapter:subscribe(Pid, TrueTopic) of
- ok -> {ok, {state, ChId, ?MQTT_PREFIX, [TrueTopic]}, content, Content};
- {error, Code} -> {error, Code}
- end;
-coap_observe(ChId, Prefix, Topic, Ack, _Content) ->
- ?LOG(error, "unknown observe request ChId=~p, Prefix=~p, Topic=~p, Ack=~p", [ChId, Prefix, Topic, Ack]),
- {error, bad_request}.
-
-coap_unobserve({state, _ChId, ?MQTT_PREFIX, Topic}) when Topic =/= [] ->
- ?LOG(debug, "unobserve ~p", [Topic]),
- Pid = get(mqtt_client_pid),
- emqx_coap_mqtt_adapter:unsubscribe(Pid, topic(Topic)),
- ok;
-coap_unobserve({state, ChId, Prefix, Topic}) ->
- ?LOG(error, "ignore unknown unobserve request ChId=~p, Prefix=~p, Topic=~p", [ChId, Prefix, Topic]),
- ok.
-
-handle_info({dispatch, Topic, Payload}, State) ->
- ?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
- {notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
-handle_info(Message, State) ->
- emqx_coap_mqtt_adapter:handle_info(Message, State).
-
-coap_ack(_Ref, State) -> {ok, State}.
-
-get_auth(Query) ->
- get_auth(Query, #coap_mqtt_auth{}).
-
-get_auth([], Auth=#coap_mqtt_auth{}) ->
- Auth;
-get_auth([<<$c, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
- get_auth(T, Auth#coap_mqtt_auth{clientid = Rest});
-get_auth([<<$u, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
- get_auth(T, Auth#coap_mqtt_auth{username = Rest});
-get_auth([<<$p, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
- get_auth(T, Auth#coap_mqtt_auth{password = Rest});
-get_auth([Param|T], Auth=#coap_mqtt_auth{}) ->
- ?LOG(error, "ignore unknown parameter ~p", [Param]),
- get_auth(T, Auth).
-
-topic(Topic) when is_binary(Topic) -> Topic;
-topic([]) -> <<>>;
-topic([Path | TopicPath]) ->
- case topic(TopicPath) of
- <<>> -> Path;
- RemTopic ->
- <>
- end.
+-type result() :: emqx_coap_message()
+ | {has_sub, emqx_coap_message(), register()}.
+-callback init(hocon:confg()) -> context().
+-callback stop(context()) -> ok.
+-callback get(emqx_coap_message(), hocon:config()) -> result().
+-callback put(emqx_coap_message(), hocon:config()) -> result().
+-callback post(emqx_coap_message(), hocon:config()) -> result().
+-callback delete(emqx_coap_message(), hocon:config()) -> result().
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_server.erl b/apps/emqx_gateway/src/coap/emqx_coap_server.erl
deleted file mode 100644
index 73625f65a..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap_server.erl
+++ /dev/null
@@ -1,106 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_coap_server).
-
--include("src/coap/include/emqx_coap.hrl").
-
--export([ start/1
- , stop/1
- ]).
-
--export([ start_listener/1
- , start_listener/3
- , stop_listener/1
- , stop_listener/2
- ]).
-
-%%--------------------------------------------------------------------
-%% APIs
-%%--------------------------------------------------------------------
-
-start(Envs) ->
- {ok, _} = application:ensure_all_started(gen_coap),
- start_listeners(Envs).
-
-stop(Envs) ->
- stop_listeners(Envs).
-
-%%--------------------------------------------------------------------
-%% Internal funcs
-%%--------------------------------------------------------------------
-
-start_listeners(Envs) ->
- lists:foreach(fun start_listener/1, listeners_confs(Envs)).
-
-stop_listeners(Envs) ->
- lists:foreach(fun stop_listener/1, listeners_confs(Envs)).
-
-start_listener({Proto, ListenOn, Opts}) ->
- case start_listener(Proto, ListenOn, Opts) of
- {ok, _Pid} ->
- io:format("Start coap:~s listener on ~s successfully.~n",
- [Proto, format(ListenOn)]);
- {error, Reason} ->
- io:format(standard_error, "Failed to start coap:~s listener on ~s: ~0p~n",
- [Proto, format(ListenOn), Reason]),
- error(Reason)
- end.
-
-start_listener(udp, ListenOn, Opts) ->
- coap_server:start_udp('coap:udp', ListenOn, Opts);
-start_listener(dtls, ListenOn, Opts) ->
- coap_server:start_dtls('coap:dtls', ListenOn, Opts).
-
-stop_listener({Proto, ListenOn, _Opts}) ->
- Ret = stop_listener(Proto, ListenOn),
- case Ret of
- ok -> io:format("Stop coap:~s listener on ~s successfully.~n",
- [Proto, format(ListenOn)]);
- {error, Reason} ->
- io:format(standard_error, "Failed to stop coap:~s listener on ~s: ~0p~n.",
- [Proto, format(ListenOn), Reason])
- end,
- Ret.
-
-stop_listener(udp, ListenOn) ->
- coap_server:stop_udp('coap:udp', ListenOn);
-stop_listener(dtls, ListenOn) ->
- coap_server:stop_dtls('coap:dtls', ListenOn).
-
-%% XXX: It is a temporary func to convert conf format for esockd
-listeners_confs(Envs) ->
- listeners_confs(udp, Envs) ++ listeners_confs(dtls, Envs).
-
-listeners_confs(udp, Envs) ->
- Udps = proplists:get_value(bind_udp, Envs, []),
- [{udp, Port, [{udp_options, InetOpts}]} || {Port, InetOpts} <- Udps];
-
-listeners_confs(dtls, Envs) ->
- case proplists:get_value(dtls_opts, Envs, []) of
- [] -> [];
- DtlsOpts ->
- BindDtls = proplists:get_value(bind_dtls, Envs, []),
- [{dtls, Port, [{dtls_options, InetOpts ++ DtlsOpts}]} || {Port, InetOpts} <- BindDtls]
- end.
-
-format(Port) when is_integer(Port) ->
- io_lib:format("0.0.0.0:~w", [Port]);
-format({Addr, Port}) when is_list(Addr) ->
- io_lib:format("~s:~w", [Addr, Port]);
-format({Addr, Port}) when is_tuple(Addr) ->
- io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
-
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl
new file mode 100644
index 000000000..dac4ac924
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl
@@ -0,0 +1,195 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_coap_session).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+%% API
+-export([new/0, transfer_result/3]).
+
+-export([ received/3
+ , reply/4
+ , reply/5
+ , ack/3
+ , piggyback/4
+ , deliver/3
+ , timeout/3]).
+
+-export_type([session/0]).
+
+-record(session, { transport_manager :: emqx_coap_tm:manager()
+ , observe_manager :: emqx_coap_observe_res:manager()
+ , next_msg_id :: coap_message_id()
+ }).
+
+-type session() :: #session{}.
+
+%%%-------------------------------------------------------------------
+%%% API
+%%%-------------------------------------------------------------------
+-spec new() -> session().
+new() ->
+ _ = emqx_misc:rand_seed(),
+ #session{ transport_manager = emqx_coap_tm:new()
+ , observe_manager = emqx_coap_observe_res:new()
+ , next_msg_id = rand:uniform(?MAX_MESSAGE_ID)}.
+
+%%%-------------------------------------------------------------------
+%%% Process Message
+%%%-------------------------------------------------------------------
+received(Session, Cfg, #coap_message{type = ack} = Msg) ->
+ handle_response(Session, Cfg, Msg);
+
+received(Session, Cfg, #coap_message{type = reset} = Msg) ->
+ handle_response(Session, Cfg, Msg);
+
+received(Session, Cfg, #coap_message{method = Method} = Msg) when is_atom(Method) ->
+ handle_request(Session, Cfg, Msg);
+
+received(Session, Cfg, Msg) ->
+ handle_response(Session, Cfg, Msg).
+
+reply(Session, Cfg, Req, Method) ->
+ reply(Session, Cfg, Req, Method, <<>>).
+
+reply(Session, Cfg, Req, Method, Payload) ->
+ Response = emqx_coap_message:response(Method, Payload, Req),
+ handle_out(Session, Cfg, Response).
+
+ack(Session, Cfg, Req) ->
+ piggyback(Session, Cfg, Req, <<>>).
+
+piggyback(Session, Cfg, Req, Payload) ->
+ Response = emqx_coap_message:ack(Req),
+ Response2 = emqx_coap_message:set_payload(Payload, Response),
+ handle_out(Session, Cfg, Response2).
+
+deliver(Session, Cfg, Delivers) ->
+ Fun = fun({_, Topic, Message},
+ #{out := OutAcc,
+ session := #session{observe_manager = OM,
+ next_msg_id = MsgId} = SAcc} = Acc) ->
+ case emqx_coap_observe_res:res_changed(OM, Topic) of
+ undefined ->
+ Acc;
+ {Token, SeqId, OM2} ->
+ Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Cfg),
+ SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId),
+ observe_manager = OM2},
+ #{out := Out} = Result = call_transport_manager(SAcc2, Cfg, Msg, handle_out),
+ Result#{out := [Out | OutAcc]}
+ end
+ end,
+ lists:foldl(Fun,
+ #{out => [],
+ session => Session},
+ Delivers).
+
+timeout(Session, Cfg, Timer) ->
+ call_transport_manager(Session, Cfg, Timer, ?FUNCTION_NAME).
+
+transfer_result(Result, From, Value) ->
+ ?TRANSFER_RESULT(Result, [out, subscribe], From, Value).
+
+%%%-------------------------------------------------------------------
+%%% Internal functions
+%%%-------------------------------------------------------------------
+handle_request(Session, Cfg, Msg) ->
+ call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME).
+
+handle_response(Session, Cfg, Msg) ->
+ call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME).
+
+handle_out(Session, Cfg, Msg) ->
+ call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME).
+
+call_transport_manager(#session{transport_manager = TM} = Session,
+ Cfg,
+ Msg,
+ Fun) ->
+ try
+ Result = emqx_coap_tm:Fun(Msg, TM, Cfg),
+ {ok, _, Session2} = emqx_misc:pipeline([fun process_tm/2,
+ fun process_subscribe/2],
+ Result,
+ Session),
+ emqx_coap_channel:transfer_result(Result, session, Session2)
+ catch Type:Reason:Stack ->
+ ?ERROR("process transmission with, message:~p failed~n
+Type:~p,Reason:~p~n,StackTrace:~p~n", [Msg, Type, Reason, Stack]),
+ #{out => emqx_coap_message:response({error, internal_server_error}, Msg)}
+ end.
+
+process_tm(#{tm := TM}, Session) ->
+ {ok, Session#session{transport_manager = TM}};
+process_tm(_, Session) ->
+ {ok, Session}.
+
+process_subscribe(#{subscribe := Sub}, #session{observe_manager = OM} = Session) ->
+ case Sub of
+ undefined ->
+ {ok, Session};
+ {Topic, Token} ->
+ OM2 = emqx_coap_observe_res:insert(OM, Topic, Token),
+ {ok, Session#session{observe_manager = OM2}};
+ Topic ->
+ OM2 = emqx_coap_observe_res:remove(OM, Topic),
+ {ok, Session#session{observe_manager = OM2}}
+ end;
+process_subscribe(_, Session) ->
+ {ok, Session}.
+
+mqtt_to_coap(MQTT, MsgId, Token, SeqId, Cfg) ->
+ #message{payload = Payload} = MQTT,
+ #coap_message{type = get_notify_type(MQTT, Cfg),
+ method = {ok, content},
+ id = MsgId,
+ token = Token,
+ payload = Payload,
+ options = #{observe => SeqId,
+ max_age => get_max_age(MQTT)}}.
+
+get_notify_type(#message{qos = Qos}, #{notify_type := Type}) ->
+ case Type of
+ qos ->
+ case Qos of
+ ?QOS_0 ->
+ non;
+ _ ->
+ con
+ end;
+ Other ->
+ Other
+ end.
+
+-spec get_max_age(#message{}) -> max_age().
+get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
+ ?MAXIMUM_MAX_AGE;
+get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
+ timestamp = Ts}) ->
+ Now = erlang:system_time(millisecond),
+ Diff = (Now - Ts + Interval * 1000) / 1000,
+ erlang:max(1, erlang:floor(Diff));
+get_max_age(_) ->
+ ?DEFAULT_MAX_AGE.
+
+next_msg_id(MsgId) when MsgId >= ?MAX_MESSAGE_ID ->
+ 1;
+next_msg_id(MsgId) ->
+ MsgId + 1.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_sup.erl b/apps/emqx_gateway/src/coap/emqx_coap_sup.erl
deleted file mode 100644
index 94e9a1c77..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap_sup.erl
+++ /dev/null
@@ -1,42 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_coap_sup).
-
--behaviour(supervisor).
-
--export([ start_link/0
- , init/1
- ]).
-
-start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-init(_Args) ->
- Registry = #{id => emqx_coap_registry,
- start => {emqx_coap_registry, start_link, []},
- restart => permanent,
- shutdown => 5000,
- type => worker,
- modules => [emqx_coap_registry]},
- PsTopics = #{id => emqx_coap_pubsub_topics,
- start => {emqx_coap_pubsub_topics, start_link, []},
- restart => permanent,
- shutdown => 5000,
- type => worker,
- modules => [emqx_coap_pubsub_topics]},
- {ok, {{one_for_all, 10, 3600}, [Registry, PsTopics]}}.
-
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_timer.erl b/apps/emqx_gateway/src/coap/emqx_coap_timer.erl
deleted file mode 100644
index 7e090a74d..000000000
--- a/apps/emqx_gateway/src/coap/emqx_coap_timer.erl
+++ /dev/null
@@ -1,59 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_coap_timer).
-
--include("src/coap/include/emqx_coap.hrl").
-
--export([ cancel_timer/1
- , start_timer/2
- , restart_timer/1
- , kick_timer/1
- , is_timeout/1
- , get_timer_length/1
- ]).
-
--record(timer_state, {interval, kickme, tref, message}).
-
--define(LOG(Level, Format, Args),
- emqx_logger:Level("CoAP-Timer: " ++ Format, Args)).
-
-cancel_timer(#timer_state{tref = TRef}) when is_reference(TRef) ->
- catch erlang:cancel_timer(TRef),
- ok;
-cancel_timer(_) ->
- ok.
-
-kick_timer(State=#timer_state{kickme = false}) ->
- State#timer_state{kickme = true};
-kick_timer(State=#timer_state{kickme = true}) ->
- State.
-
-start_timer(Sec, Msg) ->
- ?LOG(debug, "emqx_coap_timer:start_timer ~p", [Sec]),
- TRef = erlang:send_after(timer:seconds(Sec), self(), Msg),
- #timer_state{interval = Sec, kickme = false, tref = TRef, message = Msg}.
-
-restart_timer(State=#timer_state{interval = Sec, message = Msg}) ->
- ?LOG(debug, "emqx_coap_timer:restart_timer ~p", [Sec]),
- TRef = erlang:send_after(timer:seconds(Sec), self(), Msg),
- State#timer_state{kickme = false, tref = TRef}.
-
-is_timeout(#timer_state{kickme = Bool}) ->
- not Bool.
-
-get_timer_length(#timer_state{interval = Interval}) ->
- Interval.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl
new file mode 100644
index 000000000..677292529
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl
@@ -0,0 +1,196 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% transport manager
+-module(emqx_coap_tm).
+
+-export([ new/0
+ , handle_request/3
+ , handle_response/3
+ , handle_out/3
+ , timeout/3]).
+
+-export_type([manager/0, event_result/2]).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+-type direction() :: in | out.
+-type transport_id() :: {direction(), non_neg_integer()}.
+
+-record(transport, { id :: transport_id()
+ , state :: atom()
+ , timers :: maps:map()
+ , data :: any()}).
+-type transport() :: #transport{}.
+
+-type message_id() :: 0 .. ?MAX_MESSAGE_ID.
+
+-type manager() :: #{message_id() => transport()}.
+
+-type ttimeout() :: {state_timeout, pos_integer(), any()}
+ | {stop_timeout, pos_integer()}.
+
+-type topic() :: binary().
+-type token() :: binary().
+-type sub_register() :: {topic(), token()} | topic().
+
+-type event_result(State, Data) ::
+ #{next => State,
+ outgoing => emqx_coap_message(),
+ timeouts => list(ttimeout()),
+ has_sub => undefined | sub_register(),
+ data => Data}.
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+new() ->
+ #{}.
+
+handle_request(#coap_message{id = MsgId} = Msg, TM, Cfg) ->
+ Id = {in, MsgId},
+ case maps:get(Id, TM, undefined) of
+ undefined ->
+ Data = emqx_coap_transport:new(),
+ Transport = new_transport(Id, Data),
+ process_event(in, Msg, TM, Transport, Cfg);
+ TP ->
+ process_event(in, Msg, TM, TP, Cfg)
+ end.
+
+handle_response(#coap_message{type = Type, id = MsgId} = Msg, TM, Cfg) ->
+ Id = {out, MsgId},
+ case maps:get(Id, TM, undefined) of
+ undefined ->
+ case Type of
+ reset ->
+ ?EMPTY_RESULT;
+ _ ->
+ #{out => #coap_message{type = reset,
+ id = MsgId}}
+ end;
+ TP ->
+ process_event(in, Msg, TM, TP, Cfg)
+ end.
+
+handle_out(#coap_message{id = MsgId} = Msg, TM, Cfg) ->
+ Id = {out, MsgId},
+ case maps:get(Id, TM, undefined) of
+ undefined ->
+ Data = emqx_coap_transport:new(),
+ Transport = new_transport(Id, Data),
+ process_event(out, Msg, TM, Transport, Cfg);
+ _ ->
+ ?WARN("Repeat sending message with id:~p~n", [Id]),
+ ?EMPTY_RESULT
+ end.
+
+timeout({Id, Type, Msg}, TM, Cfg) ->
+ case maps:get(Id, TM, undefined) of
+ undefined ->
+ ?EMPTY_RESULT;
+ #transport{timers = Timers} = TP ->
+ %% maybe timer has been canceled
+ case maps:is_key(Type, Timers) of
+ true ->
+ process_event(Type, Msg, TM, TP, Cfg);
+ _ ->
+ ?EMPTY_RESULT
+ end
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc
+%% @spec
+%% @end
+%%--------------------------------------------------------------------
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+new_transport(Id, Data) ->
+ #transport{id = Id,
+ state = idle,
+ timers = #{},
+ data = Data}.
+
+process_event(stop_timeout,
+ _,
+ TM,
+ #transport{id = Id,
+ timers = Timers},
+ _) ->
+ lists:foreach(fun({_, Ref}) ->
+ emqx_misc:cancel_timer(Ref)
+ end,
+ maps:to_list(Timers)),
+ #{tm => maps:remove(Id, TM)};
+
+process_event(Event,
+ Msg,
+ TM,
+ #transport{id = Id,
+ state = State,
+ data = Data} = TP,
+ Cfg) ->
+ Result = emqx_coap_transport:State(Event, Msg, Data, Cfg),
+ {ok, _, TP2} = emqx_misc:pipeline([fun process_state_change/2,
+ fun process_data_change/2,
+ fun process_timeouts/2],
+ Result,
+ TP),
+ TM2 = TM#{Id => TP2},
+ emqx_coap_session:transfer_result(Result, tm, TM2).
+
+process_state_change(#{next := Next}, TP) ->
+ {ok, cancel_state_timer(TP#transport{state = Next})};
+process_state_change(_, TP) ->
+ {ok, TP}.
+
+cancel_state_timer(#transport{timers = Timers} = TP) ->
+ case maps:get(state_timer, Timers, undefined) of
+ undefined ->
+ TP;
+ Ref ->
+ _ = emqx_misc:cancel_timer(Ref),
+ TP#transport{timers = maps:remove(state_timer, Timers)}
+ end.
+
+process_data_change(#{data := Data}, TP) ->
+ {ok, TP#transport{data = Data}};
+process_data_change(_, TP) ->
+ {ok, TP}.
+
+process_timeouts(#{timeouts := []}, TP) ->
+ {ok, TP};
+process_timeouts(#{timeouts := Timeouts},
+ #transport{id = Id, timers = Timers} = TP) ->
+ NewTimers = lists:foldl(fun({state_timeout, _, _} = Timer, Acc) ->
+ process_timer(Id, Timer, Acc);
+ ({stop_timeout, I}, Acc) ->
+ process_timer(Id, {stop_timeout, I, stop}, Acc)
+ end,
+ Timers,
+ Timeouts),
+ {ok, TP#transport{timers = NewTimers}};
+
+process_timeouts(_, TP) ->
+ {ok, TP}.
+
+process_timer(Id, {Type, Interval, Msg}, Timers) ->
+ Ref = emqx_misc:start_timer(Interval, {transport, {Id, Type, Msg}}),
+ Timers#{Type => Ref}.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl
new file mode 100644
index 000000000..7363b6254
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl
@@ -0,0 +1,133 @@
+-module(emqx_coap_transport).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+-define(ACK_TIMEOUT, 2000).
+-define(ACK_RANDOM_FACTOR, 1000).
+-define(MAX_RETRANSMIT, 4).
+-define(EXCHANGE_LIFETIME, 247000).
+-define(NON_LIFETIME, 145000).
+
+-record(data, { cache :: undefined | emqx_coap_message()
+ , retry_interval :: non_neg_integer()
+ , retry_count :: non_neg_integer()
+ }).
+
+-type data() :: #data{}.
+
+-export([ new/0, idle/4, maybe_reset/4
+ , maybe_resend/4, wait_ack/4, until_stop/4]).
+
+-spec new() -> data().
+new() ->
+ #data{cache = undefined,
+ retry_interval = 0,
+ retry_count = 0}.
+
+idle(in,
+ #coap_message{type = non, id = MsgId, method = Method} = Msg,
+ _,
+ #{resource := Resource} = Cfg) ->
+ Ret = #{next => until_stop,
+ timeouts => [{stop_timeout, ?NON_LIFETIME}]},
+ case Method of
+ undefined ->
+ Ret#{out => #coap_message{type = reset, id = MsgId}};
+ _ ->
+ case erlang:apply(Resource, Method, [Msg, Cfg]) of
+ #coap_message{} = Result ->
+ Ret#{out => Result};
+ {has_sub, Result, Sub} ->
+ Ret#{out => Result, subscribe => Sub};
+ error ->
+ Ret#{out =>
+ emqx_coap_message:response({error, internal_server_error}, Msg)}
+ end
+ end;
+
+idle(in,
+ #coap_message{id = MsgId,
+ type = con,
+ method = Method} = Msg,
+ Data,
+ #{resource := Resource} = Cfg) ->
+ Ret = #{next => maybe_resend,
+ timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]},
+ case Method of
+ undefined ->
+ ResetMsg = #coap_message{type = reset, id = MsgId},
+ Ret#{data => Data#data{cache = ResetMsg},
+ out => ResetMsg};
+ _ ->
+ {RetMsg, SubInfo} =
+ case erlang:apply(Resource, Method, [Msg, Cfg]) of
+ #coap_message{} = Result ->
+ {Result, undefined};
+ {has_sub, Result, Sub} ->
+ {Result, Sub};
+ error ->
+ {emqx_coap_message:response({error, internal_server_error}, Msg),
+ undefined}
+ end,
+ RetMsg2 = RetMsg#coap_message{type = ack},
+ Ret#{out => RetMsg2,
+ data => Data#data{cache = RetMsg2},
+ subscribe => SubInfo}
+ end;
+
+idle(out, #coap_message{type = non} = Msg, _, _) ->
+ #{next => maybe_reset,
+ out => Msg,
+ timeouts => [{stop_timeout, ?NON_LIFETIME}]};
+
+idle(out, Msg, Data, _) ->
+ _ = emqx_misc:rand_seed(),
+ Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR),
+ #{next => wait_ack,
+ data => Data#data{cache = Msg},
+ out => Msg,
+ timeouts => [ {state_timeout, Timeout, ack_timeout}
+ , {stop_timeout, ?EXCHANGE_LIFETIME}]}.
+
+maybe_reset(in, Message, _, _) ->
+ case Message of
+ #coap_message{type = reset} ->
+ ?INFO("Reset Message:~p~n", [Message]);
+ _ ->
+ ok
+ end,
+ ?EMPTY_RESULT.
+
+maybe_resend(in, _, _, #data{cache = Cache}) ->
+ #{out => Cache}.
+
+wait_ack(in, #coap_message{type = Type}, _, _) ->
+ case Type of
+ ack ->
+ #{next => until_stop};
+ reset ->
+ #{next => until_stop};
+ _ ->
+ ?EMPTY_RESULT
+ end;
+
+wait_ack(state_timeout,
+ ack_timeout,
+ _,
+ #data{cache = Msg,
+ retry_interval = Timeout,
+ retry_count = Count} =Data) ->
+ case Count < ?MAX_RETRANSMIT of
+ true ->
+ Timeout2 = Timeout * 2,
+ #{data => Data#data{retry_interval = Timeout2,
+ retry_count = Count + 1},
+ out => Msg,
+ timeouts => [{state_timeout, Timeout2, ack_timeout}]};
+ _ ->
+ #{next_state => until_stop}
+ end.
+
+until_stop(_, _, _, _) ->
+ ?EMPTY_RESULT.
diff --git a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl
index 963feca6b..0e0b33365 100644
--- a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl
+++ b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl
@@ -15,6 +15,72 @@
%%--------------------------------------------------------------------
-define(APP, emqx_coap).
+-define(DEFAULT_COAP_PORT, 5683).
+-define(DEFAULT_COAPS_PORT, 5684).
+-define(MAX_MESSAGE_ID, 65535).
+-define(MAX_BLOCK_SIZE, 1024).
+-define(DEFAULT_MAX_AGE, 60).
+-define(MAXIMUM_MAX_AGE, 4294967295).
+
+-define(EMPTY_RESULT, #{}).
+-define(TRANSFER_RESULT(R1, Keys, From, Value),
+ begin
+ R2 = maps:with(Keys, R1),
+ R2#{From => Value}
+ end).
+
+-type coap_message_id() :: 1 .. ?MAX_MESSAGE_ID.
+-type message_type() :: con | non | ack | reset.
+-type max_age() :: 1 .. ?MAXIMUM_MAX_AGE.
+
+-type message_option_name() :: if_match
+ | uri_host
+ | etag
+ | if_none_match
+ | uri_port
+ | location_path
+ | uri_path
+ | content_format
+ | max_age
+ | uri_query
+ | 'accept'
+ | location_query
+ | proxy_uri
+ | proxy_scheme
+ | size1
+ | observer
+ | block1
+ | block2.
+
+-type message_options() :: #{ if_match => list(binary())
+ , uri_host => binary()
+ , etag => list(binary())
+ , if_none_match => boolean()
+ , uri_port => 0 .. 65535
+ , location_path => list(binary())
+ , uri_path => list(binary())
+ , content_format => 0 .. 65535
+ , max_age => non_neg_integer()
+ , uri_query => list(binary())
+ , 'accept' => 0 .. 65535
+ , location_query => list(binary())
+ , proxy_uri => binary()
+ , proxy_scheme => binary()
+ , size1 => non_neg_integer()
+ , observer => non_neg_integer()
+ , block1 => {non_neg_integer(), boolean(), non_neg_integer()}
+ , block2 => {non_neg_integer(), boolean(), non_neg_integer()}}.
-record(coap_mqtt_auth, {clientid, username, password}).
+-record(coap_message, { type :: message_type()
+ , method
+ , id
+ , token = <<>>
+ , options = #{}
+ , payload = <<>>}).
+
+-record(coap_content, {etag, max_age = ?DEFAULT_MAX_AGE, format, location_path = [], payload = <<>>}).
+
+-type emqx_coap_message() :: #coap_message{}.
+-type coap_content() :: #coap_content{}.
diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl
new file mode 100644
index 000000000..f52610492
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl
@@ -0,0 +1,154 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% a coap to mqtt adapter
+-module(emqx_coap_mqtt_resource).
+
+-behaviour(emqx_coap_resource).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+-logger_header("[CoAP-RES]").
+
+-export([ init/1
+ , stop/1
+ , get/2
+ , put/2
+ , post/2
+ , delete/2
+ ]).
+
+-export([ check_topic/1
+ , publish/3
+ , subscribe/3
+ , unsubscribe/3]).
+
+-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, is_new => false}).
+
+init(_) ->
+ {ok, undefined}.
+
+stop(_) ->
+ ok.
+
+%% get: subscribe, ignore observe option
+get(#coap_message{token = Token} = Msg, Cfg) ->
+ case check_topic(Msg) of
+ {ok, Topic} ->
+ case Token of
+ <<>> ->
+ emqx_coap_message:response({error, bad_request}, <<"observer without token">>, Msg);
+ _ ->
+ Ret = subscribe(Msg, Topic, Cfg),
+ RetMsg = emqx_coap_message:response(Ret, Msg),
+ case Ret of
+ {ok, _} ->
+ {has_sub, RetMsg, {Topic, Token}};
+ _ ->
+ RetMsg
+ end
+ end;
+ Any ->
+ Any
+ end.
+
+%% put: equal post
+put(Msg, Cfg) ->
+ post(Msg, Cfg).
+
+%% post: publish a message
+post(Msg, Cfg) ->
+ case check_topic(Msg) of
+ {ok, Topic} ->
+ emqx_coap_message:response(publish(Msg, Topic, Cfg), Msg);
+ Any ->
+ Any
+ end.
+
+%% delete: ubsubscribe
+delete(Msg, Cfg) ->
+ case check_topic(Msg) of
+ {ok, Topic} ->
+ unsubscribe(Msg, Topic, Cfg),
+ {has_sub, emqx_coap_message:response({ok, deleted}, Msg), Topic};
+ Any ->
+ Any
+ end.
+
+check_topic(#coap_message{options = Options} = Msg) ->
+ case maps:get(uri_path, Options, []) of
+ [] ->
+ emqx_coap_message:response({error, bad_request}, <<"invalid topic">> , Msg);
+ UriPath ->
+ Sep = <<"/">>,
+ {ok, lists:foldl(fun(Part, Acc) ->
+ <>
+ end,
+ <<>>,
+ UriPath)}
+ end.
+
+publish(#coap_message{payload = Payload} = Msg,
+ Topic,
+ #{clientinfo := ClientInfo,
+ publish_qos := QOS} = Cfg) ->
+ case emqx_coap_channel:auth_publish(Topic, Cfg) of
+ allow ->
+ #{clientid := ClientId} = ClientInfo,
+ MQTTMsg = emqx_message:make(ClientId, type_to_qos(QOS, Msg), Topic, Payload),
+ MQTTMsg2 = emqx_message:set_flag(retain, false, MQTTMsg),
+ _ = emqx_broker:publish(MQTTMsg2),
+ {ok, changed};
+ _ ->
+ {error, unauthorized}
+ end.
+
+subscribe(Msg, Topic, #{clientinfo := ClientInfo}= Cfg) ->
+ case emqx_topic:wildcard(Topic) of
+ false ->
+ case emqx_coap_channel:auth_subscribe(Topic, Cfg) of
+ allow ->
+ #{clientid := ClientId} = ClientInfo,
+ SubOpts = get_sub_opts(Msg, Cfg),
+ emqx_broker:subscribe(Topic, ClientId, SubOpts),
+ emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
+ {ok, created};
+ _ ->
+ {error, unauthorized}
+ end;
+ _ ->
+ %% now, we don't support wildcard in subscribe topic
+ {error, bad_request, <<"">>}
+ end.
+
+unsubscribe(Msg, Topic, #{clientinfo := ClientInfo} = Cfg) ->
+ emqx_broker:unsubscribe(Topic),
+ emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, get_sub_opts(Msg, Cfg)]).
+
+get_sub_opts(Msg, #{subscribe_qos := Type}) ->
+ ?SUBOPTS#{qos => type_to_qos(Type, Msg)}.
+
+type_to_qos(qos0, _) -> ?QOS_0;
+type_to_qos(qos1, _) -> ?QOS_1;
+type_to_qos(qos2, _) -> ?QOS_2;
+type_to_qos(coap, #coap_message{type = Type}) ->
+ case Type of
+ non ->
+ ?QOS_0;
+ _ ->
+ ?QOS_1
+ end.
diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl
new file mode 100644
index 000000000..c7c13da0c
--- /dev/null
+++ b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl
@@ -0,0 +1,220 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% a coap to mqtt adapter with a retained topic message database
+-module(emqx_coap_pubsub_resource).
+
+-behaviour(emqx_coap_resource).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
+
+-logger_header("[CoAP-PS-RES]").
+
+-export([ init/1
+ , stop/1
+ , get/2
+ , put/2
+ , post/2
+ , delete/2
+ ]).
+-import(emqx_coap_mqtt_resource, [ check_topic/1, subscribe/3, unsubscribe/3
+ , publish/3]).
+
+-import(emqx_coap_message, [response/2, response/3, set_content/2]).
+%%--------------------------------------------------------------------
+%% Resource Callbacks
+%%--------------------------------------------------------------------
+init(_) ->
+ emqx_coap_pubsub_topics:start_link().
+
+stop(Pid) ->
+ emqx_coap_pubsub_topics:stop(Pid).
+
+%% get: read last publish message
+%% get with observe 0: subscribe
+%% get with observe 1: unsubscribe
+get(#coap_message{token = Token} = Msg, Cfg) ->
+ case check_topic(Msg) of
+ {ok, Topic} ->
+ case emqx_coap_message:get_option(observe, Msg) of
+ undefined ->
+ Content = emqx_coap_message:get_content(Msg),
+ read_last_publish_message(emqx_topic:wildcard(Topic), Msg, Topic, Content);
+ 0 ->
+ case Token of
+ <<>> ->
+ response({error, bad_reuqest}, <<"observe without token">>, Msg);
+ _ ->
+ Ret = subscribe(Msg, Topic, Cfg),
+ RetMsg = response(Ret, Msg),
+ case Ret of
+ {ok, _} ->
+ {has_sub, RetMsg, {Topic, Token}};
+ _ ->
+ RetMsg
+ end
+ end;
+ 1 ->
+ unsubscribe(Msg, Topic, Cfg),
+ {has_sub, response({ok, deleted}, Msg), Topic}
+ end;
+ Any ->
+ Any
+ end.
+
+%% put: insert a message into topic database
+put(Msg, _) ->
+ case check_topic(Msg) of
+ {ok, Topic} ->
+ Content = emqx_coap_message:get_content(Msg),
+ #coap_content{payload = Payload,
+ format = Format,
+ max_age = MaxAge} = Content,
+ handle_received_create(Msg, Topic, MaxAge, Format, Payload);
+ Any ->
+ Any
+ end.
+
+%% post: like put, but will publish the inserted message
+post(Msg, Cfg) ->
+ case check_topic(Msg) of
+ {ok, Topic} ->
+ Content = emqx_coap_message:get_content(Msg),
+ #coap_content{max_age = MaxAge,
+ format = Format,
+ payload = Payload} = Content,
+ handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg);
+ Any ->
+ Any
+ end.
+
+%% delete: delete a message from topic database
+delete(Msg, _) ->
+ case check_topic(Msg) of
+ {ok, Topic} ->
+ delete_topic_info(Msg, Topic);
+ Any ->
+ Any
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal Functions
+%%--------------------------------------------------------------------
+add_topic_info(Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>> ->
+ case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
+ [{_, StoredMaxAge, StoredCT, _, _}] ->
+ ?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]),
+ %% check whether the ct value stored matches the ct option in this POST message
+ case Format =:= StoredCT of
+ true ->
+ {ok, Ret} =
+ case StoredMaxAge =:= MaxAge of
+ true ->
+ emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload);
+ false ->
+ emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload)
+ end,
+ {changed, Ret};
+ false ->
+ ?LOG(debug, "ct values of topic=~p do not match, stored ct=~p, new ct=~p, ignore the PUBLISH", [Topic, StoredCT, Format]),
+ {changed, false}
+ end;
+ [] ->
+ ?LOG(debug, "publish topic=~p will be created", [Topic]),
+ {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload),
+ {created, Ret}
+ end;
+
+add_topic_info(Topic, _MaxAge, _Format, _Payload) ->
+ ?LOG(debug, "create topic=~p info failed", [Topic]),
+ {badarg, false}.
+
+format_string_to_int(<<"application/octet-stream">>) ->
+ <<"42">>;
+format_string_to_int(<<"application/exi">>) ->
+ <<"47">>;
+format_string_to_int(<<"application/json">>) ->
+ <<"50">>;
+format_string_to_int(_) ->
+ <<"42">>.
+
+handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg) ->
+ case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of
+ {_, true} ->
+ response(publish(Msg, Topic, Cfg), Msg);
+ {_, false} ->
+ ?LOG(debug, "add_topic_info failed, will return bad_request", []),
+ response({error, bad_request}, Msg)
+ end.
+
+handle_received_create(Msg, Topic, MaxAge, Format, Payload) ->
+ case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of
+ {Ret, true} ->
+ response({ok, Ret}, Msg);
+ {_, false} ->
+ ?LOG(debug, "add_topic_info failed, will return bad_request", []),
+ response({error, bad_request}, Msg)
+ end.
+
+return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content) ->
+ TimeElapsed = trunc((erlang:system_time(millisecond) - TimeStamp) / 1000),
+ case TimeElapsed < MaxAge of
+ true ->
+ LeftTime = (MaxAge - TimeElapsed),
+ ?LOG(debug, "topic=~p has max age left time is ~p", [Topic, LeftTime]),
+ set_content(Content#coap_content{max_age = LeftTime, payload = Payload},
+ response({ok, content}, Msg));
+ false ->
+ ?LOG(debug, "topic=~p has been timeout, will return empty content", [Topic]),
+ response({ok, nocontent}, Msg)
+ end.
+
+read_last_publish_message(false, Msg, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)->
+ ?LOG(debug, "the QueryFormat=~p", [QueryFormat]),
+ case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
+ [] ->
+ response({error, not_found}, Msg);
+ [{_, MaxAge, CT, Payload, TimeStamp}] ->
+ case CT =:= format_string_to_int(QueryFormat) of
+ true ->
+ return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content);
+ false ->
+ ?LOG(debug, "format value does not match, the queried format=~p, the stored format=~p", [QueryFormat, CT]),
+ response({error, bad_request}, Msg)
+ end
+ end;
+
+read_last_publish_message(false, Msg, Topic, Content) ->
+ case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
+ [] ->
+ response({error, not_found}, Msg);
+ [{_, MaxAge, _, Payload, TimeStamp}] ->
+ return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content)
+ end;
+
+read_last_publish_message(true, Msg, Topic, _Content) ->
+ ?LOG(debug, "the topic=~p is illegal wildcard topic", [Topic]),
+ response({error, bad_request}, Msg).
+
+delete_topic_info(Msg, Topic) ->
+ case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
+ [] ->
+ response({error, not_found}, Msg);
+ [{_, _, _, _, _}] ->
+ emqx_coap_pubsub_topics:delete_sub_topics(Topic),
+ response({ok, deleted}, Msg)
+ end.
diff --git a/apps/emqx_gateway/src/coap/emqx_coap_pubsub_topics.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl
similarity index 98%
rename from apps/emqx_gateway/src/coap/emqx_coap_pubsub_topics.erl
rename to apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl
index c8afc294e..4ebada566 100644
--- a/apps/emqx_gateway/src/coap/emqx_coap_pubsub_topics.erl
+++ b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl
@@ -18,10 +18,8 @@
-behaviour(gen_server).
--include("src/coap/include/emqx_coap.hrl").
--include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-logger_header("[CoAP-PS-TOPICS]").
diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl
index d1feb7725..a25640c9b 100644
--- a/apps/emqx_gateway/src/emqx_gateway_app.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_app.erl
@@ -45,7 +45,7 @@ load_default_gateway_applications() ->
gateway_type_searching() ->
%% FIXME: Hardcoded apps
- [emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl].
+ [emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl, emqx_coap_impl].
load(Mod) ->
try
diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl
index ff4815e36..f5f2b9ab0 100644
--- a/apps/emqx_gateway/src/emqx_gateway_schema.erl
+++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl
@@ -34,7 +34,8 @@ structs() -> ["gateway"].
fields("gateway") ->
[{stomp, t(ref(stomp))},
{mqttsn, t(ref(mqttsn))},
- {exproto, t(ref(exproto))}
+ {exproto, t(ref(exproto))},
+ {coap, t(ref(coap))}
];
fields(stomp) ->
@@ -154,8 +155,8 @@ fields(listener_settings) ->
];
fields(tcp_listener_settings) ->
- [
- %% some special confs for tcp listener
+ [
+ %% some special confs for tcp listener
] ++ fields(listener_settings);
fields(ssl_listener_settings) ->
@@ -168,12 +169,12 @@ fields(ssl_listener_settings) ->
fields(udp_listener_settings) ->
[
- %% some special confs for udp listener
+ %% some special confs for udp listener
] ++ fields(listener_settings);
fields(dtls_listener_settings) ->
[
- %% some special confs for dtls listener
+ %% some special confs for dtls listener
] ++
ssl(undefined, #{handshake_timeout => "15s"
, depth => 10
@@ -183,6 +184,20 @@ fields(access) ->
[ {"$id", #{type => string(),
nullable => true}}];
+fields(coap) ->
+ [{"$id", t(ref(coap_structs))}];
+
+fields(coap_structs) ->
+ [ {enable_stats, t(boolean(), undefined, true)}
+ , {authenticator, t(union([allow_anonymous]))}
+ , {heartbeat, t(duration(), undefined, "15s")}
+ , {resource, t(union([mqtt, pubsub]), undefined, mqtt)}
+ , {notify_type, t(union([non, con, qos]), undefined, qos)}
+ , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
+ , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
+ , {listener, t(ref(udp_listener_group))}
+ ];
+
fields(ExtraField) ->
Mod = list_to_atom(ExtraField++"_schema"),
Mod:fields(ExtraField).