refactor(emqx_gateway): refactor the emqx_coap

This commit is contained in:
lafirest 2021-08-06 19:08:20 +08:00
parent 632cc895d5
commit aafdf29cd8
23 changed files with 1044 additions and 1001 deletions

View File

@ -39,9 +39,22 @@ gateway: {
coap.1: {
enable_stats: false
authentication.enable: false
authentication: {
enable: true
authenticators: [
{
name: "authenticator1"
mechanism: password-based
server_type: built-in-database
user_id_type: clientid
}
]
}
#authentication.enable: false
heartbeat: 30s
resource: mqtt
notify_type: qos
subscribe_qos: qos0
publish_qos: qos1
@ -50,19 +63,6 @@ gateway: {
}
}
coap.2: {
enable_stats: false
authentication.enable:false
heartbeat: 30s
resource: pubsub
notify_type: non
subscribe_qos: qos2
publish_qos: coap
listener.udp.1: {
bind: 5687
}
}
mqttsn.1: {
## The MQTT-SN Gateway ID in ADVERTISE message.
gateway_id: 1

View File

@ -94,4 +94,3 @@
%% @doc The callback for process terminated
-callback terminate(any(), channel()) -> ok.

View File

@ -1,190 +1,401 @@
# Table of Contents
1. [EMQX 5.0 CoAP Gateway](#org6feb6de)
2. [CoAP Message Processing Flow](#org8458c1a)
1. [Request Timing Diagram](#orgeaa4f53)
1. [Transport && Transport Manager](#org88207b8)
2. [Resource](#orgb32ce94)
3. [Resource](#org8956f90)
1. [MQTT Resource](#orge8c21b1)
2. [PubSub Resource](#org68ddce7)
4. [Heartbeat](#orgffdfecd)
5. [Command](#org43004c2)
6. [MQTT QOS <=> CoAP non/con](#org0157b5c)
1. [EMQX 5.0 CoAP Gateway](#org61e5bb8)
1. [Features](#orgeddbc94)
1. [PubSub Handler](#orgfc7be2d)
2. [MQTT Handler](#org55be508)
3. [Heartbeat](#org3d1a32e)
4. [Query String](#org9a6b996)
2. [Implementation](#org9985dfe)
1. [Request/Response flow](#orge94210c)
<a id="org6feb6de"></a>
<a id="org61e5bb8"></a>
# EMQX 5.0 CoAP Gateway
emqx-coap is a CoAP Gateway for EMQ X Broker.
It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients.
emqx-coap is a CoAP Gateway for EMQ X Broker. It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients.
<a id="org8458c1a"></a>
<a id="orgeddbc94"></a>
# CoAP Message Processing Flow
## Features
- Partially achieves [Publish-Subscribe Broker for the Constrained Application Protocol (CoAP)](https://datatracker.ietf.org/doc/html/draft-ietf-core-coap-pubsub-09)
we called this as ps handler, include following functions:
- Publish
- Subscribe
- UnSubscribe
- Long connection and authorization verification called as MQTT handler
<a id="orgeaa4f53"></a>
<a id="orgfc7be2d"></a>
## Request Timing Diagram
### PubSub Handler
1. Publish
,------. ,------------. ,-----------------. ,---------. ,--------.
|client| |coap_gateway| |transport_manager| |transport| |resource|
`--+---' `-----+------' `--------+--------' `----+----' `---+----'
| | | | |
| -------------------> | | |
| | | | |
| | | | |
| | ------------------------>| | |
| | | | |
| | | | |
| | |----------------------->| |
| | | | |
| | | | |
| | | |------------------>|
| | | | |
| | | | |
| | | |<------------------|
| | | | |
| | | | |
| | |<-----------------------| |
| | | | |
| | | | |
| | <------------------------| | |
| | | | |
| | | | |
| <------------------- | | |
,--+---. ,-----+------. ,--------+--------. ,----+----. ,---+----.
|client| |coap_gateway| |transport_manager| |transport| |resource|
`------' `------------' `-----------------' `---------' `--------'
Method: POST\
URI Schema: ps/{+topic}{?q\*}\
q\*: [Shared Options](#orgc50043b)\
Response:
- 2.04 "Changed" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" when with wrong auth uri query
<a id="org88207b8"></a>
2. Subscribe
### Transport && Transport Manager
Method: GET
Options:
Transport is a module that manages the life cycle and behaviour of CoAP messages\
And the transport manager is to manage all transport which in this gateway
- Observer = 0
URI Schema: ps/{+topic}{?q\*}\
q\*: see [Shared Options](#orgc50043b)\
Response:
<a id="orgb32ce94"></a>
- 2.05 "Content" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" when with wrong auth uri query
### Resource
The Resource is a behaviour that must implement GET/PUT/POST/DELETE method\
Different Resources can have different implementations of this four method\
Each gateway can only use one Resource module to process CoAP Request Message
<a id="org8956f90"></a>
# Resource
<a id="orge8c21b1"></a>
## MQTT Resource
The MQTT Resource is a simple CoAP to MQTT adapter, the implementation of each method is as follows:
- use uri path as topic
- GET: subscribe the topic
- PUT: publish message to this topic
- POST: like PUT
- DELETE: unsubscribe the topic
<a id="org68ddce7"></a>
## PubSub Resource
The PubSub Resource like the MQTT Resource, but has a retained topic's message database\
This Resource is shared, only can has one instance. The implementation:
- use uri path as topic
- GET:
- GET with observe = 0: subscribe the topic
- GET with observe = 1: unsubscribe the topic
- GET without observe: read lastest message from the message database, key is the topic
- PUT:
insert message into the message database, key is the topic
- POST:
like PUT, but will publish the message
- DELETE:
delete message from the database, key is topic
<a id="orgffdfecd"></a>
# Heartbeat
At present, the CoAP gateway only supports UDP/DTLS connection, don't support UDP over TCP and UDP over WebSocket.
Because UDP is connectionless, so the client needs to send heartbeat ping to the server interval. Otherwise, the server will close related resources
Use ****POST with empty uri path**** as a heartbeat ping
example:
```
coap-client -m post coap://127.0.0.1
Client1 Client2 Broker
| | Subscribe |
| | ----- GET /ps/topic1 Observe:0 Token:XX ----> |
| | |
| | <---------- 2.05 Content Observe:10---------- |
| | |
| | |
| | Publish |
| ---------|----------- PUT /ps/topic1 "1033.3" --------> |
| | Notify |
| | <---------- 2.05 Content Observe:11 --------- |
| | |
```
<a id="org43004c2"></a>
3. UnSubscribe
# Command
Method : GET
Options:
Command is means the operation which outside the CoAP protocol, like authorization
The Command format:
- Observe = 1
1. use ****POST**** method
2. uri path is empty
3. query string is like ****action=comandX&argX=valuex&argY=valueY****
URI Schema: ps/{+topic}{?q\*}\
q\*: see [Shared Options](#orgc50043b)\
Response:
example:
1. connect:
```
coap-client -m post coap://127.0.0.1?action=connect&clientid=XXX&username=XXX&password=XXX
```
2. disconnect:
```
coap-client -m post coap://127.0.0.1?action=disconnect
```
- 2.07 "No Content" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" when with wrong auth uri query
<a id="org0157b5c"></a>
# MQTT QOS <=> CoAP non/con
<a id="org55be508"></a>
CoAP gateway uses some options to control the conversion between MQTT qos and coap non/con:
### MQTT Handler
1.notify_type
Control the type of notify messages when the observed object has changed.Can be:
Establishing a connection is optional. If the CoAP client needs to use connection-based operations, it must first establish a connection.
At the same time, the connectionless mode and the connected mode cannot be mixed.
In connection mode, the Publish/Subscribe/UnSubscribe sent by the client must be has Token and ClientId in query string.
If the Token and Clientid is wrong/miss, EMQ X will reset the request.
The communication token is the data carried in the response payload after the client successfully establishes a connection.
After obtaining the token, the client's subsequent request must attach "token=Token" to the Query String
ClientId is necessary when there is a connection, and is a unique identifier defined by the client.
The server manages the client through the ClientId. If the ClientId is wrong, EMQ X will reset the request.
- non
- con
- qos
in this value, MQTT QOS0 -> non, QOS1/QOS2 -> con
1. Create a Connection
2.subscribe_qos
Control the qos of subscribe.Can be:
Method: POST
URI Schema: mqtt/{+topic}{?q\*}
q\*:
- qos0
- qos1
- qos2
- coap
in this value, CoAP non -> qos0, con -> qos1
- clientId := client uid
- username
- password
3.publish_qos
like subscribe_qos, but control the qos of the publish MQTT message
Response:
License
-------
- 2.01 "Created" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" wrong username or password
Apache License Version 2.0
Payload: Token if success
Author
------
2. Close a Connection
EMQ X Team.
Method : DELETE
URI Schema: mqtt/{+topic}{?q\*}
q\*:
- clientId := client uid
- token
Resonse:
- 2.01 "Deleted" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" wrong clientid or token
<a id="org3d1a32e"></a>
### Heartbeat
The Coap client can maintain the "connection" with the server through the heartbeat (regardless of whether it is authenticated or not), so that the server will not release related resources
Method : PUT
URI Schema: mqtt/{+topic}{?q\*}
q\*:
- clientId if authenticated
- token if authenticated
Response:
- 2.01 "Changed" when success
- 4.00 "Bad Request" when error
- 4.01 "Unauthorized" wrong clientid or token
<a id="org9a6b996"></a>
### Query String
CoAP gateway uses some options in query string to conversion between MQTT CoAP.
1. Shared Options <a id="orgc50043b"></a>
- clientId
- token
2. Connect Options
- username
- password
3. Publish
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-left" />
<col class="org-left" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">option</th>
<th scope="col" class="org-left">value type</th>
<th scope="col" class="org-left">default</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">retain</td>
<td class="org-left">boolean</td>
<td class="org-left">false</td>
</tr>
<tr>
<td class="org-left">qos</td>
<td class="org-left">MQTT QOS</td>
<td class="org-left">See <a href="#org0345c3e">here</a></td>
</tr>
<tr>
<td class="org-left">expiry</td>
<td class="org-left">Message Expiry Interval</td>
<td class="org-left">0(Never expiry)</td>
</tr>
</tbody>
</table>
4. Subscribe
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-left" />
<col class="org-right" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">option</th>
<th scope="col" class="org-left">value type</th>
<th scope="col" class="org-right">default</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">qos</td>
<td class="org-left">MQTT QOS</td>
<td class="org-right">See <a href="#org2325c7d">here</a></td>
</tr>
<tr>
<td class="org-left">nl</td>
<td class="org-left">MQTT Subscribe No Local</td>
<td class="org-right">0</td>
</tr>
<tr>
<td class="org-left">rh</td>
<td class="org-left">MQTT Subscribe Retain Handing</td>
<td class="org-right">0</td>
</tr>
</tbody>
</table>
5. MQTT QOS <=> CoAP non/con
1.notif_type
Control the type of notify messages when the observed object has changed.Can be:
- non
- con
- qos
in this value, MQTT QOS0 -> non, QOS1/QOS2 -> con
2.subscribe_qos <a id="org2325c7d"></a>
Control the qos of subscribe.Can be:
- qos0
- qos1
- qos2
- coap
in this value, CoAP non -> qos0, con -> qos1
3.publish_qos <a id="org0345c3e"></a>
like subscribe_qos, but control the qos of the publish MQTT message
<a id="org9985dfe"></a>
## Implementation
<a id="orge94210c"></a>
### Request/Response flow
![img](./doc/flow.png)
1. Authorization check
Check whether the clientid and token in the query string match the current connection
2. Session
Manager the "Transport Mnager" "Observe Resouces Manger" and next message id
3. Transport Mnager
Manager "Transport" create/close/dispatch
4. Observe resources Mnager
Mnager observe topic and token
5. Transport
![img](./doc/transport.png)
1. Shared State
![img](./doc/shared_state.png)
6. Handler
1. pubsub
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-right" />
<col class="org-left" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">Method</th>
<th scope="col" class="org-right">Observe</th>
<th scope="col" class="org-left">Action</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">GET</td>
<td class="org-right">0</td>
<td class="org-left">subscribe and reply result</td>
</tr>
<tr>
<td class="org-left">GET</td>
<td class="org-right">1</td>
<td class="org-left">unsubscribe and reply result</td>
</tr>
<tr>
<td class="org-left">POST</td>
<td class="org-right">X</td>
<td class="org-left">publish and reply result</td>
</tr>
</tbody>
</table>
2. mqtt
<table border="2" cellspacing="0" cellpadding="6" rules="groups" frame="hsides">
<colgroup>
<col class="org-left" />
<col class="org-left" />
</colgroup>
<thead>
<tr>
<th scope="col" class="org-left">Method</th>
<th scope="col" class="org-left">Action</th>
</tr>
</thead>
<tbody>
<tr>
<td class="org-left">PUT</td>
<td class="org-left">reply result</td>
</tr>
<tr>
<td class="org-left">POST</td>
<td class="org-left">return create connection action</td>
</tr>
<tr>
<td class="org-left">DELETE</td>
<td class="org-left">return close connection action</td>
</tr>
</tbody>
</table>

Binary file not shown.

After

Width:  |  Height:  |  Size: 108 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 148 KiB

View File

@ -27,10 +27,11 @@
-export([ info/1
, info/2
, stats/1
, auth_publish/2
, auth_subscribe/2
, reply/4
, ack/4
, validator/3
, get_clientinfo/1
, get_config/2
, get_config/3
, result_keys/0
, transfer_result/3]).
-export([ init/2
@ -60,9 +61,16 @@
keepalive :: emqx_keepalive:keepalive() | undefined,
%% Timer
timers :: #{atom() => disable | undefined | reference()},
token :: binary() | undefined,
config :: hocon:config()
}).
%% the execuate context for session call
-record(exec_ctx, { config :: hocon:config(),
ctx :: emqx_gateway_ctx:context(),
clientinfo :: emqx_types:clientinfo()
}).
-type channel() :: #channel{}.
-define(DISCONNECT_WAIT_TIME, timer:seconds(10)).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
@ -98,13 +106,18 @@ init(ConnInfo = #{peername := {PeerHost, _},
#{ctx := Ctx} = Config) ->
Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Config, undefined),
EnableAuth = maps:get(enable, maps:get(authentication, Config)),
ClientInfo = set_peercert_infos(
Peercert,
#{ zone => default
, protocol => 'coap'
, peerhost => PeerHost
, sockport => SockPort
, clientid => emqx_guid:to_base62(emqx_guid:gen())
, clientid => if EnableAuth ->
undefined;
true ->
emqx_guid:to_base62(emqx_guid:gen())
end
, username => undefined
, is_bridge => false
, is_superuser => false
@ -116,48 +129,52 @@ init(ConnInfo = #{peername := {PeerHost, _},
, conninfo = ConnInfo
, clientinfo = ClientInfo
, timers = #{}
, config = Config
, session = emqx_coap_session:new()
, config = Config#{clientinfo => ClientInfo,
ctx => Ctx}
, keepalive = emqx_keepalive:init(maps:get(heartbeat, Config))
}.
auth_publish(Topic,
#{ctx := Ctx,
clientinfo := ClientInfo}) ->
emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic).
validator(Type, Topic, #exec_ctx{ctx = Ctx,
clientinfo = ClientInfo}) ->
emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic).
auth_subscribe(Topic,
#{ctx := Ctx,
clientinfo := ClientInfo}) ->
emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic).
get_clientinfo(#exec_ctx{clientinfo = ClientInfo}) ->
ClientInfo.
get_config(Key, Ctx) ->
get_config(Key, Ctx, undefined).
get_config(Key, #exec_ctx{config = Cfg}, Def) ->
maps:get(Key, Cfg, Def).
result_keys() ->
[out, reply, connection].
transfer_result(From, Value, Result) ->
?TRANSFER_RESULT([out], From, Value, Result).
?TRANSFER_RESULT(From, Value, Result).
%%--------------------------------------------------------------------
%% Handle incoming packet
%%--------------------------------------------------------------------
%% treat post to root path as a heartbeat
%% treat post to root path with query string as a command
handle_in(#coap_message{method = post,
options = Options} = Msg, ChannelT) ->
Channel = ensure_keepalive_timer(ChannelT),
case maps:get(uri_path, Options, <<>>) of
<<>> ->
handle_command(Msg, Channel);
handle_in(Msg, ChannleT) ->
Channel = ensure_keepalive_timer(ChannleT),
case convert_queries(Msg) of
{ok, Msg2} ->
case emqx_coap_message:is_request(Msg2) of
true ->
check_auth_state(Msg2, Channel);
_ ->
call_session(handle_response, Msg2, Channel)
end;
_ ->
call_session(received, [Msg], Channel)
end;
handle_in(Msg, Channel) ->
call_session(received, [Msg], ensure_keepalive_timer(Channel)).
response({error, bad_request}, <<"bad uri_query">>, Msg, Channel)
end.
%%--------------------------------------------------------------------
%% Handle Delivers from broker to client
%%--------------------------------------------------------------------
handle_deliver(Delivers, Channel) ->
call_session(deliver, [Delivers], Channel).
call_session(deliver, Delivers, Channel).
%%--------------------------------------------------------------------
%% Handle timeout
@ -172,7 +189,7 @@ handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel
end;
handle_timeout(_, {transport, Msg}, Channel) ->
call_session(timeout, [Msg], Channel);
call_session(timeout, Msg, Channel);
handle_timeout(_, disconnect, Channel) ->
{shutdown, normal, Channel};
@ -238,48 +255,123 @@ ensure_keepalive_timer(Fun, #channel{config = Cfg} = Channel) ->
Interval = maps:get(heartbeat, Cfg),
Fun(keepalive, Interval, keepalive, Channel).
handle_command(#coap_message{options = Options} = Msg, Channel) ->
case maps:get(uri_query, Options, []) of
[] ->
%% heartbeat
ack(Channel, {ok, valid}, <<>>, Msg);
QueryPairs ->
Queries = lists:foldl(fun(Pair, Acc) ->
[{K, V}] = cow_qs:parse_qs(Pair),
Acc#{K => V}
end,
#{},
QueryPairs),
case maps:get(<<"action">>, Queries, undefined) of
undefined ->
ack(Channel, {error, bad_request}, <<"command without actions">>, Msg);
Action ->
handle_command(Action, Queries, Msg, Channel)
end
call_session(Fun,
Msg,
#channel{session = Session} = Channel) ->
Ctx = new_exec_ctx(Channel),
Result = erlang:apply(emqx_coap_session, Fun, [Msg, Ctx, Session]),
process_result([session, connection, out], Result, Msg, Channel).
process_result([Key | T], Result, Msg, Channel) ->
case handle_result(Key, Result, Msg, Channel) of
{ok, Channel2} ->
process_result(T, Result, Msg, Channel2);
Other ->
Other
end;
process_result(_, _, _, Channel) ->
{ok, Channel}.
handle_result(session, #{session := Session}, _, Channel) ->
{ok, Channel#channel{session = Session}};
handle_result(connection, #{connection := open}, Msg, Channel) ->
do_connect(Msg, Channel);
handle_result(connection, #{connection := close}, Msg, Channel) ->
Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
{shutdown, close, {outgoing, Reply}, Channel};
handle_result(out, #{out := Out}, _, Channel) ->
{ok, {outgoing, Out}, Channel};
handle_result(_, _, _, Channel) ->
{ok, Channel}.
check_auth_state(Method, #channel{config = Cfg} = Channel) ->
#{authentication := #{enable := Enable}} = Cfg,
check_token(Enable, Method, Channel).
check_token(true,
#coap_message{options = Options} = Msg,
#channel{token = Token,
clientinfo = ClientInfo} = Channel) ->
#{clientid := ClientId} = ClientInfo,
case maps:get(uri_query, Options, undefined) of
#{<<"clientid">> := ClientId,
<<"token">> := Token} ->
call_session(handle_request, Msg, Channel);
#{<<"clientid">> := DesireId} ->
try_takeover(ClientId, DesireId, Msg, Channel);
_ ->
response({error, unauthorized}, Msg, Channel)
end;
check_token(false,
#coap_message{options = Options} = Msg,
Channel) ->
case maps:get(uri_query, Options, undefined) of
#{<<"clientid">> := _} ->
response({error, unauthorized}, Msg, Channel);
#{<<"token">> := _} ->
response({error, unauthorized}, Msg, Channel);
_ ->
call_session(handle_request, Msg, Channel)
end.
handle_command(<<"connect">>, Queries, Msg, Channel) ->
response(Method, Req, Channel) ->
response(Method, <<>>, Req, Channel).
response(Method, Payload, Req, Channel) ->
Reply = emqx_coap_message:piggyback(Method, Payload, Req),
call_session(handle_out, Reply, Channel).
try_takeover(undefined,
DesireId,
#coap_message{options = Opts} = Msg,
Channel) ->
case maps:get(uri_path, Opts, []) of
[<<"mqtt">>, <<"connection">> | _] ->
%% may be is a connect request
%% TODO need check repeat connect, unless we implement the
%% udp connection baseon the clientid
call_session(handle_request, Msg, Channel);
_ ->
do_takeover(DesireId, Msg, Channel)
end;
try_takeover(_, DesireId, Msg, Channel) ->
do_takeover(DesireId, Msg, Channel).
do_takeover(_DesireId, Msg, Channel) ->
%% TODO completed the takeover, now only reset the message
Reset = emqx_coap_message:reset(Msg),
call_session(handle_out, Reset, Channel).
new_exec_ctx(#channel{config = Cfg,
ctx = Ctx,
clientinfo = ClientInfo}) ->
#exec_ctx{config = Cfg,
ctx = Ctx,
clientinfo = ClientInfo}.
do_connect(#coap_message{options = Opts} = Req, Channel) ->
Queries = maps:get(uri_query, Opts),
case emqx_misc:pipeline(
[ fun run_conn_hooks/2
, fun enrich_clientinfo/2
, fun set_log_meta/2
, fun auth_connect/2
],
{Queries, Msg},
{Queries, Req},
Channel) of
{ok, _Input, NChannel} ->
process_connect(ensure_connected(NChannel), Msg);
process_connect(ensure_connected(NChannel), Req);
{error, ReasonCode, NChannel} ->
ErrMsg = io_lib:format("Login Failed: ~s", [ReasonCode]),
ack(NChannel, {error, bad_request}, ErrMsg, Msg)
end;
handle_command(<<"disconnect">>, _, Msg, Channel) ->
Channel2 = ensure_timer(disconnect, ?DISCONNECT_WAIT_TIME, disconnect, Channel),
ack(Channel2, {ok, deleted}, <<>>, Msg);
handle_command(_, _, Msg, Channel) ->
ack(Channel, {error, bad_request}, <<"invalid action">>, Msg).
response({error, bad_request}, ErrMsg, Req, NChannel)
end.
run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
conninfo = ConnInfo}) ->
@ -291,8 +383,7 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx,
end.
enrich_clientinfo({Queries, Msg},
Channel = #channel{clientinfo = ClientInfo0,
config = Cfg}) ->
Channel = #channel{clientinfo = ClientInfo0}) ->
case Queries of
#{<<"username">> := UserName,
<<"password">> := Password,
@ -301,8 +392,7 @@ enrich_clientinfo({Queries, Msg},
password => Password,
clientid => ClientId},
{ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo),
{ok, Channel#channel{clientinfo = NClientInfo,
config = Cfg#{clientinfo := NClientInfo}}};
{ok, Channel#channel{clientinfo = NClientInfo}};
_ ->
{error, "invalid queries", Channel}
end.
@ -324,7 +414,8 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx,
{error, Reason}
end.
fix_mountpoint(_Packet, #{mountpoint := undefined}) -> ok;
fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) ->
{ok, ClientInfo};
fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
%% TODO: Enrich the varibale replacement????
%% i.e: ${ClientInfo.auth_result.productKey}
@ -334,27 +425,33 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) ->
ensure_connected(Channel = #channel{ctx = Ctx,
conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond)
, proto_name => <<"COAP">>
, proto_ver => <<"1">>
},
ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]),
Channel#channel{conninfo = NConnInfo}.
process_connect(Channel = #channel{ctx = Ctx,
session = Session,
conninfo = ConnInfo,
clientinfo = ClientInfo},
Msg) ->
SessFun = fun(_,_) -> emqx_coap_session:new() end,
%% inherit the old session
SessFun = fun(_,_) -> Session end,
case emqx_gateway_ctx:open_session(
Ctx,
true,
ClientInfo,
ConnInfo,
SessFun
SessFun,
emqx_coap_session
) of
{ok, _Sess} ->
ack(Channel, {ok, created}, <<"connected">>, Msg);
response({ok, created}, <<"connected">>, Msg, Channel);
{error, Reason} ->
?LOG(error, "Failed to open session du to ~p", [Reason]),
ack(Channel, {error, bad_request}, <<>>, Msg)
response({error, bad_request}, Msg, Channel)
end.
run_hooks(Ctx, Name, Args) ->
@ -365,24 +462,20 @@ run_hooks(Ctx, Name, Args, Acc) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run_fold(Name, Args, Acc).
reply(Channel, Method, Payload, Req) ->
call_session(reply, [Req, Method, Payload], Channel).
ack(Channel, Method, Payload, Req) ->
call_session(piggyback, [Req, Method, Payload], Channel).
call_session(F,
A,
#channel{session = Session,
config = Cfg} = Channel) ->
case erlang:apply(emqx_coap_session, F, A ++ [Cfg, Session]) of
#{out := Out,
session := Session2} ->
{ok, {outgoing, Out}, Channel#channel{session = Session2}};
#{out := Out} ->
{ok, {outgoing, Out}, Channel};
#{session := Session2} ->
{ok, Channel#channel{session = Session2}};
_ ->
{ok, Channel}
convert_queries(#coap_message{options = Opts} = Msg) ->
case maps:get(uri_query, Opts, undefined) of
undefined ->
{ok, Msg#coap_message{options = Opts#{uri_query => #{}}}};
Queries ->
convert_queries(Queries, #{}, Msg)
end.
convert_queries([H | T], Queries, Msg) ->
case re:split(H, "=") of
[Key, Val] ->
convert_queries(T, Queries#{Key => Val}, Msg);
_ ->
error
end;
convert_queries([], Queries, #coap_message{options = Opts} = Msg) ->
{ok, Msg#coap_message{options = Opts#{uri_query => Queries}}}.

View File

@ -161,9 +161,7 @@ encode_option(location_query, OptVal) -> {?OPTION_LOCATION_QUERY, OptVal};
encode_option(proxy_uri, OptVal) -> {?OPTION_PROXY_URI, OptVal};
encode_option(proxy_scheme, OptVal) -> {?OPTION_PROXY_SCHEME, OptVal};
encode_option(size1, OptVal) -> {?OPTION_SIZE1, binary:encode_unsigned(OptVal)};
%% draft-ietf-ore-observe-16
encode_option(observe, OptVal) -> {?OPTION_OBSERVE, binary:encode_unsigned(OptVal)};
%% draft-ietf-ore-block-17
encode_option(block2, OptVal) -> {?OPTION_BLOCK2, encode_block(OptVal)};
encode_option(block1, OptVal) -> {?OPTION_BLOCK1, encode_block(OptVal)};
%% unknown opton

View File

@ -57,17 +57,14 @@ init([]) ->
%%--------------------------------------------------------------------
on_insta_create(_Insta = #{id := InstaId,
rawconf := #{resource := Resource} = RawConf
rawconf := RawConf
}, Ctx, _GwState) ->
ResourceMod = get_resource_mod(Resource),
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
ListenerPids = lists:map(fun(Lis) ->
start_listener(InstaId, Ctx, ResourceMod, Lis)
start_listener(InstaId, Ctx, Lis)
end, Listeners),
{ok, ResCtx} = ResourceMod:init(RawConf),
{ok, ListenerPids, #{ctx => Ctx,
res_ctx => ResCtx}}.
{ok, ListenerPids, #{ctx => Ctx}}.
on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
InstaId = maps:get(id, NewInsta),
@ -85,12 +82,10 @@ on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) ->
end.
on_insta_destroy(_Insta = #{ id := InstaId,
rawconf := #{resource := Resource} = RawConf
rawconf := RawConf
},
#{res_ctx := ResCtx} = _GwInstaState,
_GwInstaState,
_GWState) ->
ResourceMod = get_resource_mod(Resource),
ok = ResourceMod:stop(ResCtx),
Listeners = emqx_gateway_utils:normalize_rawconf(RawConf),
lists:foreach(fun(Lis) ->
stop_listener(InstaId, Lis)
@ -100,10 +95,9 @@ on_insta_destroy(_Insta = #{ id := InstaId,
%% Internal funcs
%%--------------------------------------------------------------------
start_listener(InstaId, Ctx, ResourceMod, {Type, ListenOn, SocketOpts, Cfg}) ->
start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) ->
ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn),
Cfg2 = Cfg#{resource => ResourceMod},
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg2) of
case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of
{ok, Pid} ->
?ULOG("Start coap ~s:~s listener on ~s successfully.~n",
[InstaId, Type, ListenOnStr]),
@ -148,8 +142,3 @@ stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) ->
stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) ->
Name = name(InstaId, Type),
esockd:close(Name, ListenOn).
get_resource_mod(mqtt) ->
emqx_coap_mqtt_resource;
get_resource_mod(pubsub) ->
emqx_coap_pubsub_resource.

View File

@ -24,7 +24,13 @@
%% convenience functions for message construction
-module(emqx_coap_message).
-export([request/2, request/3, request/4, ack/1, response/1, response/2, response/3]).
-export([ request/2, request/3, request/4
, ack/1, response/1, response/2
, reset/1, piggyback/2, piggyback/3
, response/3]).
-export([is_request/1]).
-export([set/3, set_payload/2, get_content/1, set_content/2, set_content/3, get_option/2]).
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
@ -42,10 +48,13 @@ request(Type, Method, Content=#coap_content{}, Options) ->
set_content(Content,
#coap_message{type = Type, method = Method, options = Options}).
ack(Request = #coap_message{}) ->
#coap_message{type = ack,
id = Request#coap_message.id}.
ack(#coap_message{id = Id}) ->
#coap_message{type = ack, id = Id}.
reset(#coap_message{id = Id}) ->
#coap_message{type = reset, id = Id}.
%% just make a response
response(#coap_message{type = Type,
id = Id,
token = Token}) ->
@ -61,6 +70,19 @@ response(Method, Payload, Request) ->
set_payload(Payload,
response(Request))).
%% make a response which maybe is a piggyback ack
piggyback(Method, Request) ->
piggyback(Method, <<>>, Request).
piggyback(Method, Payload, Request) ->
Reply = response(Method, Payload, Request),
case Reply of
#coap_message{type = con} ->
Reply#coap_message{type = ack};
_ ->
Reply
end.
%% omit option for its default value
set(max_age, ?DEFAULT_MAX_AGE, Msg) -> Msg;
@ -144,3 +166,9 @@ set_payload_block(Content, BlockId, {Num, _, Size}, Msg) ->
set(BlockId, {Num, false, Size},
set_payload(binary:part(Content, OffsetBegin, ContentSize - OffsetBegin), Msg))
end.
is_request(#coap_message{method = Method}) when is_atom(Method) ->
Method =/= undefined;
is_request(_) ->
false.

View File

@ -18,7 +18,7 @@
%% API
-export([ new_manager/0, insert/3, remove/2
, res_changed/2, foreach/2]).
, res_changed/2, foreach/2, subscriptions/1]).
-export_type([manager/0]).
-define(MAX_SEQ_ID, 16777215).
@ -40,14 +40,15 @@
new_manager() ->
#{}.
-spec insert(topic(), token(), manager()) -> manager().
-spec insert(topic(), token(), manager()) -> {seq_id(), manager()}.
insert(Topic, Token, Manager) ->
case maps:get(Topic, Manager, undefined) of
undefined ->
Manager#{Topic => new_res(Token)};
_ ->
Manager
end.
Res = case maps:get(Topic, Manager, undefined) of
undefined ->
new_res(Token);
Any ->
Any
end,
{maps:get(seq_id, Res), Manager#{Topic => Res}}.
-spec remove(topic(), manager()) -> manager().
remove(Topic, Manager) ->
@ -72,6 +73,9 @@ foreach(F, Manager) ->
Manager),
ok.
subscriptions(Manager) ->
maps:keys(Manager).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -23,11 +23,14 @@
%% API
-export([new/0, transfer_result/3]).
-export([ received/3
, reply/4
, reply/5
, ack/3
, piggyback/4
-export([ info/1
, info/2
, stats/1
]).
-export([ handle_request/3
, handle_response/3
, handle_out/3
, deliver/3
, timeout/3]).
@ -36,10 +39,31 @@
-record(session, { transport_manager :: emqx_coap_tm:manager()
, observe_manager :: emqx_coap_observe_res:manager()
, next_msg_id :: coap_message_id()
, created_at :: pos_integer()
}).
-type session() :: #session{}.
%% steal from emqx_session
-define(INFO_KEYS, [subscriptions,
upgrade_qos,
retry_interval,
await_rel_timeout,
created_at
]).
-define(STATS_KEYS, [subscriptions_cnt,
subscriptions_max,
inflight_cnt,
inflight_max,
mqueue_len,
mqueue_max,
mqueue_dropped,
next_pkt_id,
awaiting_rel_cnt,
awaiting_rel_max
]).
%%%-------------------------------------------------------------------
%%% API
%%%-------------------------------------------------------------------
@ -48,125 +72,163 @@ new() ->
_ = emqx_misc:rand_seed(),
#session{ transport_manager = emqx_coap_tm:new()
, observe_manager = emqx_coap_observe_res:new_manager()
, next_msg_id = rand:uniform(?MAX_MESSAGE_ID)}.
, next_msg_id = rand:uniform(?MAX_MESSAGE_ID)
, created_at = erlang:system_time(millisecond)}.
%%--------------------------------------------------------------------
%% Info, Stats
%%--------------------------------------------------------------------
%% @doc Compatible with emqx_session
%% do we need use inflight and mqueue in here?
-spec(info(session()) -> emqx_types:infos()).
info(Session) ->
maps:from_list(info(?INFO_KEYS, Session)).
info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys];
info(subscriptions, #session{observe_manager = OM}) ->
emqx_coap_observe_res:subscriptions(OM);
info(subscriptions_cnt, #session{observe_manager = OM}) ->
erlang:length(emqx_coap_observe_res:subscriptions(OM));
info(subscriptions_max, _) ->
infinity;
info(upgrade_qos, _) ->
?QOS_0;
info(inflight, _) ->
emqx_inflight:new();
info(inflight_cnt, _) ->
0;
info(inflight_max, _) ->
0;
info(retry_interval, _) ->
infinity;
info(mqueue, _) ->
emqx_mqueue:init(#{max_len => 0, store_qos0 => false});
info(mqueue_len, #session{transport_manager = TM}) ->
maps:size(TM);
info(mqueue_max, _) ->
0;
info(mqueue_dropped, _) ->
0;
info(next_pkt_id, #session{next_msg_id = PacketId}) ->
PacketId;
info(awaiting_rel, _) ->
#{};
info(awaiting_rel_cnt, _) ->
0;
info(awaiting_rel_max, _) ->
infinity;
info(await_rel_timeout, _) ->
infinity;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt.
%% @doc Get stats of the session.
-spec(stats(session()) -> emqx_types:stats()).
stats(Session) -> info(?STATS_KEYS, Session).
%%%-------------------------------------------------------------------
%%% Process Message
%%%-------------------------------------------------------------------
received(#coap_message{type = ack} = Msg, Cfg, Session) ->
handle_response(Msg, Cfg, Session);
handle_request(Msg, Ctx, Session) ->
call_transport_manager(?FUNCTION_NAME,
Msg,
Ctx,
[fun process_tm/3, fun process_subscribe/3],
Session).
received(#coap_message{type = reset} = Msg, Cfg, Session) ->
handle_response(Msg, Cfg, Session);
handle_response(Msg, Ctx, Session) ->
call_transport_manager(?FUNCTION_NAME, Msg, Ctx, [fun process_tm/3], Session).
received(#coap_message{method = Method} = Msg, Cfg, Session) when is_atom(Method) ->
handle_request(Msg, Cfg, Session);
handle_out(Msg, Ctx, Session) ->
call_transport_manager(?FUNCTION_NAME, Msg, Ctx, [fun process_tm/3], Session).
received(Msg, Cfg, Session) ->
handle_response(Msg, Cfg, Session).
reply(Req, Method, Cfg, Session) ->
reply(Req, Method, <<>>, Cfg, Session).
reply(Req, Method, Payload, Cfg, Session) ->
Response = emqx_coap_message:response(Method, Payload, Req),
handle_out(Response, Cfg, Session).
ack(Req, Cfg, Session) ->
piggyback(Req, <<>>, Cfg, Session).
piggyback(Req, Payload, Cfg, Session) ->
Response = emqx_coap_message:ack(Req),
Response2 = emqx_coap_message:set_payload(Payload, Response),
handle_out(Response2, Cfg, Session).
deliver(Delivers, Cfg, Session) ->
deliver(Delivers, Ctx, Session) ->
Fun = fun({_, Topic, Message},
#{out := OutAcc,
session := #session{observe_manager = OM,
next_msg_id = MsgId} = SAcc} = Acc) ->
next_msg_id = MsgId,
transport_manager = TM} = SAcc} = Acc) ->
case emqx_coap_observe_res:res_changed(Topic, OM) of
undefined ->
Acc;
{Token, SeqId, OM2} ->
Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Cfg),
SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId),
Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Ctx),
SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId, TM),
observe_manager = OM2},
#{out := Out} = Result = call_transport_manager(handle_out, Msg, Cfg, SAcc2),
#{out := Out} = Result = handle_out(Msg, Ctx, SAcc2),
Result#{out := [Out | OutAcc]}
end
end,
lists:foldl(Fun,
#{out => [],
session => Session},
#{out => [], session => Session},
Delivers).
timeout(Timer, Cfg, Session) ->
call_transport_manager(?FUNCTION_NAME, Timer, Cfg, Session).
timeout(Timer, Ctx, Session) ->
call_transport_manager(?FUNCTION_NAME, Timer, Ctx, [fun process_tm/3], Session).
result_keys() ->
[tm, subscribe] ++ emqx_coap_channel:result_keys().
transfer_result(From, Value, Result) ->
?TRANSFER_RESULT([out, subscribe], From, Value, Result).
?TRANSFER_RESULT(From, Value, Result).
%%%-------------------------------------------------------------------
%%% Internal functions
%%%-------------------------------------------------------------------
handle_request(Msg, Cfg, Session) ->
call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session).
handle_response(Msg, Cfg, Session) ->
call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session).
handle_out(Msg, Cfg, Session) ->
call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session).
call_transport_manager(Fun,
Msg,
Cfg,
Ctx,
Processor,
#session{transport_manager = TM} = Session) ->
try
Result = emqx_coap_tm:Fun(Msg, Cfg, TM),
{ok, _, Session2} = emqx_misc:pipeline([fun process_tm/2,
fun process_subscribe/2],
Result,
Session),
emqx_coap_channel:transfer_result(session, Session2, Result)
Result = emqx_coap_tm:Fun(Msg, Ctx, TM),
{ok, Result2, Session2} = pipeline(Processor,
Result,
Msg,
Session),
emqx_coap_channel:transfer_result(session, Session2, Result2)
catch Type:Reason:Stack ->
?ERROR("process transmission with, message:~p failed~n
Type:~p,Reason:~p~n,StackTrace:~p~n", [Msg, Type, Reason, Stack]),
#{out => emqx_coap_message:response({error, internal_server_error}, Msg)}
?REPLY({error, internal_server_error}, Msg)
end.
process_tm(#{tm := TM}, Session) ->
process_tm(#{tm := TM}, _, Session) ->
{ok, Session#session{transport_manager = TM}};
process_tm(_, Session) ->
process_tm(_, _, Session) ->
{ok, Session}.
process_subscribe(#{subscribe := Sub}, #session{observe_manager = OM} = Session) ->
process_subscribe(#{subscribe := Sub} = Result,
Msg,
#session{observe_manager = OM} = Session) ->
case Sub of
undefined ->
{ok, Session};
{ok, Result, Session};
{Topic, Token} ->
OM2 = emqx_coap_observe_res:insert(Topic, Token, OM),
{ok, Session#session{observe_manager = OM2}};
{SeqId, OM2} = emqx_coap_observe_res:insert(Topic, Token, OM),
Replay = emqx_coap_message:piggyback({ok, content}, Msg),
Replay2 = Replay#coap_message{options = #{observe => SeqId}},
{ok, Result#{reply => Replay2}, Session#session{observe_manager = OM2}};
Topic ->
OM2 = emqx_coap_observe_res:remove(Topic, OM),
{ok, Session#session{observe_manager = OM2}}
Replay = emqx_coap_message:piggyback({ok, nocontent}, Msg),
{ok, Result#{reply => Replay}, Session#session{observe_manager = OM2}}
end;
process_subscribe(_, Session) ->
{ok, Session}.
process_subscribe(Result, _, Session) ->
{ok, Result, Session}.
mqtt_to_coap(MQTT, MsgId, Token, SeqId, Cfg) ->
mqtt_to_coap(MQTT, MsgId, Token, SeqId, Ctx) ->
#message{payload = Payload} = MQTT,
#coap_message{type = get_notify_type(MQTT, Cfg),
#coap_message{type = get_notify_type(MQTT, Ctx),
method = {ok, content},
id = MsgId,
token = Token,
payload = Payload,
options = #{observe => SeqId,
max_age => get_max_age(MQTT)}}.
options = #{observe => SeqId}}.
get_notify_type(#message{qos = Qos}, #{notify_type := Type}) ->
case Type of
get_notify_type(#message{qos = Qos}, Ctx) ->
case emqx_coap_channel:get_config(notify_type, Ctx) of
qos ->
case Qos of
?QOS_0 ->
@ -178,18 +240,31 @@ get_notify_type(#message{qos = Qos}, #{notify_type := Type}) ->
Other
end.
-spec get_max_age(#message{}) -> max_age().
get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
?MAXIMUM_MAX_AGE;
get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
timestamp = Ts}) ->
Now = erlang:system_time(millisecond),
Diff = (Now - Ts + Interval * 1000) / 1000,
erlang:max(1, erlang:floor(Diff));
get_max_age(_) ->
?DEFAULT_MAX_AGE.
next_msg_id(MsgId, TM) ->
next_msg_id(MsgId + 1, MsgId, TM).
next_msg_id(MsgId) when MsgId >= ?MAX_MESSAGE_ID ->
1;
next_msg_id(MsgId) ->
MsgId + 1.
next_msg_id(MsgId, MsgId, _) ->
erlang:throw("too many message in delivering");
next_msg_id(MsgId, BeginId, TM) when MsgId >= ?MAX_MESSAGE_ID ->
check_is_inused(1, BeginId, TM);
next_msg_id(MsgId, BeginId, TM) ->
check_is_inused(MsgId, BeginId, TM).
check_is_inused(NewMsgId, BeginId, TM) ->
case emqx_coap_tm:is_inused(out, NewMsgId, TM) of
false ->
NewMsgId;
_ ->
next_msg_id(NewMsgId + 1, BeginId, TM)
end.
pipeline([Fun | T], Result, Msg, Session) ->
case Fun(Result, Msg, Session) of
{ok, Session2} ->
pipeline(T, Result, Msg, Session2);
{ok, Result2, Session2} ->
pipeline(T, Result2, Msg, Session2)
end;
pipeline([], Result, _, Session) ->
{ok, Result, Session}.

View File

@ -21,7 +21,8 @@
, handle_request/3
, handle_response/3
, handle_out/3
, timeout/3]).
, timeout/3
, is_inused/3]).
-export_type([manager/0, event_result/1]).
@ -60,18 +61,18 @@
new() ->
#{}.
handle_request(#coap_message{id = MsgId} = Msg, Cfg, TM) ->
handle_request(#coap_message{id = MsgId} = Msg, Ctx, TM) ->
Id = {in, MsgId},
case maps:get(Id, TM, undefined) of
undefined ->
Transport = emqx_coap_transport:new(),
Machine = new_state_machine(Id, Transport),
process_event(in, Msg, TM, Machine, Cfg);
process_event(in, Msg, TM, Ctx, Machine);
Machine ->
process_event(in, Msg, TM, Machine, Cfg)
process_event(in, Msg, TM, Ctx, Machine)
end.
handle_response(#coap_message{type = Type, id = MsgId} = Msg, Cfg, TM) ->
handle_response(#coap_message{type = Type, id = MsgId} = Msg, Ctx, TM) ->
Id = {out, MsgId},
case maps:get(Id, TM, undefined) of
undefined ->
@ -79,26 +80,25 @@ handle_response(#coap_message{type = Type, id = MsgId} = Msg, Cfg, TM) ->
reset ->
?EMPTY_RESULT;
_ ->
#{out => #coap_message{type = reset,
id = MsgId}}
?RESET(Msg)
end;
Machine ->
process_event(in, Msg, TM, Machine, Cfg)
process_event(in, Msg, TM, Ctx, Machine)
end.
handle_out(#coap_message{id = MsgId} = Msg, Cfg, TM) ->
handle_out(#coap_message{id = MsgId} = Msg, Ctx, TM) ->
Id = {out, MsgId},
case maps:get(Id, TM, undefined) of
undefined ->
Transport = emqx_coap_transport:new(),
Machine = new_state_machine(Id, Transport),
process_event(out, Msg, TM, Machine, Cfg);
process_event(out, Msg, TM, Ctx, Machine);
_ ->
?WARN("Repeat sending message with id:~p~n", [Id]),
%% ignore repeat send
?EMPTY_RESULT
end.
timeout({Id, Type, Msg}, Cfg, TM) ->
timeout({Id, Type, Msg}, Ctx, TM) ->
case maps:get(Id, TM, undefined) of
undefined ->
?EMPTY_RESULT;
@ -106,12 +106,16 @@ timeout({Id, Type, Msg}, Cfg, TM) ->
%% maybe timer has been canceled
case maps:is_key(Type, Timers) of
true ->
process_event(Type, Msg, TM, Machine, Cfg);
process_event(Type, Msg, TM, Ctx, Machine);
_ ->
?EMPTY_RESULT
end
end.
-spec is_inused(direction(), message_id(), manager()) -> boolean().
is_inused(Dir, Msg, Manager) ->
maps:is_key({Dir, Msg}, Manager).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -124,9 +128,9 @@ new_state_machine(Id, Transport) ->
process_event(stop_timeout,
_,
TM,
_,
#state_machine{id = Id,
timers = Timers},
_) ->
timers = Timers}) ->
lists:foreach(fun({_, Ref}) ->
emqx_misc:cancel_timer(Ref)
end,
@ -136,11 +140,11 @@ process_event(stop_timeout,
process_event(Event,
Msg,
TM,
Ctx,
#state_machine{id = Id,
state = State,
transport = Transport} = Machine,
Cfg) ->
Result = emqx_coap_transport:State(Event, Msg, Transport, Cfg),
transport = Transport} = Machine) ->
Result = emqx_coap_transport:State(Event, Msg, Ctx, Transport),
{ok, _, Machine2} = emqx_misc:pipeline([fun process_state_change/2,
fun process_transport_change/2,
fun process_timeouts/2],

View File

@ -21,6 +21,8 @@
-export_type([transport/0]).
-import(emqx_coap_message, [reset/1]).
-spec new() -> transport().
new() ->
#transport{cache = undefined,
@ -28,54 +30,33 @@ new() ->
retry_count = 0}.
idle(in,
#coap_message{type = non, id = MsgId, method = Method} = Msg,
_,
#{resource := Resource} = Cfg) ->
#coap_message{type = non, method = Method} = Msg,
Ctx,
_) ->
Ret = #{next => until_stop,
timeouts => [{stop_timeout, ?NON_LIFETIME}]},
case Method of
undefined ->
Ret#{out => #coap_message{type = reset, id = MsgId}};
?RESET(Msg);
_ ->
case erlang:apply(Resource, Method, [Msg, Cfg]) of
#coap_message{} = Result ->
Ret#{out => Result};
{has_sub, Result, Sub} ->
Ret#{out => Result, subscribe => Sub};
error ->
Ret#{out =>
emqx_coap_message:response({error, internal_server_error}, Msg)}
end
Result = call_handler(Msg, Ctx),
maps:merge(Ret, Result)
end;
idle(in,
#coap_message{id = MsgId,
type = con,
method = Method} = Msg,
Transport,
#{resource := Resource} = Cfg) ->
#coap_message{type = con, method = Method} = Msg,
Ctx,
Transport) ->
Ret = #{next => maybe_resend,
timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]},
case Method of
undefined ->
ResetMsg = #coap_message{type = reset, id = MsgId},
ResetMsg = reset(Msg),
Ret#{transport => Transport#transport{cache = ResetMsg},
out => ResetMsg};
_ ->
{RetMsg, SubInfo} =
case erlang:apply(Resource, Method, [Msg, Cfg]) of
#coap_message{} = Result ->
{Result, undefined};
{has_sub, Result, Sub} ->
{Result, Sub};
error ->
{emqx_coap_message:response({error, internal_server_error}, Msg),
undefined}
end,
RetMsg2 = RetMsg#coap_message{type = ack},
Ret#{out => RetMsg2,
transport => Transport#transport{cache = RetMsg2},
subscribe => SubInfo}
Result = call_handler(Msg, Ctx),
maps:merge(Ret, Result)
end;
idle(out, #coap_message{type = non} = Msg, _, _) ->
@ -83,7 +64,7 @@ idle(out, #coap_message{type = non} = Msg, _, _) ->
out => Msg,
timeouts => [{stop_timeout, ?NON_LIFETIME}]};
idle(out, Msg, Transport, _) ->
idle(out, Msg, _, Transport) ->
_ = emqx_misc:rand_seed(),
Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR),
#{next => wait_ack,
@ -133,3 +114,13 @@ wait_ack(state_timeout,
until_stop(_, _, _, _) ->
?EMPTY_RESULT.
call_handler(#coap_message{options = Opts} = Msg, Ctx) ->
case maps:get(uri_path, Opts, undefined) of
[<<"ps">> | RestPath] ->
emqx_coap_pubsub_handler:handle_request(RestPath, Msg, Ctx);
[<<"mqtt">> | RestPath] ->
emqx_coap_mqtt_handler:handle_request(RestPath, Msg, Ctx);
_ ->
?REPLY({error, bad_request}, Msg)
end.

View File

@ -0,0 +1,40 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_mqtt_handler).
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-export([handle_request/3]).
-import(emqx_coap_message, [response/2, response/3]).
handle_request([<<"connection">>], #coap_message{method = Method} = Msg, _) ->
handle_method(Method, Msg);
handle_request(_, Msg, _) ->
?REPLY({error, bad_request}, Msg).
handle_method(put, Msg) ->
?REPLY({ok, changed}, Msg);
handle_method(post, _) ->
#{connection => open};
handle_method(delete, _) ->
#{connection => close};
handle_method(_, Msg) ->
?REPLY({error, method_not_allowed}, Msg).

View File

@ -0,0 +1,155 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% a coap to mqtt adapter with a retained topic message database
-module(emqx_coap_pubsub_handler).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-export([handle_request/3]).
-import(emqx_coap_message, [response/2, response/3]).
-define(UNSUB(Topic), #{subscribe => Topic}).
-define(SUB(Topic, Token), #{subscribe => {Topic, Token}}).
-define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}).
handle_request(Path, #coap_message{method = Method} = Msg, Ctx) ->
case check_topic(Path) of
{ok, Topic} ->
handle_method(Method, Topic, Msg, Ctx);
_ ->
?REPLY({error, bad_request}, <<"invalid topic">>, Msg)
end.
handle_method(get, Topic, #coap_message{options = Opts} = Msg, Ctx) ->
case maps:get(observe, Opts, undefined) of
0 ->
subscribe(Msg, Topic, Ctx);
1 ->
unsubscribe(Topic, Ctx);
_ ->
?REPLY({error, bad_request}, <<"invalid observe value">>, Msg)
end;
handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx) ->
case emqx_coap_channel:validator(publish, Topic, Ctx) of
allow ->
ClientInfo = emqx_coap_channel:get_clientinfo(Ctx),
#{clientid := ClientId} = ClientInfo,
QOS = get_publish_qos(Msg, Ctx),
MQTTMsg = emqx_message:make(ClientId, QOS, Topic, Payload),
MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg),
_ = emqx_broker:publish(MQTTMsg2),
?REPLY({ok, changed}, Msg);
_ ->
?REPLY({error, unauthorized}, Msg)
end;
handle_method(_, _, Msg, _) ->
?REPLY({error, method_not_allowed}, Msg).
check_topic([]) ->
error;
check_topic(Path) ->
Sep = <<"/">>,
{ok,
emqx_http_lib:uri_decode(
lists:foldl(fun(Part, Acc) ->
<<Acc/binary, Sep/binary, Part/binary>>
end,
<<>>,
Path))}.
get_sub_opts(#coap_message{options = Opts} = Msg, Ctx) ->
SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts),
case SubOpts of
#{qos := _} ->
maps:merge(SubOpts, ?SUBOPTS);
_ ->
CfgType = emqx_coap_channel:get_config(subscribe_qos, Ctx),
maps:merge(SubOpts, ?SUBOPTS#{qos => type_to_qos(CfgType, Msg)})
end.
parse_sub_opts(<<"qos">>, V, Opts) ->
Opts#{qos => erlang:binary_to_integer(V)};
parse_sub_opts(<<"nl">>, V, Opts) ->
Opts#{nl => erlang:binary_to_integer(V)};
parse_sub_opts(<<"rh">>, V, Opts) ->
Opts#{rh => erlang:binary_to_integer(V)};
parse_sub_opts(_, _, Opts) ->
Opts.
type_to_qos(qos0, _) -> ?QOS_0;
type_to_qos(qos1, _) -> ?QOS_1;
type_to_qos(qos2, _) -> ?QOS_2;
type_to_qos(coap, #coap_message{type = Type}) ->
case Type of
non ->
?QOS_0;
_ ->
?QOS_1
end.
get_publish_qos(#coap_message{options = Opts} = Msg, Ctx) ->
case maps:get(uri_query, Opts) of
#{<<"qos">> := QOS} ->
erlang:binary_to_integer(QOS);
_ ->
CfgType = emqx_coap_channel:get_config(publish_qos, Ctx),
type_to_qos(CfgType, Msg)
end.
apply_publish_opts(#coap_message{options = Opts}, MQTTMsg) ->
maps:fold(fun(<<"retain">>, V, Acc) ->
Val = erlang:binary_to_atom(V),
emqx_message:set_flag(retain, Val, Acc);
(<<"expiry">>, V, Acc) ->
Val = erlang:binary_to_integer(V),
Props = emqx_message:get_header(properties, Acc),
emqx_message:set_header(properties,
Props#{'Message-Expiry-Interval' => Val},
Acc);
(_, _, Acc) ->
Acc
end,
MQTTMsg,
maps:get(uri_query, Opts)).
subscribe(#coap_message{token = <<>>} = Msg, _, _) ->
?REPLY({error, bad_request}, <<"observe without token">>, Msg);
subscribe(#coap_message{token = Token} = Msg, Topic, Ctx) ->
case emqx_coap_channel:validator(subscribe, Topic, Ctx) of
allow ->
ClientInfo = emqx_coap_channel:get_clientinfo(Ctx),
#{clientid := ClientId} = ClientInfo,
SubOpts = get_sub_opts(Msg, Ctx),
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed',
[ClientInfo, Topic, SubOpts]),
?SUB(Topic, Token);
_ ->
?REPLY({error, unauthorized}, Msg)
end.
unsubscribe(Topic, Ctx) ->
ClientInfo = emqx_coap_channel:get_clientinfo(Ctx),
emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, ?SUBOPTS]),
?UNSUB(Topic).

View File

@ -23,12 +23,17 @@
-define(MAXIMUM_MAX_AGE, 4294967295).
-define(EMPTY_RESULT, #{}).
-define(TRANSFER_RESULT(Keys, From, Value, R1),
-define(TRANSFER_RESULT(From, Value, R1),
begin
Keys = result_keys(),
R2 = maps:with(Keys, R1),
R2#{From => Value}
end).
-define(RESET(Msg), #{out => emqx_coap_message:reset(Msg)}).
-define(REPLY(Resp, Payload, Msg), #{out => emqx_coap_message:piggyback(Resp, Payload, Msg)}).
-define(REPLY(Resp, Msg), ?REPLY(Resp, <<>>, Msg)).
-type coap_message_id() :: 1 .. ?MAX_MESSAGE_ID.
-type message_type() :: con | non | ack | reset.
-type max_age() :: 1 .. ?MAXIMUM_MAX_AGE.

View File

@ -1,153 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% a coap to mqtt adapter
-module(emqx_coap_mqtt_resource).
-behaviour(emqx_coap_resource).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-export([ init/1
, stop/1
, get/2
, put/2
, post/2
, delete/2
]).
-export([ check_topic/1
, publish/3
, subscribe/3
, unsubscribe/3]).
-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, is_new => false}).
init(_) ->
{ok, undefined}.
stop(_) ->
ok.
%% get: subscribe, ignore observe option
get(#coap_message{token = Token} = Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
case Token of
<<>> ->
emqx_coap_message:response({error, bad_request}, <<"observer without token">>, Msg);
_ ->
Ret = subscribe(Msg, Topic, Cfg),
RetMsg = emqx_coap_message:response(Ret, Msg),
case Ret of
{ok, _} ->
{has_sub, RetMsg, {Topic, Token}};
_ ->
RetMsg
end
end;
Any ->
Any
end.
%% put: equal post
put(Msg, Cfg) ->
post(Msg, Cfg).
%% post: publish a message
post(Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
emqx_coap_message:response(publish(Msg, Topic, Cfg), Msg);
Any ->
Any
end.
%% delete: ubsubscribe
delete(Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
unsubscribe(Msg, Topic, Cfg),
{has_sub, emqx_coap_message:response({ok, deleted}, Msg), Topic};
Any ->
Any
end.
check_topic(#coap_message{options = Options} = Msg) ->
case maps:get(uri_path, Options, []) of
[] ->
emqx_coap_message:response({error, bad_request}, <<"invalid topic">> , Msg);
UriPath ->
Sep = <<"/">>,
{ok, lists:foldl(fun(Part, Acc) ->
<<Acc/binary, Sep/binary, Part/binary>>
end,
<<>>,
UriPath)}
end.
publish(#coap_message{payload = Payload} = Msg,
Topic,
#{clientinfo := ClientInfo,
publish_qos := QOS} = Cfg) ->
case emqx_coap_channel:auth_publish(Topic, Cfg) of
allow ->
#{clientid := ClientId} = ClientInfo,
MQTTMsg = emqx_message:make(ClientId, type_to_qos(QOS, Msg), Topic, Payload),
MQTTMsg2 = emqx_message:set_flag(retain, false, MQTTMsg),
_ = emqx_broker:publish(MQTTMsg2),
{ok, changed};
_ ->
{error, unauthorized}
end.
subscribe(Msg, Topic, #{clientinfo := ClientInfo}= Cfg) ->
case emqx_topic:wildcard(Topic) of
false ->
case emqx_coap_channel:auth_subscribe(Topic, Cfg) of
allow ->
#{clientid := ClientId} = ClientInfo,
SubOpts = get_sub_opts(Msg, Cfg),
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
{ok, created};
_ ->
{error, unauthorized}
end;
_ ->
%% now, we don't support wildcard in subscribe topic
{error, bad_request, <<"">>}
end.
unsubscribe(Msg, Topic, #{clientinfo := ClientInfo} = Cfg) ->
emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, get_sub_opts(Msg, Cfg)]).
get_sub_opts(Msg, #{subscribe_qos := Type}) ->
?SUBOPTS#{qos => type_to_qos(Type, Msg)}.
type_to_qos(qos0, _) -> ?QOS_0;
type_to_qos(qos1, _) -> ?QOS_1;
type_to_qos(qos2, _) -> ?QOS_2;
type_to_qos(coap, #coap_message{type = Type}) ->
case Type of
non ->
?QOS_0;
_ ->
?QOS_1
end.

View File

@ -1,219 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% a coap to mqtt adapter with a retained topic message database
-module(emqx_coap_pubsub_resource).
-behaviour(emqx_coap_resource).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-export([ init/1
, stop/1
, get/2
, put/2
, post/2
, delete/2
]).
-import(emqx_coap_mqtt_resource, [ check_topic/1, subscribe/3, unsubscribe/3
, publish/3]).
-import(emqx_coap_message, [response/2, response/3, set_content/2]).
%%--------------------------------------------------------------------
%% Resource Callbacks
%%--------------------------------------------------------------------
init(_) ->
emqx_coap_pubsub_topics:start_link().
stop(Pid) ->
emqx_coap_pubsub_topics:stop(Pid).
%% get: read last publish message
%% get with observe 0: subscribe
%% get with observe 1: unsubscribe
get(#coap_message{token = Token} = Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
case emqx_coap_message:get_option(observe, Msg) of
undefined ->
Content = emqx_coap_message:get_content(Msg),
read_last_publish_message(emqx_topic:wildcard(Topic), Msg, Topic, Content);
0 ->
case Token of
<<>> ->
response({error, bad_reuqest}, <<"observe without token">>, Msg);
_ ->
Ret = subscribe(Msg, Topic, Cfg),
RetMsg = response(Ret, Msg),
case Ret of
{ok, _} ->
{has_sub, RetMsg, {Topic, Token}};
_ ->
RetMsg
end
end;
1 ->
unsubscribe(Msg, Topic, Cfg),
{has_sub, response({ok, deleted}, Msg), Topic}
end;
Any ->
Any
end.
%% put: insert a message into topic database
put(Msg, _) ->
case check_topic(Msg) of
{ok, Topic} ->
Content = emqx_coap_message:get_content(Msg),
#coap_content{payload = Payload,
format = Format,
max_age = MaxAge} = Content,
handle_received_create(Msg, Topic, MaxAge, Format, Payload);
Any ->
Any
end.
%% post: like put, but will publish the inserted message
post(Msg, Cfg) ->
case check_topic(Msg) of
{ok, Topic} ->
Content = emqx_coap_message:get_content(Msg),
#coap_content{max_age = MaxAge,
format = Format,
payload = Payload} = Content,
handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg);
Any ->
Any
end.
%% delete: delete a message from topic database
delete(Msg, _) ->
case check_topic(Msg) of
{ok, Topic} ->
delete_topic_info(Msg, Topic);
Any ->
Any
end.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
add_topic_info(Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>> ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[{_, StoredMaxAge, StoredCT, _, _}] ->
?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]),
%% check whether the ct value stored matches the ct option in this POST message
case Format =:= StoredCT of
true ->
{ok, Ret} =
case StoredMaxAge =:= MaxAge of
true ->
emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload);
false ->
emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload)
end,
{changed, Ret};
false ->
?LOG(debug, "ct values of topic=~p do not match, stored ct=~p, new ct=~p, ignore the PUBLISH", [Topic, StoredCT, Format]),
{changed, false}
end;
[] ->
?LOG(debug, "publish topic=~p will be created", [Topic]),
{ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload),
{created, Ret}
end;
add_topic_info(Topic, _MaxAge, _Format, _Payload) ->
?LOG(debug, "create topic=~p info failed", [Topic]),
{badarg, false}.
format_string_to_int(<<"application/octet-stream">>) ->
<<"42">>;
format_string_to_int(<<"application/exi">>) ->
<<"47">>;
format_string_to_int(<<"application/json">>) ->
<<"50">>;
format_string_to_int(_) ->
<<"42">>.
handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg) ->
case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of
{_, true} ->
response(publish(Msg, Topic, Cfg), Msg);
{_, false} ->
?LOG(debug, "add_topic_info failed, will return bad_request", []),
response({error, bad_request}, Msg)
end.
handle_received_create(Msg, Topic, MaxAge, Format, Payload) ->
case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of
{Ret, true} ->
response({ok, Ret}, Msg);
{_, false} ->
?LOG(debug, "add_topic_info failed, will return bad_request", []),
response({error, bad_request}, Msg)
end.
return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content) ->
TimeElapsed = trunc((erlang:system_time(millisecond) - TimeStamp) / 1000),
case TimeElapsed < MaxAge of
true ->
LeftTime = (MaxAge - TimeElapsed),
?LOG(debug, "topic=~p has max age left time is ~p", [Topic, LeftTime]),
set_content(Content#coap_content{max_age = LeftTime, payload = Payload},
response({ok, content}, Msg));
false ->
?LOG(debug, "topic=~p has been timeout, will return empty content", [Topic]),
response({ok, nocontent}, Msg)
end.
read_last_publish_message(false, Msg, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)->
?LOG(debug, "the QueryFormat=~p", [QueryFormat]),
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
response({error, not_found}, Msg);
[{_, MaxAge, CT, Payload, TimeStamp}] ->
case CT =:= format_string_to_int(QueryFormat) of
true ->
return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content);
false ->
?LOG(debug, "format value does not match, the queried format=~p, the stored format=~p", [QueryFormat, CT]),
response({error, bad_request}, Msg)
end
end;
read_last_publish_message(false, Msg, Topic, Content) ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
response({error, not_found}, Msg);
[{_, MaxAge, _, Payload, TimeStamp}] ->
return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content)
end;
read_last_publish_message(true, Msg, Topic, _Content) ->
?LOG(debug, "the topic=~p is illegal wildcard topic", [Topic]),
response({error, bad_request}, Msg).
delete_topic_info(Msg, Topic) ->
case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of
[] ->
response({error, not_found}, Msg);
[{_, _, _, _, _}] ->
emqx_coap_pubsub_topics:delete_sub_topics(Topic),
response({ok, deleted}, Msg)
end.

View File

@ -1,185 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_pubsub_topics).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl").
-export([ start_link/0
, stop/1
]).
-export([ add_topic_info/4
, delete_topic_info/1
, delete_sub_topics/1
, is_topic_existed/1
, is_topic_timeout/1
, reset_topic_info/2
, reset_topic_info/3
, reset_topic_info/4
, lookup_topic_info/1
, lookup_topic_payload/1
]).
%% gen_server.
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(state, {}).
-define(COAP_TOPIC_TABLE, coap_topic).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop(Pid) ->
gen_server:stop(Pid).
add_topic_info(Topic, MaxAge, CT, Payload) when is_binary(Topic), is_integer(MaxAge), is_binary(CT), is_binary(Payload) ->
gen_server:call(?MODULE, {add_topic, {Topic, MaxAge, CT, Payload}}).
delete_topic_info(Topic) when is_binary(Topic) ->
gen_server:call(?MODULE, {remove_topic, Topic}).
delete_sub_topics(Topic) when is_binary(Topic) ->
gen_server:cast(?MODULE, {remove_sub_topics, Topic}).
reset_topic_info(Topic, Payload) ->
gen_server:call(?MODULE, {reset_topic, {Topic, Payload}}).
reset_topic_info(Topic, MaxAge, Payload) ->
gen_server:call(?MODULE, {reset_topic, {Topic, MaxAge, Payload}}).
reset_topic_info(Topic, MaxAge, CT, Payload) ->
gen_server:call(?MODULE, {reset_topic, {Topic, MaxAge, CT, Payload}}).
is_topic_existed(Topic) ->
ets:member(?COAP_TOPIC_TABLE, Topic).
is_topic_timeout(Topic) when is_binary(Topic) ->
[{Topic, MaxAge, _, _, TimeStamp}] = ets:lookup(?COAP_TOPIC_TABLE, Topic),
%% MaxAge: x seconds
MaxAge < ((erlang:system_time(millisecond) - TimeStamp) / 1000).
lookup_topic_info(Topic) ->
ets:lookup(?COAP_TOPIC_TABLE, Topic).
lookup_topic_payload(Topic) ->
try ets:lookup_element(?COAP_TOPIC_TABLE, Topic, 4)
catch
error:badarg -> undefined
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
_ = ets:new(?COAP_TOPIC_TABLE, [set, named_table, protected]),
?LOG(debug, "Create the coap_topic table", []),
{ok, #state{}}.
handle_call({add_topic, {Topic, MaxAge, CT, Payload}}, _From, State) ->
Ret = create_table_element(Topic, MaxAge, CT, Payload),
{reply, {ok, Ret}, State, hibernate};
handle_call({reset_topic, {Topic, Payload}}, _From, State) ->
Ret = update_table_element(Topic, Payload),
{reply, {ok, Ret}, State, hibernate};
handle_call({reset_topic, {Topic, MaxAge, Payload}}, _From, State) ->
Ret = update_table_element(Topic, MaxAge, Payload),
{reply, {ok, Ret}, State, hibernate};
handle_call({reset_topic, {Topic, MaxAge, CT, Payload}}, _From, State) ->
Ret = update_table_element(Topic, MaxAge, CT, Payload),
{reply, {ok, Ret}, State, hibernate};
handle_call({remove_topic, {Topic, _Content}}, _From, State) ->
ets:delete(?COAP_TOPIC_TABLE, Topic),
?LOG(debug, "Remove topic ~p in the coap_topic table", [Topic]),
{reply, ok, State, hibernate};
handle_call(Request, _From, State) ->
?LOG(error, "adapter unexpected call ~p", [Request]),
{reply, ignored, State, hibernate}.
handle_cast({remove_sub_topics, TopicPrefix}, State) ->
DeletedTopicNum = ets:foldl(fun ({Topic, _, _, _, _}, AccIn) ->
case binary:match(Topic, TopicPrefix) =/= nomatch of
true ->
?LOG(debug, "Remove topic ~p in the coap_topic table", [Topic]),
ets:delete(?COAP_TOPIC_TABLE, Topic),
AccIn + 1;
false ->
AccIn
end
end, 0, ?COAP_TOPIC_TABLE),
?LOG(debug, "Remove number of ~p topics with prefix=~p in the coap_topic table", [DeletedTopicNum, TopicPrefix]),
{noreply, State, hibernate};
handle_cast(Msg, State) ->
?LOG(error, "broker_api unexpected cast ~p", [Msg]),
{noreply, State, hibernate}.
handle_info(Info, State) ->
?LOG(error, "adapter unexpected info ~p", [Info]),
{noreply, State, hibernate}.
terminate(Reason, #state{}) ->
ets:delete(?COAP_TOPIC_TABLE),
Level = case Reason =:= normal orelse Reason =:= shutdown of
true -> debug;
false -> error
end,
?SLOG(Level, #{terminate_reason => Reason}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
create_table_element(Topic, MaxAge, CT, Payload) ->
TopicInfo = {Topic, MaxAge, CT, Payload, erlang:system_time(millisecond)},
?LOG(debug, "Insert ~p in the coap_topic table", [TopicInfo]),
ets:insert_new(?COAP_TOPIC_TABLE, TopicInfo).
update_table_element(Topic, Payload) ->
?LOG(debug, "Update the topic=~p only with Payload", [Topic]),
ets:update_element(?COAP_TOPIC_TABLE, Topic, [{4, Payload}, {5, erlang:system_time(millisecond)}]).
update_table_element(Topic, MaxAge, Payload) ->
?LOG(debug, "Update the topic=~p info of MaxAge=~p and Payload", [Topic, MaxAge]),
ets:update_element(?COAP_TOPIC_TABLE, Topic, [{2, MaxAge}, {4, Payload}, {5, erlang:system_time(millisecond)}]).
update_table_element(Topic, MaxAge, CT, <<>>) ->
?LOG(debug, "Update the topic=~p info of MaxAge=~p, CT=~p, payload=<<>>", [Topic, MaxAge, CT]),
ets:update_element(?COAP_TOPIC_TABLE, Topic, [{2, MaxAge}, {3, CT}, {5, erlang:system_time(millisecond)}]).

View File

@ -31,6 +31,7 @@
-export([start_link/1]).
-export([ open_session/5
, open_session/6
, kick_session/2
, kick_session/3
, register_channel/4
@ -225,28 +226,32 @@ connection_closed(Type, ClientId) ->
}}
| {error, any()}.
open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session).
open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
Self = self(),
ClientId = maps:get(clientid, ClientInfo),
Fun = fun(_) ->
ok = discard_session(Type, ClientId),
Session = create_session(Type,
ClientInfo,
ConnInfo,
CreateSessionFun
),
register_channel(Type, ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
ok = discard_session(Type, ClientId),
Session = create_session(Type,
ClientInfo,
ConnInfo,
CreateSessionFun,
SessionMod
),
register_channel(Type, ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end,
locker_trans(Type, ClientId, Fun);
open_session(_Type, false = _CleanStart,
_ClientInfo, _ConnInfo, _CreateSessionFun) ->
_ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) ->
%% TODO:
{error, not_supported_now}.
%% @private
create_session(Type, ClientInfo, ConnInfo, CreateSessionFun) ->
create_session(Type, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
try
Session = emqx_gateway_utils:apply(
CreateSessionFun,
@ -255,7 +260,7 @@ create_session(Type, ClientInfo, ConnInfo, CreateSessionFun) ->
ok = emqx_gateway_metrics:inc(Type, 'session.created'),
SessionInfo = case is_tuple(Session)
andalso element(1, Session) == session of
true -> emqx_session:info(Session);
true -> SessionMod:info(Session);
_ ->
case is_map(Session) of
false ->

View File

@ -40,6 +40,7 @@
%% Authentication circle
-export([ authenticate/2
, open_session/5
, open_session/6
, insert_channel_info/4
, set_chan_info/3
, set_chan_stats/3
@ -96,15 +97,18 @@ authenticate(_Ctx, ClientInfo) ->
pendings => list()
}}
| {error, any()}.
open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session).
open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
logger:warning("clean_start=false is not supported now, "
"fallback to clean_start mode"),
open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun);
open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod);
open_session(_Ctx = #{type := Type},
CleanStart, ClientInfo, ConnInfo, CreateSessionFun) ->
CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
emqx_gateway_cm:open_session(Type, CleanStart,
ClientInfo, ConnInfo, CreateSessionFun).
ClientInfo, ConnInfo, CreateSessionFun, SessionMod).
-spec insert_channel_info(context(),
emqx_types:clientid(),
@ -132,7 +136,7 @@ connection_closed(_Ctx = #{type := Type}, ClientId) ->
-spec authorize(context(), emqx_types:clientinfo(),
emqx_types:pubsub(), emqx_types:topic())
-> allow | deny.
-> allow | deny.
authorize(_Ctx, ClientInfo, PubSub, Topic) ->
emqx_access_control:authorize(ClientInfo, PubSub, Topic).

View File

@ -215,8 +215,7 @@ fields(coap) ->
fields(coap_structs) ->
[ {enable_stats, t(boolean(), undefined, true)}
, {authentication, t(ref(authentication))}
, {heartbeat, t(duration(), undefined, "15s")}
, {resource, t(union([mqtt, pubsub]), undefined, mqtt)}
, {heartbeat, t(duration(), undefined, "30s")}
, {notify_type, t(union([non, con, qos]), undefined, qos)}
, {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}