refactor(eqmx_gateway): port emqx_coap into emqx_gateway framework

This commit is contained in:
lafirest 2021-07-21 17:35:04 +08:00 committed by JianBo He
parent cb2909dfbf
commit 137294db8f
26 changed files with 2392 additions and 1476 deletions

View File

@ -27,6 +27,32 @@ gateway: {
}
}
coap.1: {
enable_stats: false
authenticator: allow_anonymous
heartbeat: 30s
resource: mqtt
notify_type: qos
subscribe_qos: qos0
publish_qos: qos1
listener.udp.1: {
bind: 5687
}
}
coap.2: {
enable_stats: false
authenticator: allow_anonymous
heartbeat: 30s
resource: pubsub
notify_type: non
subscribe_qos: qos2
publish_qos: coap
listener.udp.1: {
bind: 5683
}
}
mqttsn.1: {
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id: 1

View File

@ -1,6 +1,5 @@
{erl_opts, [debug_info]}.
{deps, [
{gen_coap, {git, "https://github.com/emqx/gen_coap", {tag, "v0.3.2"}}},
{lwm2m_coap, {git, "https://github.com/emqx/lwm2m-coap", {tag, "v2.0.0"}}},
{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}}
]}.

View File

@ -1,248 +1,183 @@
# emqx-coap
# Table of Contents
emqx-coap is a CoAP Gateway for EMQ X Broker. It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients.
1. [EMQX 5.0 CoAP Gateway](#org6feb6de)
2. [CoAP Message Processing Flow](#org8458c1a)
1. [Request Timing Diagram](#orgeaa4f53)
1. [Transport && Transport Manager](#org88207b8)
2. [Resource](#orgb32ce94)
3. [Resource](#org8956f90)
1. [MQTT Resource](#orge8c21b1)
2. [PubSub Resource](#org68ddce7)
4. [Heartbeat](#orgffdfecd)
5. [Command](#org43004c2)
6. [MQTT QOS <=> CoAP non/con](#org0157b5c)
### Client Usage Example
libcoap is an excellent coap library which has a simple client tool. It is recommended to use libcoap as a coap client.
To compile libcoap, do following steps:
<a id="org6feb6de"></a>
# EMQX 5.0 CoAP Gateway
emqx-coap is a CoAP Gateway for EMQ X Broker.
It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients.
<a id="org8458c1a"></a>
# CoAP Message Processing Flow
<a id="orgeaa4f53"></a>
## Request Timing Diagram
,------. ,------------. ,-----------------. ,---------. ,--------.
|client| |coap_gateway| |transport_manager| |transport| |resource|
`--+---' `-----+------' `--------+--------' `----+----' `---+----'
| | | | |
| -------------------> | | |
| | | | |
| | | | |
| | ------------------------>| | |
| | | | |
| | | | |
| | |----------------------->| |
| | | | |
| | | | |
| | | |------------------>|
| | | | |
| | | | |
| | | |<------------------|
| | | | |
| | | | |
| | |<-----------------------| |
| | | | |
| | | | |
| | <------------------------| | |
| | | | |
| | | | |
| <------------------- | | |
,--+---. ,-----+------. ,--------+--------. ,----+----. ,---+----.
|client| |coap_gateway| |transport_manager| |transport| |resource|
`------' `------------' `-----------------' `---------' `--------'
<a id="org88207b8"></a>
### Transport && Transport Manager
Transport is a module that manages the life cycle and behaviour of CoAP messages\
And the transport manager is to manage all transport which in this gateway
<a id="orgb32ce94"></a>
### Resource
The Resource is a behaviour that must implement GET/PUT/POST/DELETE method\
Different Resources can have different implementations of this four method\
Each gateway can only use one Resource module to process CoAP Request Message
<a id="org8956f90"></a>
# Resource
<a id="orge8c21b1"></a>
## MQTT Resource
The MQTT Resource is a simple CoAP to MQTT adapter, the implementation of each method is as follows:
- use uri path as topic
- GET: subscribe the topic
- PUT: publish message to this topic
- POST: like PUT
- DELETE: unsubscribe the topic
<a id="org68ddce7"></a>
## PubSub Resource
The PubSub Resource like the MQTT Resource, but has a retained topic's message database\
This Resource is shared, only can has one instance. The implementation:
- use uri path as topic
- GET:
- GET with observe = 0: subscribe the topic
- GET with observe = 1: unsubscribe the topic
- GET without observe: read lastest message from the message database, key is the topic
- PUT:
insert message into the message database, key is the topic
- POST:
like PUT, but will publish the message
- DELETE:
delete message from the database, key is topic
<a id="orgffdfecd"></a>
# Heartbeat
At present, the CoAP gateway only supports UDP/DTLS connection, don't support UDP over TCP and UDP over WebSocket.
Because UDP is connectionless, so the client needs to send heartbeat ping to the server interval. Otherwise, the server will close related resources
Use ****POST with empty uri path**** as a heartbeat ping
example:
```
git clone http://github.com/obgm/libcoap
cd libcoap
./autogen.sh
./configure --enable-documentation=no --enable-tests=no
make
coap-client -m post coap://127.0.0.1
```
### Publish example:
<a id="org43004c2"></a>
# Command
Command is means the operation which outside the CoAP protocol, like authorization
The Command format:
1. use ****POST**** method
2. uri path is empty
3. query string is like ****action=comandX&argX=valuex&argY=valueY****
example:
1. connect:
```
libcoap/examples/coap-client -m put -e 1234 "coap://127.0.0.1/mqtt/topic1?c=client1&u=tom&p=secret"
coap-client -m post coap://127.0.0.1?action=connect&clientid=XXX&username=XXX&password=XXX
```
- topic name is "topic1", NOT "/topic1"
- client id is client1
- username is tom
- password is secret
- payload is a text string "1234"
A mqtt message with topic="topic1", payload="1234" has been published. Any mqtt client or coap client, who has subscribed this topic could receive this message immediately.
### Subscribe example:
2. disconnect:
```
libcoap/examples/coap-client -m get -s 10 "coap://127.0.0.1/mqtt/topic1?c=client1&u=tom&p=secret"
```
- topic name is "topic1", NOT "/topic1"
- client id is client1
- username is tom
- password is secret
- subscribe time is 10 seconds
And you will get following result if any mqtt client or coap client sent message with text "1234567" to "topic1":
```
v:1 t:CON c:GET i:31ae {} [ ]
1234567v:1 t:CON c:GET i:31af {} [ Observe:1, Uri-Path:mqtt, Uri-Path:topic1, Uri-Query:c=client1, Uri-Query:u=tom, Uri-Query:p=secret ]
```
The output message is not well formatted which hide "1234567" at the head of the 2nd line.
### Configure
#### Common
File: etc/emqx_coap.conf
```properties
## The UDP port that CoAP is listening on.
##
## Value: Port
coap.port = 5683
## Interval for keepalive, specified in seconds.
##
## Value: Duration
## -s: seconds
## -m: minutes
## -h: hours
coap.keepalive = 120s
## Whether to enable statistics for CoAP clients.
##
## Value: on | off
coap.enable_stats = off
coap-client -m post coap://127.0.0.1?action=disconnect
```
#### DTLS
<a id="org0157b5c"></a>
emqx_coap enable one-way authentication by default.
# MQTT QOS <=> CoAP non/con
If you want to disable it, comment these lines.
CoAP gateway uses some options to control the conversion between MQTT qos and coap non/con:
File: etc/emqx_coap.conf
1.notify_type
Control the type of notify messages when the observed object has changed.Can be:
```properties
- non
- con
- qos
in this value, MQTT QOS0 -> non, QOS1/QOS2 -> con
## The DTLS port that CoAP is listening on.
##
## Value: Port
coap.dtls.port = 5684
2.subscribe_qos
Control the qos of subscribe.Can be:
## Private key file for DTLS
##
## Value: File
coap.dtls.keyfile = {{ platform_etc_dir }}/certs/key.pem
- qos0
- qos1
- qos2
- coap
in this value, CoAP non -> qos0, con -> qos1
## Server certificate for DTLS.
##
## Value: File
coap.dtls.certfile = {{ platform_etc_dir }}/certs/cert.pem
```
##### Enable two-way autentication
For two-way autentication:
```properties
## A server only does x509-path validation in mode verify_peer,
## as it then sends a certificate request to the client (this
## message is not sent if the verify option is verify_none).
## You can then also want to specify option fail_if_no_peer_cert.
## More information at: http://erlang.org/doc/man/ssl.html
##
## Value: verify_peer | verify_none
## coap.dtls.verify = verify_peer
## PEM-encoded CA certificates for DTLS
##
## Value: File
## coap.dtls.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## Used together with {verify, verify_peer} by an SSL server. If set to true,
## the server fails if the client does not have a certificate to send, that is,
## sends an empty certificate.
##
## Value: true | false
## coap.dtls.fail_if_no_peer_cert = false
```
### Load emqx-coap
```bash
./bin/emqx_ctl plugins load emqx_coap
```
CoAP Client Observe Operation (subscribe topic)
-----------------------------------------------
To subscribe any topic, issue following command:
```
GET coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password} with OBSERVE=0
```
- "mqtt" in the path is mandatory.
- replace {topicname}, {clientid}, {username} and {password} with your true values.
- {topicname} and {clientid} is mandatory.
- if clientid is absent, a "bad_request" will be returned.
- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
- {username} and {password} are optional.
- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
- topic is subscribed with qos1.
- if the subscription failed due to Authorization deny, the error code `forbidden` will be returned.
CoAP Client Unobserve Operation (unsubscribe topic)
---------------------------------------------------
To cancel observation, issue following command:
```
GET coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password} with OBSERVE=1
```
- "mqtt" in the path is mandatory.
- replace {topicname}, {clientid}, {username} and {password} with your true values.
- {topicname} and {clientid} is mandatory.
- if clientid is absent, a "bad_request" will be returned.
- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
- {username} and {password} are optional.
- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
CoAP Client Notification Operation (subscribed Message)
-------------------------------------------------------
Server will issue an observe-notification as a subscribed message.
- Its payload is exactly the mqtt payload.
- payload data type is "application/octet-stream".
CoAP Client Publish Operation
-----------------------------
Issue a coap put command to publish messages. For example:
```
PUT coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password}
```
- "mqtt" in the path is mandatory.
- replace {topicname}, {clientid}, {username} and {password} with your true values.
- {topicname} and {clientid} is mandatory.
- if clientid is absent, a "bad_request" will be returned.
- {topicname} in URI should be percent-encoded to prevent special characters, such as + and #.
- {username} and {password} are optional.
- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
- payload could be any binary data.
- payload data type is "application/octet-stream".
- publish message will be sent with qos0.
- if the publishing failed due to Authorization deny, the error code `forbidden` will be returned.
CoAP Client Keep Alive
----------------------
Device should issue a get command periodically, serve as a ping to keep mqtt session online.
```
GET coap://localhost/mqtt/{any_topicname}?c={clientid}&u={username}&p={password}
```
- "mqtt" in the path is mandatory.
- replace {any_topicname}, {clientid}, {username} and {password} with your true values.
- {any_topicname} is optional, and should be percent-encoded to prevent special characters.
- {clientid} is mandatory. If clientid is absent, a "bad_request" will be returned.
- {username} and {password} are optional.
- if {username} or {password} is incorrect, the error code `unauthorized` will be returned.
- coap client should do keepalive work periodically to keep mqtt session online, especially those devices in a NAT network.
CoAP Client NOTES
-----------------
emqx-coap gateway does not accept POST and DELETE requests.
Topics in URI should be percent-encoded, but corresponding uri_path option has percent-encoding converted. Please refer to RFC 7252 section 6.4, "Decomposing URIs into Options":
> Note that these rules completely resolve any percent-encoding.
That implies coap client is responsible to convert any percert-encoding into true character while assembling coap packet.
ClientId, Username, Password and Topic
--------------------------------------
ClientId/username/password/topic in the coap URI are the concepts in mqtt. That is to say, emqx-coap is trying to fit coap message into mqtt system, by borrowing the client/username/password/topic from mqtt.
The Auth/Authorization/Hook features in mqtt also applies on coap stuff. For example:
- If username/password is not authorized, coap client will get an unauthorized error.
- If username or clientid is not allowed to published specific topic, coap message will be dropped in fact, although coap client will get an acknoledgement from emqx-coap.
- If a coap message is published, a 'message.publish' hook is able to capture this message as well.
well-known locations
--------------------
Discovery always return "</mqtt>,</ps>"
For example
```
libcoap/examples/coap-client -m get "coap://127.0.0.1/.well-known/core"
```
3.publish_qos
like subscribe_qos, but control the qos of the publish MQTT message
License
-------
@ -253,4 +188,3 @@ Author
------
EMQ X Team.

View File

@ -1,14 +0,0 @@
{application, emqx_coap,
[{description, "EMQ X CoAP Gateway"},
{vsn, "5.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gen_coap]},
{mod, {emqx_coap_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, [{"Homepage", "https://emqx.io/"},
{"Github", "https://github.com/emqx/emqx-coap"}
]}
]}.

View File

@ -1,40 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_app).
-behaviour(application).
-emqx_plugin(protocol).
-include("src/coap/include/emqx_coap.hrl").
-export([ start/2
, stop/1
]).
start(_Type, _Args) ->
{ok, Sup} = emqx_coap_sup:start_link(),
coap_server_registry:add_handler([<<"mqtt">>], emqx_coap_resource, undefined),
coap_server_registry:add_handler([<<"ps">>], emqx_coap_pubsub_resource, undefined),
_ = emqx_coap_pubsub_topics:start_link(),
emqx_coap_server:start(application:get_all_env(?APP)),
{ok,Sup}.
stop(_State) ->
coap_server_registry:remove_handler([<<"mqtt">>], emqx_coap_resource, undefined),
coap_server_registry:remove_handler([<<"ps">>], emqx_coap_pubsub_resource, undefined),
emqx_coap_server:stop(application:get_all_env(?APP)).

View File

@ -0,0 +1,387 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_channel).
-behavior(emqx_gateway_channel).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
%% API
-export([]).
-export([ info/1
, info/2
, stats/1
, auth_publish/2
, auth_subscribe/2
, reply/4
, ack/4
, transfer_result/3]).
-export([ init/2
, handle_in/2
, handle_deliver/2
, handle_timeout/3
, terminate/2
]).
-export([ handle_call/2
, handle_cast/2
, handle_info/2
]).
-export_type([channel/0]).
-record(channel, {
%% Context
ctx :: emqx_gateway_ctx:context(),
%% Connection Info
conninfo :: emqx_types:conninfo(),
%% Client Info
clientinfo :: emqx_types:clientinfo(),
%% Session
session :: emqx_coap_session:session() | undefined,
%% Keepalive
keepalive :: emqx_keepalive:keepalive() | undefined,
%% Timer
timers :: #{atom() => disable | undefined | reference()},
config :: hocon:config()
}).
-type channel() :: #channel{}.
-define(DISCONNECT_WAIT_TIME, timer:seconds(10)).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
%%%===================================================================
%%% API
%%%===================================================================
info(Channel) ->
maps:from_list(info(?INFO_KEYS, Channel)).
info(Keys, Channel) when is_list(Keys) ->
[{Key, info(Key, Channel)} || Key <- Keys];
info(conninfo, #channel{conninfo = ConnInfo}) ->
ConnInfo;
info(conn_state, _) ->
connected;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
emqx_misc:maybe_apply(fun emqx_session:info/1, Session);
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(ctx, #channel{ctx = Ctx}) ->
Ctx.
stats(_) ->
[].
init(ConnInfo = #{peername := {PeerHost, _},
sockname := {_, SockPort}},
#{ctx := Ctx} = Config) ->
Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Config, undefined),
ClientInfo = set_peercert_infos(
Peercert,
#{ zone => default
, protocol => 'mqtt-coap'
, peerhost => PeerHost
, sockport => SockPort
, clientid => emqx_guid:to_base62(emqx_guid:gen())
, username => undefined
, is_bridge => false
, is_superuser => false
, mountpoint => Mountpoint
}
),
#channel{ ctx = Ctx
, conninfo = ConnInfo
, clientinfo = ClientInfo
, timers = #{}
, session = emqx_coap_session:new()
, config = Config#{clientinfo => ClientInfo,
ctx => Ctx}
, keepalive = emqx_keepalive:init(maps:get(heartbeat, Config))
}.
auth_publish(Topic,
#{ctx := Ctx,
clientinfo := ClientInfo}) ->
emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic).
auth_subscribe(Topic,
#{ctx := Ctx,
clientinfo := ClientInfo}) ->
emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic).
transfer_result(Result, From, Value) ->
?TRANSFER_RESULT(Result, [out], From, Value).
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
%% treat post to root path as a heartbeat
%% treat post to root path with query string as a command
handle_in(#coap_message{method = post,
options = Options} = Msg, ChannelT) ->
Channel = ensure_keepalive_timer(ChannelT),
case maps:get(uri_path, Options, <<>>) of
<<>> ->
handle_command(Msg, Channel);
_ ->
call_session(Channel, received, [Msg])
end;
handle_in(Msg, Channel) ->
call_session(ensure_keepalive_timer(Channel), received, [Msg]).
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
%%--------------------------------------------------------------------
handle_deliver(Delivers, Channel) ->
call_session(Channel, deliver, [Delivers]).
%%--------------------------------------------------------------------
%% Handle timeout
%%--------------------------------------------------------------------
handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) ->
case emqx_keepalive:check(NewVal, KeepAlive) of
{ok, NewKeepAlive} ->
Channel2 = ensure_keepalive_timer(Channel, fun make_timer/4),
{ok, Channel2#channel{keepalive = NewKeepAlive}};
{error, timeout} ->
{shutdown, timeout, Channel}
end;
handle_timeout(_, {transport, Msg}, Channel) ->
call_session(Channel, timeout, [Msg]);
handle_timeout(_, disconnect, Channel) ->
{shutdown, normal, Channel};
handle_timeout(_, _, Channel) ->
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle call
%%--------------------------------------------------------------------
handle_call(Req, Channel) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, Channel}.
%%--------------------------------------------------------------------
%% Handle Cast
%%--------------------------------------------------------------------
handle_cast(Req, Channel) ->
?LOG(error, "Unexpected cast: ~p", [Req]),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Handle Info
%%--------------------------------------------------------------------
handle_info(Info, Channel) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{ok, Channel}.
%%--------------------------------------------------------------------
%% Terminate
%%--------------------------------------------------------------------
terminate(_Reason, _Channel) ->
ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================
set_peercert_infos(NoSSL, ClientInfo)
when NoSSL =:= nossl;
NoSSL =:= undefined ->
ClientInfo;
set_peercert_infos(Peercert, ClientInfo) ->
{DN, CN} = {esockd_peercert:subject(Peercert),
esockd_peercert:common_name(Peercert)},
ClientInfo#{dn => DN, cn => CN}.
ensure_timer(Name, Time, Msg, #channel{timers = Timers} = Channel) ->
case maps:get(Name, Timers, undefined) of
undefined ->
make_timer(Name, Time, Msg, Channel);
_ ->
Channel
end.
make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) ->
TRef = emqx_misc:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
ensure_keepalive_timer(Channel) ->
ensure_keepalive_timer(Channel, fun ensure_timer/4).
ensure_keepalive_timer(#channel{config = Cfg} = Channel, Fun) ->
Interval = maps:get(heartbeat, Cfg),
Fun(keepalive, Interval, keepalive, Channel).
handle_command(#coap_message{options = Options} = Msg, Channel) ->
case maps:get(uri_query, Options, []) of
[] ->
%% heartbeat
ack(Channel, {ok, valid}, <<>>, Msg);
QueryPairs ->
Queries = lists:foldl(fun(Pair, Acc) ->
[{K, V}] = cow_qs:parse_qs(Pair),
Acc#{K => V}
end,
#{},
QueryPairs),
case maps:get(<<"action">>, Queries, undefined) of
undefined ->
ack(Channel, {error, bad_request}, <<"command without actions">>, Msg);
Action ->
handle_command(Action, Queries, Msg, Channel)
end
end.
handle_command(<<"connect">>, Queries, Msg, Channel) ->
case emqx_misc:pipeline(
[ fun run_conn_hooks/2
, fun enrich_clientinfo/2
, fun set_log_meta/2
, fun auth_connect/2
],
{Queries, Msg},
Channel) of
{ok, _Input, NChannel} ->
process_connect(ensure_connected(NChannel), Msg);
{error, ReasonCode, NChannel} ->
ErrMsg = io_lib:format("Login Failed: ~s", [ReasonCode]),
ack(NChannel, {error, bad_request}, ErrMsg, Msg)
end;
handle_command(<<"disconnect">>, _, Msg, Channel) ->
Channel2 = ensure_timer(disconnect, ?DISCONNECT_WAIT_TIME, disconnect, Channel),
ack(Channel2, {ok, deleted}, <<>>, Msg);
handle_command(_, _, Msg, Channel) ->
ack(Channel, {error, bad_request}, <<"invalid action">>, Msg).
run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
conninfo = ConnInfo}) ->
ConnProps = #{},
case run_hooks(Ctx, 'client.connect', [ConnInfo], ConnProps) of
Error = {error, _Reason} -> Error;
_NConnProps ->
{ok, Input, Channel}
end.
enrich_clientinfo({Queries, Msg},
Channel = #channel{clientinfo = ClientInfo0,
config = Cfg}) ->
case Queries of
#{<<"username">> := UserName,
<<"password">> := Password,
<<"clientid">> := ClientId} ->
ClientInfo = ClientInfo0#{username => UserName,
password => Password,
clientid => ClientId},
{ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
{ok, Channel#channel{clientinfo = NClientInfo,
config = Cfg#{clientinfo := NClientInfo}}};
_ ->
{error, "invalid queries", Channel}
end.
set_log_meta(_Input, #channel{clientinfo = #{clientid := ClientId}}) ->
emqx_logger:set_metadata_clientid(ClientId),
ok.
auth_connect(_Input, Channel = #channel{ctx = Ctx,
clientinfo = ClientInfo}) ->
#{clientid := ClientId,
username := Username} = ClientInfo,
case emqx_gateway_ctx:authenticate(Ctx, ClientInfo) of
{ok, NClientInfo} ->
{ok, Channel#channel{clientinfo = NClientInfo}};
{error, Reason} ->
?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p",
[ClientId, Username, Reason]),
{error, Reason}
end.
fix_mountpoint(_Packet, #{mountpoint := undefined}) -> ok;
fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
%% TODO: Enrich the varibale replacement????
%% i.e: ${ClientInfo.auth_result.productKey}
Mountpoint1 = emqx_mountpoint:replvar(Mountpoint, ClientInfo),
{ok, ClientInfo#{mountpoint := Mountpoint1}}.
ensure_connected(Channel = #channel{ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{conninfo = NConnInfo}.
process_connect(Channel = #channel{ctx = Ctx,
session = Session,
conninfo = ConnInfo,
clientinfo = ClientInfo},
Msg) ->
SessFun = fun(_,_) -> Session end,
case emqx_gateway_ctx:open_session(
Ctx,
true,
ClientInfo,
ConnInfo,
SessFun
) of
{ok, _Sess} ->
ack(Channel, {ok, created}, <<"connected">>, Msg);
{error, Reason} ->
?LOG(error, "Failed to open session du to ~p", [Reason]),
ack(Channel, {error, bad_request}, <<>>, Msg)
end.
run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args).
run_hooks(Ctx, Name, Args, Acc) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run_fold(Name, Args, Acc).
reply(Channel, Method, Payload, Req) ->
call_session(Channel, reply, [Req, Method, Payload]).
ack(Channel, Method, Payload, Req) ->
call_session(Channel, piggyback, [Req, Method, Payload]).
call_session(#channel{session = Session,
config = Cfg} = Channel, F, A) ->
case erlang:apply(emqx_coap_session, F, [Session, Cfg | A]) of
#{out := Out,
session := Session2} ->
{ok, {outgoing, Out}, Channel#channel{session = Session2}};
#{out := Out} ->
{ok, {outgoing, Out}, Channel};
#{session := Session2} ->
{ok, Channel#channel{session = Session2}};
_ ->
{ok, Channel}
end.

View File

@ -0,0 +1,423 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_frame).
-behavior(emqx_gateway_frame).
%% emqx_gateway_frame API
-export([ initial_parse_state/1
, serialize_opts/0
, serialize_pkt/2
, parse/2
, format/1
, type/1
, is_message/1]).
%% API
-export([]).
-include("include/emqx_coap.hrl").
-include("apps/emqx/include/types.hrl").
-define(VERSION, 1).
-define(OPTION_IF_MATCH, 1).
-define(OPTION_URI_HOST, 3).
-define(OPTION_ETAG, 4).
-define(OPTION_IF_NONE_MATCH, 5).
-define(OPTION_OBSERVE, 6). % draft-ietf-core-observe-16
-define(OPTION_URI_PORT, 7).
-define(OPTION_LOCATION_PATH, 8).
-define(OPTION_URI_PATH, 11).
-define(OPTION_CONTENT_FORMAT, 12).
-define(OPTION_MAX_AGE, 14).
-define(OPTION_URI_QUERY, 15).
-define(OPTION_ACCEPT, 17).
-define(OPTION_LOCATION_QUERY, 20).
-define(OPTION_BLOCK2, 23). % draft-ietf-core-block-17
-define(OPTION_BLOCK1, 27).
-define(OPTION_PROXY_URI, 35).
-define(OPTION_PROXY_SCHEME, 39).
-define(OPTION_SIZE1, 60).
%%%===================================================================
%%% API
%%%===================================================================
initial_parse_state(_) ->
#{}.
serialize_opts() ->
#{}.
%%%===================================================================
%%% serialize_pkt
%%%===================================================================
%% empty message
serialize_pkt(#coap_message{type = Type, method = undefined, id = MsgId}, _Opts) ->
<<?VERSION:2, (encode_type(Type)):2, 0:4, 0:3, 0:5, MsgId:16>>;
serialize_pkt(#coap_message{ type = Type
, method = Method
, id = MsgId
, token = Token
, options = Options
, payload = Payload
},
_Opts) ->
TKL = byte_size(Token),
{Class, Code} = method_to_class_code(Method),
Head = <<?VERSION:2, (encode_type(Type)):2, TKL:4, Class:3, Code:5, MsgId:16, Token:TKL/binary>>,
FlatOpts = flatten_options(Options),
encode_option_list(FlatOpts, 0, Head, Payload).
-spec encode_type(message_type()) -> 0 .. 3.
encode_type(con) -> 0;
encode_type(non) -> 1;
encode_type(ack) -> 2;
encode_type(reset) -> 3.
flatten_options(Opts) ->
flatten_options(maps:to_list(Opts), []).
flatten_options([{_OptId, undefined} | T], Acc) ->
flatten_options(T, Acc);
flatten_options([{OptId, OptVal} | T], Acc) ->
flatten_options(T,
case is_repeatable_option(OptId) of
false ->
[encode_option(OptId, OptVal) | Acc];
_ ->
lists:foldl(fun(undefined, InnerAcc) ->
InnerAcc;
(E, InnerAcc) ->
[encode_option(OptId, E) | InnerAcc]
end, Acc, OptVal)
end);
flatten_options([], Acc) ->
%% sort by option id for calculate the deltas
lists:keysort(1, Acc).
encode_option_list([{OptNum, OptVal} | OptionList], LastNum, Acc, Payload) ->
NumDiff = OptNum - LastNum,
{Delta, ExtNum} = if
NumDiff >= 269 ->
{14, <<(NumDiff - 269):16>>};
OptNum - LastNum >= 13 ->
{13, <<(NumDiff - 13)>>};
true ->
{NumDiff, <<>>}
end,
Binaryize = byte_size(OptVal),
{Len, ExtLen} = if
Binaryize >= 269 ->
{14, <<(Binaryize - 269):16>>};
Binaryize >= 13 ->
{13, <<(Binaryize - 13)>>};
true ->
{Binaryize, <<>>}
end,
Acc2 = <<Acc/binary, Delta:4, Len:4, ExtNum/binary, ExtLen/binary, OptVal/binary>>,
encode_option_list(OptionList, OptNum, Acc2, Payload);
encode_option_list([], _LastNum, Acc, <<>>) ->
Acc;
encode_option_list([], _, Acc, Payload) ->
<<Acc/binary, 16#FF, Payload/binary>>.
%% RFC 7252
encode_option(if_match, OptVal) -> {?OPTION_IF_MATCH, OptVal};
encode_option(uri_host, OptVal) -> {?OPTION_URI_HOST, OptVal};
encode_option(etag, OptVal) -> {?OPTION_ETAG, OptVal};
encode_option(if_none_match, true) -> {?OPTION_IF_NONE_MATCH, <<>>};
encode_option(uri_port, OptVal) -> {?OPTION_URI_PORT, binary:encode_unsigned(OptVal)};
encode_option(location_path, OptVal) -> {?OPTION_LOCATION_PATH, OptVal};
encode_option(uri_path, OptVal) -> {?OPTION_URI_PATH, OptVal};
encode_option(content_format, OptVal) when is_integer(OptVal) ->
{?OPTION_CONTENT_FORMAT, binary:encode_unsigned(OptVal)};
encode_option(content_format, OptVal) ->
Num = content_format_to_code(OptVal),
{?OPTION_CONTENT_FORMAT, binary:encode_unsigned(Num)};
encode_option(max_age, OptVal) -> {?OPTION_MAX_AGE, binary:encode_unsigned(OptVal)};
encode_option(uri_query, OptVal) -> {?OPTION_URI_QUERY, OptVal};
encode_option('accept', OptVal) -> {?OPTION_ACCEPT, binary:encode_unsigned(OptVal)};
encode_option(location_query, OptVal) -> {?OPTION_LOCATION_QUERY, OptVal};
encode_option(proxy_uri, OptVal) -> {?OPTION_PROXY_URI, OptVal};
encode_option(proxy_scheme, OptVal) -> {?OPTION_PROXY_SCHEME, OptVal};
encode_option(size1, OptVal) -> {?OPTION_SIZE1, binary:encode_unsigned(OptVal)};
%% draft-ietf-ore-observe-16
encode_option(observe, OptVal) -> {?OPTION_OBSERVE, binary:encode_unsigned(OptVal)};
%% draft-ietf-ore-block-17
encode_option(block2, OptVal) -> {?OPTION_BLOCK2, encode_block(OptVal)};
encode_option(block1, OptVal) -> {?OPTION_BLOCK1, encode_block(OptVal)};
%% unknown opton
encode_option(Option, Value) ->
erlang:throw({bad_option, Option, Value}).
encode_block({Num, More, Size}) ->
encode_block1(Num,
if More -> 1; true -> 0 end,
trunc(math:log2(Size))-4).
encode_block1(Num, M, SizEx) when Num < 16 ->
<<Num:4, M:1, SizEx:3>>;
encode_block1(Num, M, SizEx) when Num < 4096 ->
<<Num:12, M:1, SizEx:3>>;
encode_block1(Num, M, SizEx) ->
<<Num:28, M:1, SizEx:3>>.
-spec content_format_to_code(binary()) -> non_neg_integer().
content_format_to_code(<<"text/plain">>) -> 0;
content_format_to_code(<<"application/link-format">>) -> 40;
content_format_to_code(<<"application/xml">>) ->41;
content_format_to_code(<<"application/octet-stream">>) -> 42;
content_format_to_code(<<"application/exi">>) -> 47;
content_format_to_code(<<"application/json">>) -> 50;
content_format_to_code(<<"application/cbor">>) -> 60;
content_format_to_code(_) -> 42. %% use octet-stream as default
method_to_class_code(get) -> {0, 01};
method_to_class_code(post) -> {0, 02};
method_to_class_code(put) -> {0, 03};
method_to_class_code(delete) -> {0, 04};
method_to_class_code({ok, created}) -> {2, 01};
method_to_class_code({ok, deleted}) -> {2, 02};
method_to_class_code({ok, valid}) -> {2, 03};
method_to_class_code({ok, changed}) -> {2, 04};
method_to_class_code({ok, content}) -> {2, 05};
method_to_class_code({ok, nocontent}) -> {2, 07};
method_to_class_code({ok, continue}) -> {2, 31};
method_to_class_code({error, bad_request}) -> {4, 00};
method_to_class_code({error, unauthorized}) -> {4, 01};
method_to_class_code({error, bad_option}) -> {4, 02};
method_to_class_code({error, forbidden}) -> {4, 03};
method_to_class_code({error, not_found}) -> {4, 04};
method_to_class_code({error, method_not_allowed}) -> {4, 05};
method_to_class_code({error, not_acceptable}) -> {4, 06};
method_to_class_code({error, request_entity_incomplete}) -> {4, 08};
method_to_class_code({error, precondition_failed}) -> {4, 12};
method_to_class_code({error, request_entity_too_large}) -> {4, 13};
method_to_class_code({error, unsupported_content_format}) -> {4, 15};
method_to_class_code({error, internal_server_error}) -> {5, 00};
method_to_class_code({error, not_implemented}) -> {5, 01};
method_to_class_code({error, bad_gateway}) -> {5, 02};
method_to_class_code({error, service_unavailable}) -> {5, 03};
method_to_class_code({error, gateway_timeout}) -> {5, 04};
method_to_class_code({error, proxying_not_supported}) -> {5, 05};
method_to_class_code(Method) ->
erlang:throw({bad_method, Method}).
%%%===================================================================
%%% parse
%%%===================================================================
parse(<<?VERSION:2, Type:2, 0:4, 0:3, 0:5, MsgId:16>>, ParseState) ->
{ok,
#coap_message{ type = decode_type(Type)
, id = MsgId},
<<>>,
ParseState};
parse(<<?VERSION:2, Type:2, TKL:4, Class:3, Code:5, MsgId:16, Token:TKL/binary, Tail/binary>>,
ParseState) ->
{Options, Payload} = decode_option_list(Tail),
Options2 = maps:fold(fun(K, V, Acc) ->
case is_repeatable_option(K) of
true ->
Acc#{K => lists:reverse(V)};
_ ->
Acc#{K => V}
end
end,
#{},
Options),
{ok,
#coap_message{ type = decode_type(Type)
, method = class_code_to_method({Class, Code})
, id = MsgId
, token = Token
, options = Options2
, payload = Payload
},
<<>>,
ParseState}.
-spec decode_type(X) -> message_type()
when X :: 0 .. 3.
decode_type(0) -> con;
decode_type(1) -> non;
decode_type(2) -> ack;
decode_type(3) -> reset.
-spec decode_option_list(binary()) -> {message_options(), binary()}.
decode_option_list(Bin) ->
decode_option_list(Bin, 0, #{}).
decode_option_list(<<>>, _OptNum, OptMap) ->
{OptMap, <<>>};
decode_option_list(<<16#FF, Payload/binary>>, _OptNum, OptMap) ->
{OptMap, Payload};
decode_option_list(<<Delta:4, Len:4, Bin/binary>>, OptNum, OptMap) ->
case Delta of
Any when Any < 13 ->
decode_option_len(Bin, OptNum + Delta, Len, OptMap);
13 ->
<<ExtOptNum, NewBin/binary>> = Bin,
decode_option_len(NewBin, OptNum + ExtOptNum + 13, Len, OptMap);
14 ->
<<ExtOptNum:16, NewBin/binary>> = Bin,
decode_option_len(NewBin, OptNum + ExtOptNum + 269, Len, OptMap)
end.
decode_option_len(<<Bin/binary>>, OptNum, Len, OptMap) ->
case Len of
Any when Any < 13 ->
decode_option_value(Bin, OptNum, Len, OptMap);
13 ->
<<ExtOptLen, NewBin/binary>> = Bin,
decode_option_value(NewBin, OptNum, ExtOptLen + 13, OptMap);
14 ->
<<ExtOptLen:16, NewBin/binary>> = Bin,
decode_option_value(NewBin, OptNum, ExtOptLen + 269, OptMap)
end.
decode_option_value(<<Bin/binary>>, OptNum, OptLen, OptMap) ->
case Bin of
<<OptVal:OptLen/binary, NewBin/binary>> ->
decode_option_list(NewBin, OptNum, append_option(OptNum, OptVal, OptMap));
<<>> ->
decode_option_list(<<>>, OptNum, append_option(OptNum, <<>>, OptMap))
end.
append_option(OptNum, RawOptVal, OptMap) ->
{OptId, OptVal} = decode_option(OptNum, RawOptVal),
case is_repeatable_option(OptId) of
false ->
OptMap#{OptId => OptVal};
_ ->
case maps:get(OptId, OptMap, undefined) of
undefined ->
OptMap#{OptId => [OptVal]};
OptVals ->
OptMap#{OptId => [OptVal | OptVals]}
end
end.
%% RFC 7252
decode_option(?OPTION_IF_MATCH, OptVal) -> {if_match, OptVal};
decode_option(?OPTION_URI_HOST, OptVal) -> {uri_host, OptVal};
decode_option(?OPTION_ETAG, OptVal) -> {etag, OptVal};
decode_option(?OPTION_IF_NONE_MATCH, <<>>) -> {if_none_match, true};
decode_option(?OPTION_URI_PORT, OptVal) -> {uri_port, binary:decode_unsigned(OptVal)};
decode_option(?OPTION_LOCATION_PATH, OptVal) -> {location_path, OptVal};
decode_option(?OPTION_URI_PATH, OptVal) -> {uri_path, OptVal};
decode_option(?OPTION_CONTENT_FORMAT, OptVal) ->
Num = binary:decode_unsigned(OptVal),
{content_format, content_code_to_format(Num)};
decode_option(?OPTION_MAX_AGE, OptVal) -> {max_age, binary:decode_unsigned(OptVal)};
decode_option(?OPTION_URI_QUERY, OptVal) -> {uri_query, OptVal};
decode_option(?OPTION_ACCEPT, OptVal) -> {'accept', binary:decode_unsigned(OptVal)};
decode_option(?OPTION_LOCATION_QUERY, OptVal) -> {location_query, OptVal};
decode_option(?OPTION_PROXY_URI, OptVal) -> {proxy_uri, OptVal};
decode_option(?OPTION_PROXY_SCHEME, OptVal) -> {proxy_scheme, OptVal};
decode_option(?OPTION_SIZE1, OptVal) -> {size1, binary:decode_unsigned(OptVal)};
%% draft-ietf-core-observe-16
decode_option(?OPTION_OBSERVE, OptVal) -> {observe, binary:decode_unsigned(OptVal)};
%% draft-ietf-core-block-17
decode_option(?OPTION_BLOCK2, OptVal) -> {block2, decode_block(OptVal)};
decode_option(?OPTION_BLOCK1, OptVal) -> {block1, decode_block(OptVal)};
%% unknown option
decode_option(OptNum, OptVal) -> {OptNum, OptVal}.
decode_block(<<Num:4, M:1, SizEx:3>>) -> decode_block1(Num, M, SizEx);
decode_block(<<Num:12, M:1, SizEx:3>>) -> decode_block1(Num, M, SizEx);
decode_block(<<Num:28, M:1, SizEx:3>>) -> decode_block1(Num, M, SizEx).
decode_block1(Num, M, SizEx) ->
{Num, M =/= 0, trunc(math:pow(2, SizEx + 4))}.
-spec content_code_to_format(non_neg_integer()) -> binary().
content_code_to_format(0) -> <<"text/plain">>;
content_code_to_format(40) -> <<"application/link-format">>;
content_code_to_format(41) -> <<"application/xml">>;
content_code_to_format(42) -> <<"application/octet-stream">>;
content_code_to_format(47) -> <<"application/exi">>;
content_code_to_format(50) -> <<"application/json">>;
content_code_to_format(60) -> <<"application/cbor">>;
content_code_to_format(_) -> <<"application/octet-stream">>. %% use octet as default
%% RFC 7252
%% atom indicate a request
class_code_to_method({0, 01}) -> get;
class_code_to_method({0, 02}) -> post;
class_code_to_method({0, 03}) -> put;
class_code_to_method({0, 04}) -> delete;
%% success is a tuple {ok, ...}
class_code_to_method({2, 01}) -> {ok, created};
class_code_to_method({2, 02}) -> {ok, deleted};
class_code_to_method({2, 03}) -> {ok, valid};
class_code_to_method({2, 04}) -> {ok, changed};
class_code_to_method({2, 05}) -> {ok, content};
class_code_to_method({2, 07}) -> {ok, nocontent};
class_code_to_method({2, 31}) -> {ok, continue}; % block
%% error is a tuple {error, ...}
class_code_to_method({4, 00}) -> {error, bad_request};
class_code_to_method({4, 01}) -> {error, unauthorized};
class_code_to_method({4, 02}) -> {error, bad_option};
class_code_to_method({4, 03}) -> {error, forbidden};
class_code_to_method({4, 04}) -> {error, not_found};
class_code_to_method({4, 05}) -> {error, method_not_allowed};
class_code_to_method({4, 06}) -> {error, not_acceptable};
class_code_to_method({4, 08}) -> {error, request_entity_incomplete}; % block
class_code_to_method({4, 12}) -> {error, precondition_failed};
class_code_to_method({4, 13}) -> {error, request_entity_too_large};
class_code_to_method({4, 15}) -> {error, unsupported_content_format};
class_code_to_method({5, 00}) -> {error, internal_server_error};
class_code_to_method({5, 01}) -> {error, not_implemented};
class_code_to_method({5, 02}) -> {error, bad_gateway};
class_code_to_method({5, 03}) -> {error, service_unavailable};
class_code_to_method({5, 04}) -> {error, gateway_timeout};
class_code_to_method({5, 05}) -> {error, proxying_not_supported};
class_code_to_method(_) -> undefined.
format(Msg) ->
io_lib:format("~p", [Msg]).
type(_) ->
coap.
is_message(#coap_message{}) ->
true;
is_message(_) ->
false.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec is_repeatable_option(message_option_name()) -> boolean().
is_repeatable_option(if_match) -> true;
is_repeatable_option(etag) -> true;
is_repeatable_option(location_path) -> true;
is_repeatable_option(uri_path) -> true;
is_repeatable_option(uri_query) -> true;
is_repeatable_option(location_query) -> true;
is_repeatable_option(_) -> false.

View File

@ -0,0 +1,156 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_impl).
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
-behavior(emqx_gateway_impl).
%% APIs
-export([ load/0
, unload/0
]).
-export([ init/1
, on_insta_create/3
, on_insta_update/4
, on_insta_destroy/3
]).
-dialyzer({nowarn_function, [load/0]}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
load() ->
RegistryOptions = [ {cbkmod, ?MODULE}
],
Options = [],
emqx_gateway_registry:load(coap, RegistryOptions, Options).
unload() ->
emqx_gateway_registry:unload(coap).
init([]) ->
GwState = #{},
{ok, GwState}.
%%--------------------------------------------------------------------
%% emqx_gateway_registry callbacks
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{id := InstaId,
rawconf := #{resource := Resource} = RawConf
}, Ctx, _GwState) ->
ResourceMod = get_resource_mod(Resource),
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, ResourceMod, Lis)
end, Listeners),
{ok, ResCtx} = ResourceMod:init(RawConf),
{ok, ListenerPids, #{ctx => Ctx,
res_ctx => ResCtx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
try
%% XXX: 1. How hot-upgrade the changes ???
%% XXX: 2. Check the New confs first before destroy old instance ???
on_insta_destroy(OldInsta, GwInstaState, GwState),
on_insta_create(NewInsta, Ctx, GwState)
catch
Class : Reason : Stk ->
logger:error("Failed to update coap instance ~s; "
"reason: {~0p, ~0p} stacktrace: ~0p",
[InstaId, Class, Reason, Stk]),
{error, {Class, Reason}}
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := #{resource := Resource} = RawConf
},
#{res_ctx := ResCtx} = _GwInstaState,
_GWState) ->
ResourceMod = get_resource_mod(Resource),
ok = ResourceMod:stop(ResCtx),
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
end, Listeners).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, ResourceMod, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
Cfg2 = Cfg#{resource => ResourceMod},
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg2) of
{ok, Pid} ->
io:format("Start coap ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
Pid;
{error, Reason} ->
io:format(standard_error,
"Failed to start coap ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]),
throw({badconf, Reason})
end.
start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) ->
Name = name(InstaId, Type),
NCfg = Cfg#{
ctx => Ctx,
frame_mod => emqx_coap_frame,
chann_mod => emqx_coap_channel
},
MFA = {emqx_gateway_conn, start_link, [NCfg]},
do_start_listener(Type, Name, ListenOn, SocketOpts, MFA).
do_start_listener(udp, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_udp(Name, ListenOn, SocketOpts, MFA);
do_start_listener(dtls, Name, ListenOn, SocketOpts, MFA) ->
esockd:open_dtls(Name, ListenOn, SocketOpts, MFA).
name(InstaId, Type) ->
list_to_atom(lists:concat([InstaId, ":", Type])).
stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
StopRet = stop_listener(InstaId, Type, ListenOn, SocketOpts, Cfg),
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
case StopRet of
ok -> io:format("Stop coap ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]);
{error, Reason} ->
io:format(standard_error,
"Failed to stop coap ~s:~s listener on ~s: ~0p~n",
[InstaId, Type, ListenOnStr, Reason]
)
end,
StopRet.
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
esockd:close(Name, ListenOn).
get_resource_mod(mqtt) ->
emqx_coap_mqtt_resource;
get_resource_mod(pubsub) ->
emqx_coap_pubsub_resource.

View File

@ -0,0 +1,146 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Copyright (c) 2015 Petr Gotthard <petr.gotthard@centrum.cz>
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% convenience functions for message construction
-module(emqx_coap_message).
-export([request/2, request/3, request/4, ack/1, response/1, response/2, response/3]).
-export([set/3, set_payload/2, get_content/1, set_content/2, set_content/3, get_option/2]).
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
request(Type, Method) ->
request(Type, Method, <<>>, []).
request(Type, Method, Payload) ->
request(Type, Method, Payload, []).
request(Type, Method, Payload, Options) when is_binary(Payload) ->
#coap_message{type = Type, method = Method, payload = Payload, options = Options};
request(Type, Method, Content=#coap_content{}, Options) ->
set_content(Content,
#coap_message{type = Type, method = Method, options = Options}).
ack(Request = #coap_message{}) ->
#coap_message{type = ack,
id = Request#coap_message.id}.
response(#coap_message{type = Type,
id = Id,
token = Token}) ->
#coap_message{type = Type,
id = Id,
token = Token}.
response(Method, Request) ->
set_method(Method, response(Request)).
response(Method, Payload, Request) ->
set_method(Method,
set_payload(Payload,
response(Request))).
%% omit option for its default value
set(max_age, ?DEFAULT_MAX_AGE, Msg) -> Msg;
%% set non-default value
set(Option, Value, Msg = #coap_message{options = Options}) ->
Msg#coap_message{options = Options#{Option => Value}}.
get_option(Option, #coap_message{options = Options}) ->
maps:get(Option, Options, undefined).
set_method(Method, Msg) ->
Msg#coap_message{method = Method}.
set_payload(Payload = #coap_content{}, Msg) ->
set_content(Payload, undefined, Msg);
set_payload(Payload, Msg) when is_binary(Payload) ->
Msg#coap_message{payload = Payload};
set_payload(Payload, Msg) when is_list(Payload) ->
Msg#coap_message{payload = list_to_binary(Payload)}.
get_content(#coap_message{options = Options, payload = Payload}) ->
#coap_content{etag = maps:get(etag, Options, undefined),
max_age = maps:get(max_age, Options, ?DEFAULT_MAX_AGE),
format = maps:get(content_format, Options, undefined),
location_path = maps:get(location_path, Options, []),
payload = Payload}.
set_content(Content, Msg) ->
set_content(Content, undefined, Msg).
%% segmentation not requested and not required
set_content(#coap_content{etag = ETag,
max_age = MaxAge,
format = Format,
location_path = LocPath,
payload = Payload},
undefined,
Msg)
when byte_size(Payload) =< ?MAX_BLOCK_SIZE ->
#coap_message{options = Options} = Msg2 = set_payload(Payload, Msg),
Options2 = Options#{etag => [ETag],
max_age => MaxAge,
content_format => Format,
location_path => LocPath},
Msg2#coap_message{options = Options2};
%% segmentation not requested, but required (late negotiation)
set_content(Content, undefined, Msg) ->
set_content(Content, {0, true, ?MAX_BLOCK_SIZE}, Msg);
%% segmentation requested (early negotiation)
set_content(#coap_content{etag = ETag,
max_age = MaxAge,
format = Format,
payload = Payload},
Block,
Msg) ->
#coap_message{options = Options} = Msg2 = set_payload_block(Payload, Block, Msg),
Options2 = Options#{etag => [ETag],
max => MaxAge,
content_format => Format},
Msg2#coap_message{options = Options2}.
set_payload_block(Content, Block, Msg = #coap_message{method = Method}) when is_atom(Method) ->
set_payload_block(Content, block1, Block, Msg);
set_payload_block(Content, Block, Msg = #coap_message{}) ->
set_payload_block(Content, block2, Block, Msg).
set_payload_block(Content, BlockId, {Num, _, Size}, Msg) ->
ContentSize = erlang:byte_size(Content),
OffsetBegin = Size * Num,
OffsetEnd = OffsetBegin + Size,
case ContentSize > OffsetEnd of
true ->
set(BlockId, {Num, true, Size},
set_payload(binary:part(Content, OffsetBegin, Size), Msg));
_ ->
set(BlockId, {Num, false, Size},
set_payload(binary:part(Content, OffsetBegin, ContentSize - OffsetBegin), Msg))
end.

View File

@ -1,387 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_mqtt_adapter).
-behaviour(gen_server).
-include("src/coap/include/emqx_coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-logger_header("[CoAP-Adpter]").
%% API.
-export([ subscribe/2
, unsubscribe/2
, publish/3
]).
-export([ client_pid/4
, stop/1
]).
-export([ call/2
, call/3
]).
%% gen_server.
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {peername, clientid, username, password, sub_topics = [], connected_at}).
-define(ALIVE_INTERVAL, 20000).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
client_pid(undefined, _Username, _Password, _Channel) ->
{error, bad_request};
client_pid(ClientId, Username, Password, Channel) ->
% check authority
case start(ClientId, Username, Password, Channel) of
{ok, Pid1} -> {ok, Pid1};
{error, {already_started, Pid2}} -> {ok, Pid2};
{error, auth_failure} -> {error, auth_failure};
Other -> {error, Other}
end.
start(ClientId, Username, Password, Channel) ->
% DO NOT use start_link, since multiple coap_reponsder may have relation with one mqtt adapter,
% one coap_responder crashes should not make mqtt adapter crash too
% And coap_responder is not a system process
% it is dangerous to link mqtt adapter to coap_responder
gen_server:start({via, emqx_coap_registry, {ClientId, Username, Password}},
?MODULE, {ClientId, Username, Password, Channel}, []).
stop(Pid) ->
gen_server:stop(Pid).
subscribe(Pid, Topic) ->
gen_server:call(Pid, {subscribe, Topic, self()}).
unsubscribe(Pid, Topic) ->
gen_server:call(Pid, {unsubscribe, Topic, self()}).
publish(Pid, Topic, Payload) ->
gen_server:call(Pid, {publish, Topic, Payload}).
%% For emqx_management plugin
call(Pid, Msg) ->
call(Pid, Msg, infinity).
call(Pid, Msg, _) ->
Pid ! Msg, ok.
%%--------------------------------------------------------------------
%% gen_server Callbacks
%%--------------------------------------------------------------------
init({ClientId, Username, Password, Channel}) ->
?LOG(debug, "try to start adapter ClientId=~p, Username=~p, "
"Channel=~0p", [ClientId, Username, Channel]),
State0 = #state{peername = Channel,
clientid = ClientId,
username = Username,
password = Password},
_ = run_hooks('client.connect', [conninfo(State0)], undefined),
case emqx_access_control:authenticate(clientinfo(State0)) of
ok ->
ok = emqx_cm:discard_session(ClientId),
_ = run_hooks('client.connack', [conninfo(State0), success], undefined),
State = State0#state{connected_at = erlang:system_time(millisecond)},
run_hooks('client.connected', [clientinfo(State), conninfo(State)]),
Self = self(),
erlang:send_after(?ALIVE_INTERVAL, Self, check_alive),
_ = emqx_cm_locker:trans(ClientId, fun(_) ->
emqx_cm:register_channel(ClientId, Self, conninfo(State))
end),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
{ok, State};
{error, Reason} ->
?LOG(debug, "authentication faild: ~p", [Reason]),
_ = run_hooks('client.connack', [conninfo(State0), not_authorized], undefined),
{stop, {shutdown, Reason}}
end.
handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
NewTopics = proplists:delete(Topic, TopicList),
IsWild = emqx_topic:wildcard(Topic),
{reply, chann_subscribe(Topic, State), State#state{sub_topics =
[{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
NewTopics = proplists:delete(Topic, TopicList),
chann_unsubscribe(Topic, State),
{reply, ok, State#state{sub_topics = NewTopics}, hibernate};
handle_call({publish, Topic, Payload}, _From, State) ->
{reply, chann_publish(Topic, Payload, State), State};
handle_call(info, _From, State) ->
{reply, info(State), State};
handle_call(stats, _From, State) ->
{reply, stats(State), State, hibernate};
handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State};
handle_call({set_rate_limit, _Rl}, _From, State) ->
?LOG(error, "set_rate_limit is not support", []),
{reply, ok, State};
handle_call(get_rate_limit, _From, State) ->
?LOG(error, "get_rate_limit is not support", []),
{reply, ok, State};
handle_call(Request, _From, State) ->
?LOG(error, "adapter unexpected call ~p", [Request]),
{reply, ignored, State, hibernate}.
handle_cast(Msg, State) ->
?LOG(error, "broker_api unexpected cast ~p", [Msg]),
{noreply, State, hibernate}.
handle_info({deliver, _Topic, #message{topic = Topic, payload = Payload}},
State = #state{sub_topics = Subscribers}) ->
deliver([{Topic, Payload}], Subscribers),
{noreply, State, hibernate};
handle_info(check_alive, State = #state{sub_topics = []}) ->
{stop, {shutdown, check_alive}, State};
handle_info(check_alive, State) ->
erlang:send_after(?ALIVE_INTERVAL, self(), check_alive),
{noreply, State, hibernate};
handle_info({shutdown, Error}, State) ->
{stop, {shutdown, Error}, State};
handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
{stop, {shutdown, conflict}, State};
handle_info(discard, State) ->
?LOG(warning, "the connection is discarded. " ++
"possibly there is another client with the same clientid", []),
{stop, {shutdown, discarded}, State};
handle_info(kick, State) ->
?LOG(info, "Kicked", []),
{stop, {shutdown, kick}, State};
handle_info(Info, State) ->
?LOG(error, "adapter unexpected info ~p", [Info]),
{noreply, State, hibernate}.
terminate(Reason, State = #state{clientid = ClientId, sub_topics = SubTopics}) ->
?LOG(debug, "unsubscribe ~p while exiting for ~p", [SubTopics, Reason]),
[chann_unsubscribe(Topic, State) || {Topic, _} <- SubTopics],
emqx_cm:unregister_channel(ClientId),
ConnInfo0 = conninfo(State),
ConnInfo = ConnInfo0#{disconnected_at => erlang:system_time(millisecond)},
run_hooks('client.disconnected', [clientinfo(State), Reason, ConnInfo]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Channel adapter functions
chann_subscribe(Topic, State = #state{clientid = ClientId}) ->
?LOG(debug, "subscribe Topic=~p", [Topic]),
case emqx_access_control:authorize(clientinfo(State), subscribe, Topic) of
allow ->
emqx_broker:subscribe(Topic, ClientId, ?SUBOPTS),
emqx_hooks:run('session.subscribed', [clientinfo(State), Topic, ?SUBOPTS]),
ok;
deny ->
?LOG(warning, "subscribe to ~p by clientid ~p failed due to authz check.",
[Topic, ClientId]),
{error, forbidden}
end.
chann_unsubscribe(Topic, State) ->
?LOG(debug, "unsubscribe Topic=~p", [Topic]),
Opts = #{rh => 0, rap => 0, nl => 0, qos => 0},
emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [clientinfo(State), Topic, Opts]).
chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
?LOG(debug, "publish Topic=~p, Payload=~p", [Topic, Payload]),
case emqx_access_control:authorize(clientinfo(State), publish, Topic) of
allow ->
_ = emqx_broker:publish(
emqx_message:set_flag(retain, false,
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
ok;
deny ->
?LOG(warning, "publish to ~p by clientid ~p failed due to authz check.",
[Topic, ClientId]),
{error, forbidden}
end.
%%--------------------------------------------------------------------
%% Deliver
deliver([], _) -> ok;
deliver([Pub | More], Subscribers) ->
ok = do_deliver(Pub, Subscribers),
deliver(More, Subscribers).
do_deliver({Topic, Payload}, Subscribers) ->
%% handle PUBLISH packet from broker
?LOG(debug, "deliver message from broker Topic=~p, Payload=~p", [Topic, Payload]),
deliver_to_coap(Topic, Payload, Subscribers),
ok.
deliver_to_coap(_TopicName, _Payload, []) ->
ok;
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) ->
Matched = case IsWild of
true -> emqx_topic:match(TopicName, TopicFilter);
false -> TopicName =:= TopicFilter
end,
%?LOG(debug, "deliver_to_coap Matched=~p, CoapPid=~p, TopicName=~p, Payload=~p, T=~p",
% [Matched, CoapPid, TopicName, Payload, T]),
Matched andalso (CoapPid ! {dispatch, TopicName, Payload}),
deliver_to_coap(TopicName, Payload, T).
%%--------------------------------------------------------------------
%% Helper funcs
-compile({inline, [run_hooks/2, run_hooks/3]}).
run_hooks(Name, Args) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
run_hooks(Name, Args, Acc) ->
ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
%%--------------------------------------------------------------------
%% Info & Stats
info(State) ->
ChannInfo = chann_info(State),
ChannInfo#{sockinfo => sockinfo(State)}.
%% copies from emqx_connection:info/1
sockinfo(#state{peername = Peername}) ->
#{socktype => udp,
peername => Peername,
sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock?
sockstate => running,
active_n => 1
}.
%% copies from emqx_channel:info/1
chann_info(State) ->
#{conninfo => conninfo(State),
conn_state => connected,
clientinfo => clientinfo(State),
session => maps:from_list(session_info(State)),
will_msg => undefined
}.
conninfo(#state{peername = Peername,
clientid = ClientId,
connected_at = ConnectedAt}) ->
#{socktype => udp,
sockname => {{127, 0, 0, 1}, 5683},
peername => Peername,
peercert => nossl, %% TODO: dtls
conn_mod => ?MODULE,
proto_name => <<"CoAP">>,
proto_ver => 1,
clean_start => true,
clientid => ClientId,
username => undefined,
conn_props => undefined,
connected => true,
connected_at => ConnectedAt,
keepalive => 0,
receive_maximum => 0,
expiry_interval => 0
}.
%% copies from emqx_session:info/1
session_info(#state{sub_topics = SubTopics, connected_at = ConnectedAt}) ->
Subs = lists:foldl(
fun({Topic, _}, Acc) ->
Acc#{Topic => ?SUBOPTS}
end, #{}, SubTopics),
[{subscriptions, Subs},
{upgrade_qos, false},
{retry_interval, 0},
{await_rel_timeout, 0},
{created_at, ConnectedAt}
].
%% The stats keys copied from emqx_connection:stats/1
stats(#state{sub_topics = SubTopics}) ->
SockStats = [{recv_oct, 0}, {recv_cnt, 0}, {send_oct, 0}, {send_cnt, 0}, {send_pend, 0}],
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = [{subscriptions_cnt, length(SubTopics)},
{subscriptions_max, length(SubTopics)},
{inflight_cnt, 0},
{inflight_max, 0},
{mqueue_len, 0},
{mqueue_max, 0},
{mqueue_dropped, 0},
{next_pkt_id, 0},
{awaiting_rel_cnt, 0},
{awaiting_rel_max, 0}
],
ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
clientinfo(#state{peername = {PeerHost, _},
clientid = ClientId,
username = Username,
password = Password}) ->
#{zone => default,
listener => mqtt_tcp, %% FIXME: this won't work
protocol => coap,
peerhost => PeerHost,
sockport => 5683, %% FIXME:
clientid => ClientId,
username => Username,
password => Password,
peercert => nossl,
is_bridge => false,
is_superuser => false,
mountpoint => undefined,
ws_cookie => undefined
}.

View File

@ -0,0 +1,92 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_observe_res).
%% API
-export([ new/0, insert/3, remove/2
, res_changed/2, foreach/2]).
-export_type([manager/0]).
-define(MAX_SEQ_ID, 16777215).
-type topic() :: binary().
-type token() :: binary().
-type seq_id() :: 0 .. ?MAX_SEQ_ID.
-type res() :: #{ token := token()
, seq_id := seq_id()
}.
-type manager() :: #{topic => res()}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new() -> manager().
new() ->
#{}.
-spec insert(manager(), topic(), token()) -> manager().
insert(Manager, Topic, Token) ->
case maps:get(Topic, Manager, undefined) of
undefined ->
Manager#{Topic => new_res(Token)};
_ ->
Manager
end.
-spec remove(manager(), topic()) -> manager().
remove(Manager, Topic) ->
maps:remove(Topic, Manager).
-spec res_changed(manager(), topic()) -> undefined | {token(), seq_id(), manager()}.
res_changed(Manager, Topic) ->
case maps:get(Topic, Manager, undefined) of
undefined ->
undefined;
Res ->
#{token := Token,
seq_id := SeqId} = Res2 = res_changed(Res),
{Token, SeqId, Manager#{Topic := Res2}}
end.
foreach(F, Manager) ->
maps:fold(fun(K, V, _) ->
F(K, V)
end,
ok,
Manager),
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec new_res(token()) -> res().
new_res(Token) ->
#{token => Token,
seq_id => 0}.
-spec res_changed(res()) -> res().
res_changed(#{seq_id := SeqId} = Res) ->
NewSeqId = SeqId + 1,
NewSeqId2 =
case NewSeqId > ?MAX_SEQ_ID of
true ->
1;
_ ->
NewSeqId
end,
Res#{seq_id := NewSeqId2}.

View File

@ -1,322 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_pubsub_resource).
% -behaviour(coap_resource).
-include("src/coap/include/emqx_coap.hrl").
-include_lib("gen_coap/include/coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[CoAP-PS-RES]").
-export([ coap_discover/2
, coap_get/5
, coap_post/4
, coap_put/4
, coap_delete/3
, coap_observe/5
, coap_unobserve/1
, handle_info/2
, coap_ack/2
]).
-ifdef(TEST).
-export([topic/1]).
-endif.
-define(PS_PREFIX, [<<"ps">>]).
%%--------------------------------------------------------------------
%% Resource Callbacks
%%--------------------------------------------------------------------
coap_discover(_Prefix, _Args) ->
[{absolute, [<<"ps">>], []}].
coap_get(ChId, ?PS_PREFIX, TopicPath, Query, Content=#coap_content{format = Format}) when TopicPath =/= [] ->
Topic = topic(TopicPath),
?LOG(debug, "coap_get() Topic=~p, Query=~p~n", [Topic, Query]),
#coap_mqtt_auth{clientid = Clientid, username = Usr, password = Passwd} = get_auth(Query),
case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of
{ok, Pid} ->
put(mqtt_client_pid, Pid),
case Format of
<<"application/link-format">> ->
Content;
_Other ->
%% READ the topic info
read_last_publish_message(emqx_topic:wildcard(Topic), Topic, Content)
end;
{error, auth_failure} ->
put(mqtt_client_pid, undefined),
{error, unauthorized};
{error, bad_request} ->
put(mqtt_client_pid, undefined),
{error, bad_request};
{error, _Other} ->
put(mqtt_client_pid, undefined),
{error, internal_server_error}
end;
coap_get(ChId, Prefix, TopicPath, Query, _Content) ->
?LOG(error, "ignore bad get request ChId=~p, Prefix=~p, TopicPath=~p, Query=~p", [ChId, Prefix, TopicPath, Query]),
{error, bad_request}.
coap_post(_ChId, ?PS_PREFIX, TopicPath, #coap_content{format = Format, payload = Payload, max_age = MaxAge}) when TopicPath =/= [] ->
Topic = topic(TopicPath),
?LOG(debug, "coap_post() Topic=~p, MaxAge=~p, Format=~p~n", [Topic, MaxAge, Format]),
case Format of
%% We treat ct of "application/link-format" as CREATE message
<<"application/link-format">> ->
handle_received_create(Topic, MaxAge, Payload);
%% We treat ct of other values as PUBLISH message
Other ->
?LOG(debug, "coap_post() receive payload format=~p, will process as PUBLISH~n", [Format]),
handle_received_publish(Topic, MaxAge, Other, Payload)
end;
coap_post(_ChId, _Prefix, _TopicPath, _Content) ->
{error, method_not_allowed}.
coap_put(_ChId, ?PS_PREFIX, TopicPath, #coap_content{max_age = MaxAge, format = Format, payload = Payload}) when TopicPath =/= [] ->
Topic = topic(TopicPath),
?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]),
handle_received_publish(Topic, MaxAge, Format, Payload);
coap_put(_ChId, Prefix, TopicPath, Content) ->
?LOG(error, "put has error, Prefix=~p, TopicPath=~p, Content=~p", [Prefix, TopicPath, Content]),
{error, bad_request}.
coap_delete(_ChId, ?PS_PREFIX, TopicPath) ->
delete_topic_info(topic(TopicPath));
coap_delete(_ChId, _Prefix, _TopicPath) ->
{error, method_not_allowed}.
coap_observe(ChId, ?PS_PREFIX, TopicPath, Ack, Content) when TopicPath =/= [] ->
Topic = topic(TopicPath),
?LOG(debug, "observe Topic=~p, Ack=~pContent=~p", [Topic, Ack, Content]),
Pid = get(mqtt_client_pid),
case emqx_coap_mqtt_adapter:subscribe(Pid, Topic) of
ok ->
Code = case emqx_coap_pubsub_topics:is_topic_timeout(Topic) of
true -> nocontent;
false-> content
end,
{ok, {state, ChId, ?PS_PREFIX, [Topic]}, Code, Content};
{error, Code} ->
{error, Code}
end;
coap_observe(ChId, Prefix, TopicPath, Ack, _Content) ->
?LOG(error, "unknown observe request ChId=~p, Prefix=~p, TopicPath=~p, Ack=~p", [ChId, Prefix, TopicPath, Ack]),
{error, bad_request}.
coap_unobserve({state, _ChId, ?PS_PREFIX, TopicPath}) when TopicPath =/= [] ->
Topic = topic(TopicPath),
?LOG(debug, "unobserve ~p", [Topic]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:unsubscribe(Pid, Topic),
ok;
coap_unobserve({state, ChId, Prefix, TopicPath}) ->
?LOG(error, "ignore unknown unobserve request ChId=~p, Prefix=~p, TopicPath=~p", [ChId, Prefix, TopicPath]),
ok.
handle_info({dispatch, Topic, Payload}, State) ->
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload),
?LOG(debug, "Updated publish info of topic=~p, the Ret is ~p", [Topic, Ret]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) ->
?LOG(error, "Unknown Message ~p", [Message]),
{noreply, State}.
coap_ack(_Ref, State) -> {ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
get_auth(Query) ->
get_auth(Query, #coap_mqtt_auth{}).
get_auth([], Auth=#coap_mqtt_auth{}) ->
Auth;
get_auth([<<$c, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{clientid = Rest});
get_auth([<<$u, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{username = Rest});
get_auth([<<$p, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{password = Rest});
get_auth([Param|T], Auth=#coap_mqtt_auth{}) ->
?LOG(error, "ignore unknown parameter ~p", [Param]),
get_auth(T, Auth).
add_topic_info(publish, Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>> ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[{_, StoredMaxAge, StoredCT, _, _}] ->
?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]),
%% check whether the ct value stored matches the ct option in this POST message
case Format =:= StoredCT of
true ->
{ok, Ret} =
case StoredMaxAge =:= MaxAge of
true ->
emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload);
false ->
emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload)
end,
{changed, Ret};
false ->
?LOG(debug, "ct values of topic=~p do not match, stored ct=~p, new ct=~p, ignore the PUBLISH", [Topic, StoredCT, Format]),
{changed, false}
end;
[] ->
?LOG(debug, "publish topic=~p will be created", [Topic]),
{ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload),
{created, Ret}
end;
add_topic_info(create, Topic, MaxAge, Format, _Payload) when is_binary(Topic), Topic =/= <<>> ->
case emqx_coap_pubsub_topics:is_topic_existed(Topic) of
true ->
%% Whether we should support CREATE to an existed topic is TBD!!
?LOG(debug, "create topic=~p already exists, need reset the topic info", [Topic]),
{ok, Ret} = emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Format, <<>>);
false ->
?LOG(debug, "create topic=~p will be created", [Topic]),
{ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, <<>>)
end,
{created, Ret};
add_topic_info(_, Topic, _MaxAge, _Format, _Payload) ->
?LOG(debug, "create topic=~p info failed", [Topic]),
{badarg, false}.
concatenate_location_path(List = [TopicPart1, TopicPart2, TopicPart3]) when is_binary(TopicPart1), is_binary(TopicPart2), is_binary(TopicPart3) ->
list_to_binary(lists:foldl( fun (Element, AccIn) when Element =/= <<>> ->
AccIn ++ "/" ++ binary_to_list(Element);
(_Element, AccIn) ->
AccIn
end, [], List)).
format_string_to_int(<<"application/octet-stream">>) ->
<<"42">>;
format_string_to_int(<<"application/exi">>) ->
<<"47">>;
format_string_to_int(<<"application/json">>) ->
<<"50">>.
handle_received_publish(Topic, MaxAge, Format, Payload) ->
case add_topic_info(publish, Topic, MaxAge, format_string_to_int(Format), Payload) of
{Ret, true} ->
Pid = get(mqtt_client_pid),
case emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload) of
ok ->
{ok, Ret, case Ret of
changed -> #coap_content{};
created ->
#coap_content{location_path = [
concatenate_location_path([<<"ps">>, Topic, <<>>])]}
end};
{error, Code} ->
{error, Code}
end;
{_, false} ->
?LOG(debug, "add_topic_info failed, will return bad_request", []),
{error, bad_request}
end.
handle_received_create(TopicPrefix, MaxAge, Payload) ->
case core_link:decode(Payload) of
[{rootless, [Topic], [{ct, CT}]}] when is_binary(Topic), Topic =/= <<>> ->
TrueTopic = emqx_http_lib:uri_decode(Topic),
?LOG(debug, "decoded link-format payload, the Topic=~p, CT=~p~n", [TrueTopic, CT]),
LocPath = concatenate_location_path([<<"ps">>, TopicPrefix, TrueTopic]),
FullTopic = binary:part(LocPath, 4, byte_size(LocPath)-4),
?LOG(debug, "the location path is ~p, the full topic is ~p~n", [LocPath, FullTopic]),
case add_topic_info(create, FullTopic, MaxAge, CT, <<>>) of
{_, true} ->
?LOG(debug, "create topic info successfully, will return LocPath=~p", [LocPath]),
{ok, created, #coap_content{location_path = [LocPath]}};
{_, false} ->
?LOG(debug, "create topic info failed, will return bad_request", []),
{error, bad_request}
end;
Other ->
?LOG(debug, "post with bad payload of link-format ~p, will return bad_request", [Other]),
{error, bad_request}
end.
%% When topic is timeout, server should return nocontent here,
%% but gen_coap only receive return value of #coap_content from coap_get, so temporarily we can't give the Code 2.07 {ok, nocontent} out.TBC!!!
return_resource(Topic, Payload, MaxAge, TimeStamp, Content) ->
TimeElapsed = trunc((erlang:system_time(millisecond) - TimeStamp) / 1000),
case TimeElapsed < MaxAge of
true ->
LeftTime = (MaxAge - TimeElapsed),
?LOG(debug, "topic=~p has max age left time is ~p", [Topic, LeftTime]),
Content#coap_content{max_age = LeftTime, payload = Payload};
false ->
?LOG(debug, "topic=~p has been timeout, will return empty content", [Topic]),
#coap_content{}
end.
read_last_publish_message(false, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)->
?LOG(debug, "the QueryFormat=~p", [QueryFormat]),
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
{error, not_found};
[{_, MaxAge, CT, Payload, TimeStamp}] ->
case CT =:= format_string_to_int(QueryFormat) of
true ->
return_resource(Topic, Payload, MaxAge, TimeStamp, Content);
false ->
?LOG(debug, "format value does not match, the queried format=~p, the stored format=~p", [QueryFormat, CT]),
{error, bad_request}
end
end;
read_last_publish_message(false, Topic, Content) ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
{error, not_found};
[{_, MaxAge, _, Payload, TimeStamp}] ->
return_resource(Topic, Payload, MaxAge, TimeStamp, Content)
end;
read_last_publish_message(true, Topic, _Content) ->
?LOG(debug, "the topic=~p is illegal wildcard topic", [Topic]),
{error, bad_request}.
delete_topic_info(Topic) ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
{error, not_found};
[{_, _, _, _, _}] ->
emqx_coap_pubsub_topics:delete_sub_topics(Topic)
end.
topic(Topic) when is_binary(Topic) -> Topic;
topic([]) -> <<>>;
topic([Path | TopicPath]) ->
case topic(TopicPath) of
<<>> -> Path;
RemTopic ->
<<Path/binary, $/, RemTopic/binary>>
end.

View File

@ -1,154 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_registry).
-author("Feng Lee <feng@emqx.io>").
-include("src/coap/include/emqx_coap.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[CoAP-Registry]").
-behaviour(gen_server).
%% API.
-export([ start_link/0
, register_name/2
, unregister_name/1
, whereis_name/1
, send/2
, stop/0
]).
%% gen_server.
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {}).
-define(RESPONSE_TAB, coap_response_process).
-define(RESPONSE_REF_TAB, coap_response_process_ref).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
register_name(Name, Pid) ->
gen_server:call(?MODULE, {register_name, Name, Pid}).
unregister_name(Name) ->
gen_server:call(?MODULE, {unregister_name, Name}).
whereis_name(Name) ->
case ets:lookup(?RESPONSE_TAB, Name) of
[] -> undefined;
[{Name, Pid, _MRef}] -> Pid
end.
send(Name, Msg) ->
case whereis_name(Name) of
undefined ->
exit({badarg, {Name, Msg}});
Pid when is_pid(Pid) ->
Pid ! Msg,
Pid
end.
stop() ->
gen_server:stop(?MODULE).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
init([]) ->
_ = ets:new(?RESPONSE_TAB, [set, named_table, protected]),
_ = ets:new(?RESPONSE_REF_TAB, [set, named_table, protected]),
{ok, #state{}}.
handle_call({register_name, Name, Pid}, _From, State) ->
case ets:member(?RESPONSE_TAB, Name) of
false ->
MRef = monitor_client(Pid),
ets:insert(?RESPONSE_TAB, {Name, Pid, MRef}),
ets:insert(?RESPONSE_REF_TAB, {MRef, Name, Pid}),
{reply, yes, State};
true -> {reply, no, State}
end;
handle_call({unregister_name, Name}, _From, State) ->
case ets:lookup(?RESPONSE_TAB, Name) of
[] ->
ok;
[{Name, _Pid, MRef}] ->
erase_monitor(MRef),
ets:delete(?RESPONSE_TAB, Name),
ets:delete(?RESPONSE_REF_TAB, MRef)
end,
{reply, ok, State};
handle_call(_Request, _From, State) ->
{reply, ignored, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case ets:lookup(?RESPONSE_REF_TAB, MRef) of
[{MRef, Name, _Pid}] ->
ets:delete(?RESPONSE_TAB, Name),
ets:delete(?RESPONSE_REF_TAB, MRef),
erase_monitor(MRef);
[] ->
?LOG(error, "MRef of client ~p not found", [DownPid])
end,
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ets:delete(?RESPONSE_TAB),
ets:delete(?RESPONSE_REF_TAB),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
monitor_client(Pid) ->
erlang:monitor(process, Pid).
erase_monitor(MRef) ->
catch erlang:demonitor(MRef, [flush]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,122 +16,22 @@
-module(emqx_coap_resource).
% -behaviour(coap_resource).
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-include("src/coap/include/emqx_coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("gen_coap/include/coap.hrl").
-type context() :: any().
-type topic() :: binary().
-type token() :: token().
-logger_header("[CoAP-RES]").
-type register() :: {topic(), token()}
| topic()
| undefined.
-export([ coap_discover/2
, coap_get/5
, coap_post/4
, coap_put/4
, coap_delete/3
, coap_observe/5
, coap_unobserve/1
, handle_info/2
, coap_ack/2
]).
-ifdef(TEST).
-export([topic/1]).
-endif.
-define(MQTT_PREFIX, [<<"mqtt">>]).
% resource operations
coap_discover(_Prefix, _Args) ->
[{absolute, [<<"mqtt">>], []}].
coap_get(ChId, ?MQTT_PREFIX, Path, Query, _Content) ->
?LOG(debug, "coap_get() Path=~p, Query=~p~n", [Path, Query]),
#coap_mqtt_auth{clientid = Clientid, username = Usr, password = Passwd} = get_auth(Query),
case emqx_coap_mqtt_adapter:client_pid(Clientid, Usr, Passwd, ChId) of
{ok, Pid} ->
put(mqtt_client_pid, Pid),
#coap_content{};
{error, auth_failure} ->
put(mqtt_client_pid, undefined),
{error, unauthorized};
{error, bad_request} ->
put(mqtt_client_pid, undefined),
{error, bad_request};
{error, _Other} ->
put(mqtt_client_pid, undefined),
{error, internal_server_error}
end;
coap_get(ChId, Prefix, Path, Query, _Content) ->
?LOG(error, "ignore bad get request ChId=~p, Prefix=~p, Path=~p, Query=~p", [ChId, Prefix, Path, Query]),
{error, bad_request}.
coap_post(_ChId, _Prefix, _Topic, _Content) ->
{error, method_not_allowed}.
coap_put(_ChId, ?MQTT_PREFIX, Topic, #coap_content{payload = Payload}) when Topic =/= [] ->
?LOG(debug, "put message, Topic=~p, Payload=~p~n", [Topic, Payload]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:publish(Pid, topic(Topic), Payload);
coap_put(_ChId, Prefix, Topic, Content) ->
?LOG(error, "put has error, Prefix=~p, Topic=~p, Content=~p", [Prefix, Topic, Content]),
{error, bad_request}.
coap_delete(_ChId, _Prefix, _Topic) ->
{error, method_not_allowed}.
coap_observe(ChId, ?MQTT_PREFIX, Topic, Ack, Content) when Topic =/= [] ->
TrueTopic = topic(Topic),
?LOG(debug, "observe Topic=~p, Ack=~p", [TrueTopic, Ack]),
Pid = get(mqtt_client_pid),
case emqx_coap_mqtt_adapter:subscribe(Pid, TrueTopic) of
ok -> {ok, {state, ChId, ?MQTT_PREFIX, [TrueTopic]}, content, Content};
{error, Code} -> {error, Code}
end;
coap_observe(ChId, Prefix, Topic, Ack, _Content) ->
?LOG(error, "unknown observe request ChId=~p, Prefix=~p, Topic=~p, Ack=~p", [ChId, Prefix, Topic, Ack]),
{error, bad_request}.
coap_unobserve({state, _ChId, ?MQTT_PREFIX, Topic}) when Topic =/= [] ->
?LOG(debug, "unobserve ~p", [Topic]),
Pid = get(mqtt_client_pid),
emqx_coap_mqtt_adapter:unsubscribe(Pid, topic(Topic)),
ok;
coap_unobserve({state, ChId, Prefix, Topic}) ->
?LOG(error, "ignore unknown unobserve request ChId=~p, Prefix=~p, Topic=~p", [ChId, Prefix, Topic]),
ok.
handle_info({dispatch, Topic, Payload}, State) ->
?LOG(debug, "dispatch Topic=~p, Payload=~p", [Topic, Payload]),
{notify, [], #coap_content{format = <<"application/octet-stream">>, payload = Payload}, State};
handle_info(Message, State) ->
emqx_coap_mqtt_adapter:handle_info(Message, State).
coap_ack(_Ref, State) -> {ok, State}.
get_auth(Query) ->
get_auth(Query, #coap_mqtt_auth{}).
get_auth([], Auth=#coap_mqtt_auth{}) ->
Auth;
get_auth([<<$c, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{clientid = Rest});
get_auth([<<$u, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{username = Rest});
get_auth([<<$p, $=, Rest/binary>>|T], Auth=#coap_mqtt_auth{}) ->
get_auth(T, Auth#coap_mqtt_auth{password = Rest});
get_auth([Param|T], Auth=#coap_mqtt_auth{}) ->
?LOG(error, "ignore unknown parameter ~p", [Param]),
get_auth(T, Auth).
topic(Topic) when is_binary(Topic) -> Topic;
topic([]) -> <<>>;
topic([Path | TopicPath]) ->
case topic(TopicPath) of
<<>> -> Path;
RemTopic ->
<<Path/binary, $/, RemTopic/binary>>
end.
-type result() :: emqx_coap_message()
| {has_sub, emqx_coap_message(), register()}.
-callback init(hocon:confg()) -> context().
-callback stop(context()) -> ok.
-callback get(emqx_coap_message(), hocon:config()) -> result().
-callback put(emqx_coap_message(), hocon:config()) -> result().
-callback post(emqx_coap_message(), hocon:config()) -> result().
-callback delete(emqx_coap_message(), hocon:config()) -> result().

View File

@ -1,106 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_server).
-include("src/coap/include/emqx_coap.hrl").
-export([ start/1
, stop/1
]).
-export([ start_listener/1
, start_listener/3
, stop_listener/1
, stop_listener/2
]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
start(Envs) ->
{ok, _} = application:ensure_all_started(gen_coap),
start_listeners(Envs).
stop(Envs) ->
stop_listeners(Envs).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
start_listeners(Envs) ->
lists:foreach(fun start_listener/1, listeners_confs(Envs)).
stop_listeners(Envs) ->
lists:foreach(fun stop_listener/1, listeners_confs(Envs)).
start_listener({Proto, ListenOn, Opts}) ->
case start_listener(Proto, ListenOn, Opts) of
{ok, _Pid} ->
io:format("Start coap:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to start coap:~s listener on ~s: ~0p~n",
[Proto, format(ListenOn), Reason]),
error(Reason)
end.
start_listener(udp, ListenOn, Opts) ->
coap_server:start_udp('coap:udp', ListenOn, Opts);
start_listener(dtls, ListenOn, Opts) ->
coap_server:start_dtls('coap:dtls', ListenOn, Opts).
stop_listener({Proto, ListenOn, _Opts}) ->
Ret = stop_listener(Proto, ListenOn),
case Ret of
ok -> io:format("Stop coap:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to stop coap:~s listener on ~s: ~0p~n.",
[Proto, format(ListenOn), Reason])
end,
Ret.
stop_listener(udp, ListenOn) ->
coap_server:stop_udp('coap:udp', ListenOn);
stop_listener(dtls, ListenOn) ->
coap_server:stop_dtls('coap:dtls', ListenOn).
%% XXX: It is a temporary func to convert conf format for esockd
listeners_confs(Envs) ->
listeners_confs(udp, Envs) ++ listeners_confs(dtls, Envs).
listeners_confs(udp, Envs) ->
Udps = proplists:get_value(bind_udp, Envs, []),
[{udp, Port, [{udp_options, InetOpts}]} || {Port, InetOpts} <- Udps];
listeners_confs(dtls, Envs) ->
case proplists:get_value(dtls_opts, Envs, []) of
[] -> [];
DtlsOpts ->
BindDtls = proplists:get_value(bind_dtls, Envs, []),
[{dtls, Port, [{dtls_options, InetOpts ++ DtlsOpts}]} || {Port, InetOpts} <- BindDtls]
end.
format(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).

View File

@ -0,0 +1,195 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_session).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
%% API
-export([new/0, transfer_result/3]).
-export([ received/3
, reply/4
, reply/5
, ack/3
, piggyback/4
, deliver/3
, timeout/3]).
-export_type([session/0]).
-record(session, { transport_manager :: emqx_coap_tm:manager()
, observe_manager :: emqx_coap_observe_res:manager()
, next_msg_id :: coap_message_id()
}).
-type session() :: #session{}.
%%%-------------------------------------------------------------------
%%% API
%%%-------------------------------------------------------------------
-spec new() -> session().
new() ->
_ = emqx_misc:rand_seed(),
#session{ transport_manager = emqx_coap_tm:new()
, observe_manager = emqx_coap_observe_res:new()
, next_msg_id = rand:uniform(?MAX_MESSAGE_ID)}.
%%%-------------------------------------------------------------------
%%% Process Message
%%%-------------------------------------------------------------------
received(Session, Cfg, #coap_message{type = ack} = Msg) ->
handle_response(Session, Cfg, Msg);
received(Session, Cfg, #coap_message{type = reset} = Msg) ->
handle_response(Session, Cfg, Msg);
received(Session, Cfg, #coap_message{method = Method} = Msg) when is_atom(Method) ->
handle_request(Session, Cfg, Msg);
received(Session, Cfg, Msg) ->
handle_response(Session, Cfg, Msg).
reply(Session, Cfg, Req, Method) ->
reply(Session, Cfg, Req, Method, <<>>).
reply(Session, Cfg, Req, Method, Payload) ->
Response = emqx_coap_message:response(Method, Payload, Req),
handle_out(Session, Cfg, Response).
ack(Session, Cfg, Req) ->
piggyback(Session, Cfg, Req, <<>>).
piggyback(Session, Cfg, Req, Payload) ->
Response = emqx_coap_message:ack(Req),
Response2 = emqx_coap_message:set_payload(Payload, Response),
handle_out(Session, Cfg, Response2).
deliver(Session, Cfg, Delivers) ->
Fun = fun({_, Topic, Message},
#{out := OutAcc,
session := #session{observe_manager = OM,
next_msg_id = MsgId} = SAcc} = Acc) ->
case emqx_coap_observe_res:res_changed(OM, Topic) of
undefined ->
Acc;
{Token, SeqId, OM2} ->
Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Cfg),
SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId),
observe_manager = OM2},
#{out := Out} = Result = call_transport_manager(SAcc2, Cfg, Msg, handle_out),
Result#{out := [Out | OutAcc]}
end
end,
lists:foldl(Fun,
#{out => [],
session => Session},
Delivers).
timeout(Session, Cfg, Timer) ->
call_transport_manager(Session, Cfg, Timer, ?FUNCTION_NAME).
transfer_result(Result, From, Value) ->
?TRANSFER_RESULT(Result, [out, subscribe], From, Value).
%%%-------------------------------------------------------------------
%%% Internal functions
%%%-------------------------------------------------------------------
handle_request(Session, Cfg, Msg) ->
call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME).
handle_response(Session, Cfg, Msg) ->
call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME).
handle_out(Session, Cfg, Msg) ->
call_transport_manager(Session, Cfg, Msg, ?FUNCTION_NAME).
call_transport_manager(#session{transport_manager = TM} = Session,
Cfg,
Msg,
Fun) ->
try
Result = emqx_coap_tm:Fun(Msg, TM, Cfg),
{ok, _, Session2} = emqx_misc:pipeline([fun process_tm/2,
fun process_subscribe/2],
Result,
Session),
emqx_coap_channel:transfer_result(Result, session, Session2)
catch Type:Reason:Stack ->
?ERROR("process transmission with, message:~p failed~n
Type:~p,Reason:~p~n,StackTrace:~p~n", [Msg, Type, Reason, Stack]),
#{out => emqx_coap_message:response({error, internal_server_error}, Msg)}
end.
process_tm(#{tm := TM}, Session) ->
{ok, Session#session{transport_manager = TM}};
process_tm(_, Session) ->
{ok, Session}.
process_subscribe(#{subscribe := Sub}, #session{observe_manager = OM} = Session) ->
case Sub of
undefined ->
{ok, Session};
{Topic, Token} ->
OM2 = emqx_coap_observe_res:insert(OM, Topic, Token),
{ok, Session#session{observe_manager = OM2}};
Topic ->
OM2 = emqx_coap_observe_res:remove(OM, Topic),
{ok, Session#session{observe_manager = OM2}}
end;
process_subscribe(_, Session) ->
{ok, Session}.
mqtt_to_coap(MQTT, MsgId, Token, SeqId, Cfg) ->
#message{payload = Payload} = MQTT,
#coap_message{type = get_notify_type(MQTT, Cfg),
method = {ok, content},
id = MsgId,
token = Token,
payload = Payload,
options = #{observe => SeqId,
max_age => get_max_age(MQTT)}}.
get_notify_type(#message{qos = Qos}, #{notify_type := Type}) ->
case Type of
qos ->
case Qos of
?QOS_0 ->
non;
_ ->
con
end;
Other ->
Other
end.
-spec get_max_age(#message{}) -> max_age().
get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
?MAXIMUM_MAX_AGE;
get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
timestamp = Ts}) ->
Now = erlang:system_time(millisecond),
Diff = (Now - Ts + Interval * 1000) / 1000,
erlang:max(1, erlang:floor(Diff));
get_max_age(_) ->
?DEFAULT_MAX_AGE.
next_msg_id(MsgId) when MsgId >= ?MAX_MESSAGE_ID ->
1;
next_msg_id(MsgId) ->
MsgId + 1.

View File

@ -1,42 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_sup).
-behaviour(supervisor).
-export([ start_link/0
, init/1
]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init(_Args) ->
Registry = #{id => emqx_coap_registry,
start => {emqx_coap_registry, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_coap_registry]},
PsTopics = #{id => emqx_coap_pubsub_topics,
start => {emqx_coap_pubsub_topics, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_coap_pubsub_topics]},
{ok, {{one_for_all, 10, 3600}, [Registry, PsTopics]}}.

View File

@ -1,59 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_timer).
-include("src/coap/include/emqx_coap.hrl").
-export([ cancel_timer/1
, start_timer/2
, restart_timer/1
, kick_timer/1
, is_timeout/1
, get_timer_length/1
]).
-record(timer_state, {interval, kickme, tref, message}).
-define(LOG(Level, Format, Args),
emqx_logger:Level("CoAP-Timer: " ++ Format, Args)).
cancel_timer(#timer_state{tref = TRef}) when is_reference(TRef) ->
catch erlang:cancel_timer(TRef),
ok;
cancel_timer(_) ->
ok.
kick_timer(State=#timer_state{kickme = false}) ->
State#timer_state{kickme = true};
kick_timer(State=#timer_state{kickme = true}) ->
State.
start_timer(Sec, Msg) ->
?LOG(debug, "emqx_coap_timer:start_timer ~p", [Sec]),
TRef = erlang:send_after(timer:seconds(Sec), self(), Msg),
#timer_state{interval = Sec, kickme = false, tref = TRef, message = Msg}.
restart_timer(State=#timer_state{interval = Sec, message = Msg}) ->
?LOG(debug, "emqx_coap_timer:restart_timer ~p", [Sec]),
TRef = erlang:send_after(timer:seconds(Sec), self(), Msg),
State#timer_state{kickme = false, tref = TRef}.
is_timeout(#timer_state{kickme = Bool}) ->
not Bool.
get_timer_length(#timer_state{interval = Interval}) ->
Interval.

View File

@ -0,0 +1,196 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% transport manager
-module(emqx_coap_tm).
-export([ new/0
, handle_request/3
, handle_response/3
, handle_out/3
, timeout/3]).
-export_type([manager/0, event_result/2]).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-type direction() :: in | out.
-type transport_id() :: {direction(), non_neg_integer()}.
-record(transport, { id :: transport_id()
, state :: atom()
, timers :: maps:map()
, data :: any()}).
-type transport() :: #transport{}.
-type message_id() :: 0 .. ?MAX_MESSAGE_ID.
-type manager() :: #{message_id() => transport()}.
-type ttimeout() :: {state_timeout, pos_integer(), any()}
| {stop_timeout, pos_integer()}.
-type topic() :: binary().
-type token() :: binary().
-type sub_register() :: {topic(), token()} | topic().
-type event_result(State, Data) ::
#{next => State,
outgoing => emqx_coap_message(),
timeouts => list(ttimeout()),
has_sub => undefined | sub_register(),
data => Data}.
%%%===================================================================
%%% API
%%%===================================================================
new() ->
#{}.
handle_request(#coap_message{id = MsgId} = Msg, TM, Cfg) ->
Id = {in, MsgId},
case maps:get(Id, TM, undefined) of
undefined ->
Data = emqx_coap_transport:new(),
Transport = new_transport(Id, Data),
process_event(in, Msg, TM, Transport, Cfg);
TP ->
process_event(in, Msg, TM, TP, Cfg)
end.
handle_response(#coap_message{type = Type, id = MsgId} = Msg, TM, Cfg) ->
Id = {out, MsgId},
case maps:get(Id, TM, undefined) of
undefined ->
case Type of
reset ->
?EMPTY_RESULT;
_ ->
#{out => #coap_message{type = reset,
id = MsgId}}
end;
TP ->
process_event(in, Msg, TM, TP, Cfg)
end.
handle_out(#coap_message{id = MsgId} = Msg, TM, Cfg) ->
Id = {out, MsgId},
case maps:get(Id, TM, undefined) of
undefined ->
Data = emqx_coap_transport:new(),
Transport = new_transport(Id, Data),
process_event(out, Msg, TM, Transport, Cfg);
_ ->
?WARN("Repeat sending message with id:~p~n", [Id]),
?EMPTY_RESULT
end.
timeout({Id, Type, Msg}, TM, Cfg) ->
case maps:get(Id, TM, undefined) of
undefined ->
?EMPTY_RESULT;
#transport{timers = Timers} = TP ->
%% maybe timer has been canceled
case maps:is_key(Type, Timers) of
true ->
process_event(Type, Msg, TM, TP, Cfg);
_ ->
?EMPTY_RESULT
end
end.
%%--------------------------------------------------------------------
%% @doc
%% @spec
%% @end
%%--------------------------------------------------------------------
%%%===================================================================
%%% Internal functions
%%%===================================================================
new_transport(Id, Data) ->
#transport{id = Id,
state = idle,
timers = #{},
data = Data}.
process_event(stop_timeout,
_,
TM,
#transport{id = Id,
timers = Timers},
_) ->
lists:foreach(fun({_, Ref}) ->
emqx_misc:cancel_timer(Ref)
end,
maps:to_list(Timers)),
#{tm => maps:remove(Id, TM)};
process_event(Event,
Msg,
TM,
#transport{id = Id,
state = State,
data = Data} = TP,
Cfg) ->
Result = emqx_coap_transport:State(Event, Msg, Data, Cfg),
{ok, _, TP2} = emqx_misc:pipeline([fun process_state_change/2,
fun process_data_change/2,
fun process_timeouts/2],
Result,
TP),
TM2 = TM#{Id => TP2},
emqx_coap_session:transfer_result(Result, tm, TM2).
process_state_change(#{next := Next}, TP) ->
{ok, cancel_state_timer(TP#transport{state = Next})};
process_state_change(_, TP) ->
{ok, TP}.
cancel_state_timer(#transport{timers = Timers} = TP) ->
case maps:get(state_timer, Timers, undefined) of
undefined ->
TP;
Ref ->
_ = emqx_misc:cancel_timer(Ref),
TP#transport{timers = maps:remove(state_timer, Timers)}
end.
process_data_change(#{data := Data}, TP) ->
{ok, TP#transport{data = Data}};
process_data_change(_, TP) ->
{ok, TP}.
process_timeouts(#{timeouts := []}, TP) ->
{ok, TP};
process_timeouts(#{timeouts := Timeouts},
#transport{id = Id, timers = Timers} = TP) ->
NewTimers = lists:foldl(fun({state_timeout, _, _} = Timer, Acc) ->
process_timer(Id, Timer, Acc);
({stop_timeout, I}, Acc) ->
process_timer(Id, {stop_timeout, I, stop}, Acc)
end,
Timers,
Timeouts),
{ok, TP#transport{timers = NewTimers}};
process_timeouts(_, TP) ->
{ok, TP}.
process_timer(Id, {Type, Interval, Msg}, Timers) ->
Ref = emqx_misc:start_timer(Interval, {transport, {Id, Type, Msg}}),
Timers#{Type => Ref}.

View File

@ -0,0 +1,133 @@
-module(emqx_coap_transport).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-define(ACK_TIMEOUT, 2000).
-define(ACK_RANDOM_FACTOR, 1000).
-define(MAX_RETRANSMIT, 4).
-define(EXCHANGE_LIFETIME, 247000).
-define(NON_LIFETIME, 145000).
-record(data, { cache :: undefined | emqx_coap_message()
, retry_interval :: non_neg_integer()
, retry_count :: non_neg_integer()
}).
-type data() :: #data{}.
-export([ new/0, idle/4, maybe_reset/4
, maybe_resend/4, wait_ack/4, until_stop/4]).
-spec new() -> data().
new() ->
#data{cache = undefined,
retry_interval = 0,
retry_count = 0}.
idle(in,
#coap_message{type = non, id = MsgId, method = Method} = Msg,
_,
#{resource := Resource} = Cfg) ->
Ret = #{next => until_stop,
timeouts => [{stop_timeout, ?NON_LIFETIME}]},
case Method of
undefined ->
Ret#{out => #coap_message{type = reset, id = MsgId}};
_ ->
case erlang:apply(Resource, Method, [Msg, Cfg]) of
#coap_message{} = Result ->
Ret#{out => Result};
{has_sub, Result, Sub} ->
Ret#{out => Result, subscribe => Sub};
error ->
Ret#{out =>
emqx_coap_message:response({error, internal_server_error}, Msg)}
end
end;
idle(in,
#coap_message{id = MsgId,
type = con,
method = Method} = Msg,
Data,
#{resource := Resource} = Cfg) ->
Ret = #{next => maybe_resend,
timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]},
case Method of
undefined ->
ResetMsg = #coap_message{type = reset, id = MsgId},
Ret#{data => Data#data{cache = ResetMsg},
out => ResetMsg};
_ ->
{RetMsg, SubInfo} =
case erlang:apply(Resource, Method, [Msg, Cfg]) of
#coap_message{} = Result ->
{Result, undefined};
{has_sub, Result, Sub} ->
{Result, Sub};
error ->
{emqx_coap_message:response({error, internal_server_error}, Msg),
undefined}
end,
RetMsg2 = RetMsg#coap_message{type = ack},
Ret#{out => RetMsg2,
data => Data#data{cache = RetMsg2},
subscribe => SubInfo}
end;
idle(out, #coap_message{type = non} = Msg, _, _) ->
#{next => maybe_reset,
out => Msg,
timeouts => [{stop_timeout, ?NON_LIFETIME}]};
idle(out, Msg, Data, _) ->
_ = emqx_misc:rand_seed(),
Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR),
#{next => wait_ack,
data => Data#data{cache = Msg},
out => Msg,
timeouts => [ {state_timeout, Timeout, ack_timeout}
, {stop_timeout, ?EXCHANGE_LIFETIME}]}.
maybe_reset(in, Message, _, _) ->
case Message of
#coap_message{type = reset} ->
?INFO("Reset Message:~p~n", [Message]);
_ ->
ok
end,
?EMPTY_RESULT.
maybe_resend(in, _, _, #data{cache = Cache}) ->
#{out => Cache}.
wait_ack(in, #coap_message{type = Type}, _, _) ->
case Type of
ack ->
#{next => until_stop};
reset ->
#{next => until_stop};
_ ->
?EMPTY_RESULT
end;
wait_ack(state_timeout,
ack_timeout,
_,
#data{cache = Msg,
retry_interval = Timeout,
retry_count = Count} =Data) ->
case Count < ?MAX_RETRANSMIT of
true ->
Timeout2 = Timeout * 2,
#{data => Data#data{retry_interval = Timeout2,
retry_count = Count + 1},
out => Msg,
timeouts => [{state_timeout, Timeout2, ack_timeout}]};
_ ->
#{next_state => until_stop}
end.
until_stop(_, _, _, _) ->
?EMPTY_RESULT.

View File

@ -15,6 +15,72 @@
%%--------------------------------------------------------------------
-define(APP, emqx_coap).
-define(DEFAULT_COAP_PORT, 5683).
-define(DEFAULT_COAPS_PORT, 5684).
-define(MAX_MESSAGE_ID, 65535).
-define(MAX_BLOCK_SIZE, 1024).
-define(DEFAULT_MAX_AGE, 60).
-define(MAXIMUM_MAX_AGE, 4294967295).
-define(EMPTY_RESULT, #{}).
-define(TRANSFER_RESULT(R1, Keys, From, Value),
begin
R2 = maps:with(Keys, R1),
R2#{From => Value}
end).
-type coap_message_id() :: 1 .. ?MAX_MESSAGE_ID.
-type message_type() :: con | non | ack | reset.
-type max_age() :: 1 .. ?MAXIMUM_MAX_AGE.
-type message_option_name() :: if_match
| uri_host
| etag
| if_none_match
| uri_port
| location_path
| uri_path
| content_format
| max_age
| uri_query
| 'accept'
| location_query
| proxy_uri
| proxy_scheme
| size1
| observer
| block1
| block2.
-type message_options() :: #{ if_match => list(binary())
, uri_host => binary()
, etag => list(binary())
, if_none_match => boolean()
, uri_port => 0 .. 65535
, location_path => list(binary())
, uri_path => list(binary())
, content_format => 0 .. 65535
, max_age => non_neg_integer()
, uri_query => list(binary())
, 'accept' => 0 .. 65535
, location_query => list(binary())
, proxy_uri => binary()
, proxy_scheme => binary()
, size1 => non_neg_integer()
, observer => non_neg_integer()
, block1 => {non_neg_integer(), boolean(), non_neg_integer()}
, block2 => {non_neg_integer(), boolean(), non_neg_integer()}}.
-record(coap_mqtt_auth, {clientid, username, password}).
-record(coap_message, { type :: message_type()
, method
, id
, token = <<>>
, options = #{}
, payload = <<>>}).
-record(coap_content, {etag, max_age = ?DEFAULT_MAX_AGE, format, location_path = [], payload = <<>>}).
-type emqx_coap_message() :: #coap_message{}.
-type coap_content() :: #coap_content{}.

View File

@ -0,0 +1,154 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% a coap to mqtt adapter
-module(emqx_coap_mqtt_resource).
-behaviour(emqx_coap_resource).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-logger_header("[CoAP-RES]").
-export([ init/1
, stop/1
, get/2
, put/2
, post/2
, delete/2
]).
-export([ check_topic/1
, publish/3
, subscribe/3
, unsubscribe/3]).
-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, is_new => false}).
init(_) ->
{ok, undefined}.
stop(_) ->
ok.
%% get: subscribe, ignore observe option
get(#coap_message{token = Token} = Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
case Token of
<<>> ->
emqx_coap_message:response({error, bad_request}, <<"observer without token">>, Msg);
_ ->
Ret = subscribe(Msg, Topic, Cfg),
RetMsg = emqx_coap_message:response(Ret, Msg),
case Ret of
{ok, _} ->
{has_sub, RetMsg, {Topic, Token}};
_ ->
RetMsg
end
end;
Any ->
Any
end.
%% put: equal post
put(Msg, Cfg) ->
post(Msg, Cfg).
%% post: publish a message
post(Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
emqx_coap_message:response(publish(Msg, Topic, Cfg), Msg);
Any ->
Any
end.
%% delete: ubsubscribe
delete(Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
unsubscribe(Msg, Topic, Cfg),
{has_sub, emqx_coap_message:response({ok, deleted}, Msg), Topic};
Any ->
Any
end.
check_topic(#coap_message{options = Options} = Msg) ->
case maps:get(uri_path, Options, []) of
[] ->
emqx_coap_message:response({error, bad_request}, <<"invalid topic">> , Msg);
UriPath ->
Sep = <<"/">>,
{ok, lists:foldl(fun(Part, Acc) ->
<<Acc/binary, Sep/binary, Part/binary>>
end,
<<>>,
UriPath)}
end.
publish(#coap_message{payload = Payload} = Msg,
Topic,
#{clientinfo := ClientInfo,
publish_qos := QOS} = Cfg) ->
case emqx_coap_channel:auth_publish(Topic, Cfg) of
allow ->
#{clientid := ClientId} = ClientInfo,
MQTTMsg = emqx_message:make(ClientId, type_to_qos(QOS, Msg), Topic, Payload),
MQTTMsg2 = emqx_message:set_flag(retain, false, MQTTMsg),
_ = emqx_broker:publish(MQTTMsg2),
{ok, changed};
_ ->
{error, unauthorized}
end.
subscribe(Msg, Topic, #{clientinfo := ClientInfo}= Cfg) ->
case emqx_topic:wildcard(Topic) of
false ->
case emqx_coap_channel:auth_subscribe(Topic, Cfg) of
allow ->
#{clientid := ClientId} = ClientInfo,
SubOpts = get_sub_opts(Msg, Cfg),
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
{ok, created};
_ ->
{error, unauthorized}
end;
_ ->
%% now, we don't support wildcard in subscribe topic
{error, bad_request, <<"">>}
end.
unsubscribe(Msg, Topic, #{clientinfo := ClientInfo} = Cfg) ->
emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, get_sub_opts(Msg, Cfg)]).
get_sub_opts(Msg, #{subscribe_qos := Type}) ->
?SUBOPTS#{qos => type_to_qos(Type, Msg)}.
type_to_qos(qos0, _) -> ?QOS_0;
type_to_qos(qos1, _) -> ?QOS_1;
type_to_qos(qos2, _) -> ?QOS_2;
type_to_qos(coap, #coap_message{type = Type}) ->
case Type of
non ->
?QOS_0;
_ ->
?QOS_1
end.

View File

@ -0,0 +1,220 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% a coap to mqtt adapter with a retained topic message database
-module(emqx_coap_pubsub_resource).
-behaviour(emqx_coap_resource).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-logger_header("[CoAP-PS-RES]").
-export([ init/1
, stop/1
, get/2
, put/2
, post/2
, delete/2
]).
-import(emqx_coap_mqtt_resource, [ check_topic/1, subscribe/3, unsubscribe/3
, publish/3]).
-import(emqx_coap_message, [response/2, response/3, set_content/2]).
%%--------------------------------------------------------------------
%% Resource Callbacks
%%--------------------------------------------------------------------
init(_) ->
emqx_coap_pubsub_topics:start_link().
stop(Pid) ->
emqx_coap_pubsub_topics:stop(Pid).
%% get: read last publish message
%% get with observe 0: subscribe
%% get with observe 1: unsubscribe
get(#coap_message{token = Token} = Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
case emqx_coap_message:get_option(observe, Msg) of
undefined ->
Content = emqx_coap_message:get_content(Msg),
read_last_publish_message(emqx_topic:wildcard(Topic), Msg, Topic, Content);
0 ->
case Token of
<<>> ->
response({error, bad_reuqest}, <<"observe without token">>, Msg);
_ ->
Ret = subscribe(Msg, Topic, Cfg),
RetMsg = response(Ret, Msg),
case Ret of
{ok, _} ->
{has_sub, RetMsg, {Topic, Token}};
_ ->
RetMsg
end
end;
1 ->
unsubscribe(Msg, Topic, Cfg),
{has_sub, response({ok, deleted}, Msg), Topic}
end;
Any ->
Any
end.
%% put: insert a message into topic database
put(Msg, _) ->
case check_topic(Msg) of
{ok, Topic} ->
Content = emqx_coap_message:get_content(Msg),
#coap_content{payload = Payload,
format = Format,
max_age = MaxAge} = Content,
handle_received_create(Msg, Topic, MaxAge, Format, Payload);
Any ->
Any
end.
%% post: like put, but will publish the inserted message
post(Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
Content = emqx_coap_message:get_content(Msg),
#coap_content{max_age = MaxAge,
format = Format,
payload = Payload} = Content,
handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg);
Any ->
Any
end.
%% delete: delete a message from topic database
delete(Msg, _) ->
case check_topic(Msg) of
{ok, Topic} ->
delete_topic_info(Msg, Topic);
Any ->
Any
end.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
add_topic_info(Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>> ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[{_, StoredMaxAge, StoredCT, _, _}] ->
?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]),
%% check whether the ct value stored matches the ct option in this POST message
case Format =:= StoredCT of
true ->
{ok, Ret} =
case StoredMaxAge =:= MaxAge of
true ->
emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload);
false ->
emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload)
end,
{changed, Ret};
false ->
?LOG(debug, "ct values of topic=~p do not match, stored ct=~p, new ct=~p, ignore the PUBLISH", [Topic, StoredCT, Format]),
{changed, false}
end;
[] ->
?LOG(debug, "publish topic=~p will be created", [Topic]),
{ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload),
{created, Ret}
end;
add_topic_info(Topic, _MaxAge, _Format, _Payload) ->
?LOG(debug, "create topic=~p info failed", [Topic]),
{badarg, false}.
format_string_to_int(<<"application/octet-stream">>) ->
<<"42">>;
format_string_to_int(<<"application/exi">>) ->
<<"47">>;
format_string_to_int(<<"application/json">>) ->
<<"50">>;
format_string_to_int(_) ->
<<"42">>.
handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg) ->
case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of
{_, true} ->
response(publish(Msg, Topic, Cfg), Msg);
{_, false} ->
?LOG(debug, "add_topic_info failed, will return bad_request", []),
response({error, bad_request}, Msg)
end.
handle_received_create(Msg, Topic, MaxAge, Format, Payload) ->
case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of
{Ret, true} ->
response({ok, Ret}, Msg);
{_, false} ->
?LOG(debug, "add_topic_info failed, will return bad_request", []),
response({error, bad_request}, Msg)
end.
return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content) ->
TimeElapsed = trunc((erlang:system_time(millisecond) - TimeStamp) / 1000),
case TimeElapsed < MaxAge of
true ->
LeftTime = (MaxAge - TimeElapsed),
?LOG(debug, "topic=~p has max age left time is ~p", [Topic, LeftTime]),
set_content(Content#coap_content{max_age = LeftTime, payload = Payload},
response({ok, content}, Msg));
false ->
?LOG(debug, "topic=~p has been timeout, will return empty content", [Topic]),
response({ok, nocontent}, Msg)
end.
read_last_publish_message(false, Msg, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)->
?LOG(debug, "the QueryFormat=~p", [QueryFormat]),
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
response({error, not_found}, Msg);
[{_, MaxAge, CT, Payload, TimeStamp}] ->
case CT =:= format_string_to_int(QueryFormat) of
true ->
return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content);
false ->
?LOG(debug, "format value does not match, the queried format=~p, the stored format=~p", [QueryFormat, CT]),
response({error, bad_request}, Msg)
end
end;
read_last_publish_message(false, Msg, Topic, Content) ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
response({error, not_found}, Msg);
[{_, MaxAge, _, Payload, TimeStamp}] ->
return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content)
end;
read_last_publish_message(true, Msg, Topic, _Content) ->
?LOG(debug, "the topic=~p is illegal wildcard topic", [Topic]),
response({error, bad_request}, Msg).
delete_topic_info(Msg, Topic) ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
response({error, not_found}, Msg);
[{_, _, _, _, _}] ->
emqx_coap_pubsub_topics:delete_sub_topics(Topic),
response({ok, deleted}, Msg)
end.

View File

@ -18,10 +18,8 @@
-behaviour(gen_server).
-include("src/coap/include/emqx_coap.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-logger_header("[CoAP-PS-TOPICS]").

View File

@ -45,7 +45,7 @@ load_default_gateway_applications() ->
gateway_type_searching() ->
%% FIXME: Hardcoded apps
[emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl].
[emqx_stomp_impl, emqx_sn_impl, emqx_exproto_impl, emqx_coap_impl].
load(Mod) ->
try

View File

@ -34,7 +34,8 @@ structs() -> ["gateway"].
fields("gateway") ->
[{stomp, t(ref(stomp))},
{mqttsn, t(ref(mqttsn))},
{exproto, t(ref(exproto))}
{exproto, t(ref(exproto))},
{coap, t(ref(coap))}
];
fields(stomp) ->
@ -183,6 +184,20 @@ fields(access) ->
[ {"$id", #{type => string(),
nullable => true}}];
fields(coap) ->
[{"$id", t(ref(coap_structs))}];
fields(coap_structs) ->
[ {enable_stats, t(boolean(), undefined, true)}
, {authenticator, t(union([allow_anonymous]))}
, {heartbeat, t(duration(), undefined, "15s")}
, {resource, t(union([mqtt, pubsub]), undefined, mqtt)}
, {notify_type, t(union([non, con, qos]), undefined, qos)}
, {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {listener, t(ref(udp_listener_group))}
];
fields(ExtraField) ->
Mod = list_to_atom(ExtraField++"_schema"),
Mod:fields(ExtraField).