diff --git a/apps/emqx_gateway/etc/emqx_gateway.conf b/apps/emqx_gateway/etc/emqx_gateway.conf index 16315b012..6c7928174 100644 --- a/apps/emqx_gateway/etc/emqx_gateway.conf +++ b/apps/emqx_gateway/etc/emqx_gateway.conf @@ -39,9 +39,22 @@ gateway: { coap.1: { enable_stats: false - authentication.enable: false + + authentication: { + enable: true + authenticators: [ + { + name: "authenticator1" + mechanism: password-based + server_type: built-in-database + user_id_type: clientid + } + ] + } + + #authentication.enable: false + heartbeat: 30s - resource: mqtt notify_type: qos subscribe_qos: qos0 publish_qos: qos1 @@ -50,19 +63,6 @@ gateway: { } } - coap.2: { - enable_stats: false - authentication.enable:false - heartbeat: 30s - resource: pubsub - notify_type: non - subscribe_qos: qos2 - publish_qos: coap - listener.udp.1: { - bind: 5687 - } - } - mqttsn.1: { ## The MQTT-SN Gateway ID in ADVERTISE message. gateway_id: 1 diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl index 1a032a017..abd7391bd 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_channel.erl @@ -94,4 +94,3 @@ %% @doc The callback for process terminated -callback terminate(any(), channel()) -> ok. - diff --git a/apps/emqx_gateway/src/coap/README.md b/apps/emqx_gateway/src/coap/README.md index f71938c92..c451d6533 100644 --- a/apps/emqx_gateway/src/coap/README.md +++ b/apps/emqx_gateway/src/coap/README.md @@ -1,190 +1,401 @@ # Table of Contents -1. [EMQX 5.0 CoAP Gateway](#org6feb6de) -2. [CoAP Message Processing Flow](#org8458c1a) - 1. [Request Timing Diagram](#orgeaa4f53) - 1. [Transport && Transport Manager](#org88207b8) - 2. [Resource](#orgb32ce94) -3. [Resource](#org8956f90) - 1. [MQTT Resource](#orge8c21b1) - 2. [PubSub Resource](#org68ddce7) -4. [Heartbeat](#orgffdfecd) -5. [Command](#org43004c2) -6. [MQTT QOS <=> CoAP non/con](#org0157b5c) +1. [EMQX 5.0 CoAP Gateway](#org61e5bb8) + 1. [Features](#orgeddbc94) + 1. [PubSub Handler](#orgfc7be2d) + 2. [MQTT Handler](#org55be508) + 3. [Heartbeat](#org3d1a32e) + 4. [Query String](#org9a6b996) + 2. [Implementation](#org9985dfe) + 1. [Request/Response flow](#orge94210c) - + # EMQX 5.0 CoAP Gateway -emqx-coap is a CoAP Gateway for EMQ X Broker. -It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients. +emqx-coap is a CoAP Gateway for EMQ X Broker. It translates CoAP messages into MQTT messages and make it possible to communiate between CoAP clients and MQTT clients. - + -# CoAP Message Processing Flow +## Features + +- Partially achieves [Publish-Subscribe Broker for the Constrained Application Protocol (CoAP)](https://datatracker.ietf.org/doc/html/draft-ietf-core-coap-pubsub-09) + we called this as ps handler, include following functions: + - Publish + - Subscribe + - UnSubscribe +- Long connection and authorization verification called as MQTT handler - + -## Request Timing Diagram +### PubSub Handler +1. Publish - ,------. ,------------. ,-----------------. ,---------. ,--------. - |client| |coap_gateway| |transport_manager| |transport| |resource| - `--+---' `-----+------' `--------+--------' `----+----' `---+----' - | | | | | - | -------------------> | | | - | | | | | - | | | | | - | | ------------------------>| | | - | | | | | - | | | | | - | | |----------------------->| | - | | | | | - | | | | | - | | | |------------------>| - | | | | | - | | | | | - | | | |<------------------| - | | | | | - | | | | | - | | |<-----------------------| | - | | | | | - | | | | | - | | <------------------------| | | - | | | | | - | | | | | - | <------------------- | | | - ,--+---. ,-----+------. ,--------+--------. ,----+----. ,---+----. - |client| |coap_gateway| |transport_manager| |transport| |resource| - `------' `------------' `-----------------' `---------' `--------' + Method: POST\ + URI Schema: ps/{+topic}{?q\*}\ + q\*: [Shared Options](#orgc50043b)\ + Response: + - 2.04 "Changed" when success + - 4.00 "Bad Request" when error + - 4.01 "Unauthorized" when with wrong auth uri query - +2. Subscribe -### Transport && Transport Manager + Method: GET + Options: -Transport is a module that manages the life cycle and behaviour of CoAP messages\ -And the transport manager is to manage all transport which in this gateway + - Observer = 0 + URI Schema: ps/{+topic}{?q\*}\ + q\*: see [Shared Options](#orgc50043b)\ + Response: - + - 2.05 "Content" when success + - 4.00 "Bad Request" when error + - 4.01 "Unauthorized" when with wrong auth uri query -### Resource - -The Resource is a behaviour that must implement GET/PUT/POST/DELETE method\ -Different Resources can have different implementations of this four method\ -Each gateway can only use one Resource module to process CoAP Request Message - - - - -# Resource - - - - -## MQTT Resource - -The MQTT Resource is a simple CoAP to MQTT adapter, the implementation of each method is as follows: - -- use uri path as topic -- GET: subscribe the topic -- PUT: publish message to this topic -- POST: like PUT -- DELETE: unsubscribe the topic - - - - -## PubSub Resource - -The PubSub Resource like the MQTT Resource, but has a retained topic's message database\ -This Resource is shared, only can has one instance. The implementation: - -- use uri path as topic -- GET: - - GET with observe = 0: subscribe the topic - - GET with observe = 1: unsubscribe the topic - - GET without observe: read lastest message from the message database, key is the topic -- PUT: - insert message into the message database, key is the topic -- POST: - like PUT, but will publish the message -- DELETE: - delete message from the database, key is topic - - - - -# Heartbeat - -At present, the CoAP gateway only supports UDP/DTLS connection, don't support UDP over TCP and UDP over WebSocket. -Because UDP is connectionless, so the client needs to send heartbeat ping to the server interval. Otherwise, the server will close related resources -Use ****POST with empty uri path**** as a heartbeat ping - -example: ``` -coap-client -m post coap://127.0.0.1 + Client1 Client2 Broker + | | Subscribe | + | | ----- GET /ps/topic1 Observe:0 Token:XX ----> | + | | | + | | <---------- 2.05 Content Observe:10---------- | + | | | + | | | + | | Publish | + | ---------|----------- PUT /ps/topic1 "1033.3" --------> | + | | Notify | + | | <---------- 2.05 Content Observe:11 --------- | + | | | ``` - +3. UnSubscribe -# Command + Method : GET + Options: -Command is means the operation which outside the CoAP protocol, like authorization -The Command format: + - Observe = 1 -1. use ****POST**** method -2. uri path is empty -3. query string is like ****action=comandX&argX=valuex&argY=valueY**** + URI Schema: ps/{+topic}{?q\*}\ + q\*: see [Shared Options](#orgc50043b)\ + Response: -example: -1. connect: -``` -coap-client -m post coap://127.0.0.1?action=connect&clientid=XXX&username=XXX&password=XXX -``` -2. disconnect: -``` -coap-client -m post coap://127.0.0.1?action=disconnect -``` + - 2.07 "No Content" when success + - 4.00 "Bad Request" when error + - 4.01 "Unauthorized" when with wrong auth uri query - -# MQTT QOS <=> CoAP non/con + -CoAP gateway uses some options to control the conversion between MQTT qos and coap non/con: +### MQTT Handler -1.notify_type -Control the type of notify messages when the observed object has changed.Can be: + Establishing a connection is optional. If the CoAP client needs to use connection-based operations, it must first establish a connection. +At the same time, the connectionless mode and the connected mode cannot be mixed. +In connection mode, the Publish/Subscribe/UnSubscribe sent by the client must be has Token and ClientId in query string. +If the Token and Clientid is wrong/miss, EMQ X will reset the request. +The communication token is the data carried in the response payload after the client successfully establishes a connection. +After obtaining the token, the client's subsequent request must attach "token=Token" to the Query String +ClientId is necessary when there is a connection, and is a unique identifier defined by the client. +The server manages the client through the ClientId. If the ClientId is wrong, EMQ X will reset the request. -- non -- con -- qos - in this value, MQTT QOS0 -> non, QOS1/QOS2 -> con +1. Create a Connection -2.subscribe_qos -Control the qos of subscribe.Can be: + Method: POST + URI Schema: mqtt/{+topic}{?q\*} + q\*: -- qos0 -- qos1 -- qos2 -- coap - in this value, CoAP non -> qos0, con -> qos1 + - clientId := client uid + - username + - password -3.publish_qos -like subscribe_qos, but control the qos of the publish MQTT message + Response: -License -------- + - 2.01 "Created" when success + - 4.00 "Bad Request" when error + - 4.01 "Unauthorized" wrong username or password -Apache License Version 2.0 + Payload: Token if success -Author ------- +2. Close a Connection -EMQ X Team. + Method : DELETE + URI Schema: mqtt/{+topic}{?q\*} + q\*: + + - clientId := client uid + - token + + Resonse: + + - 2.01 "Deleted" when success + - 4.00 "Bad Request" when error + - 4.01 "Unauthorized" wrong clientid or token + + + + +### Heartbeat + +The Coap client can maintain the "connection" with the server through the heartbeat (regardless of whether it is authenticated or not), so that the server will not release related resources +Method : PUT +URI Schema: mqtt/{+topic}{?q\*} +q\*: + +- clientId if authenticated +- token if authenticated + +Response: + +- 2.01 "Changed" when success +- 4.00 "Bad Request" when error +- 4.01 "Unauthorized" wrong clientid or token + + + + +### Query String + +CoAP gateway uses some options in query string to conversion between MQTT CoAP. + +1. Shared Options + + - clientId + - token + +2. Connect Options + + - username + - password + +3. Publish + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
optionvalue typedefault
retainbooleanfalse
qosMQTT QOSSee here
expiryMessage Expiry Interval0(Never expiry)
+ +4. Subscribe + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
optionvalue typedefault
qosMQTT QOSSee here
nlMQTT Subscribe No Local0
rhMQTT Subscribe Retain Handing0
+ +5. MQTT QOS <=> CoAP non/con + + 1.notif_type + Control the type of notify messages when the observed object has changed.Can be: + + - non + - con + - qos + in this value, MQTT QOS0 -> non, QOS1/QOS2 -> con + + 2.subscribe_qos + Control the qos of subscribe.Can be: + + - qos0 + - qos1 + - qos2 + - coap + in this value, CoAP non -> qos0, con -> qos1 + + 3.publish_qos + like subscribe_qos, but control the qos of the publish MQTT message + + + + +## Implementation + + + + +### Request/Response flow + +![img](./doc/flow.png) + +1. Authorization check + + Check whether the clientid and token in the query string match the current connection + +2. Session + + Manager the "Transport Mnager" "Observe Resouces Manger" and next message id + +3. Transport Mnager + + Manager "Transport" create/close/dispatch + +4. Observe resources Mnager + + Mnager observe topic and token + +5. Transport + + ![img](./doc/transport.png) + + 1. Shared State + + ![img](./doc/shared_state.png) + +6. Handler + + 1. pubsub + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodObserveAction
GET0subscribe and reply result
GET1unsubscribe and reply result
POSTXpublish and reply result
+ + 2. mqtt + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
MethodAction
PUTreply result
POSTreturn create connection action
DELETEreturn close connection action
diff --git a/apps/emqx_gateway/src/coap/doc/flow.png b/apps/emqx_gateway/src/coap/doc/flow.png new file mode 100644 index 000000000..5c7288348 Binary files /dev/null and b/apps/emqx_gateway/src/coap/doc/flow.png differ diff --git a/apps/emqx_gateway/src/coap/doc/shared_state.png b/apps/emqx_gateway/src/coap/doc/shared_state.png new file mode 100644 index 000000000..2a7df229f Binary files /dev/null and b/apps/emqx_gateway/src/coap/doc/shared_state.png differ diff --git a/apps/emqx_gateway/src/coap/doc/transport.png b/apps/emqx_gateway/src/coap/doc/transport.png new file mode 100644 index 000000000..e63af691a Binary files /dev/null and b/apps/emqx_gateway/src/coap/doc/transport.png differ diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index c208d00f8..7612a6142 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -27,10 +27,11 @@ -export([ info/1 , info/2 , stats/1 - , auth_publish/2 - , auth_subscribe/2 - , reply/4 - , ack/4 + , validator/3 + , get_clientinfo/1 + , get_config/2 + , get_config/3 + , result_keys/0 , transfer_result/3]). -export([ init/2 @@ -60,9 +61,16 @@ keepalive :: emqx_keepalive:keepalive() | undefined, %% Timer timers :: #{atom() => disable | undefined | reference()}, + token :: binary() | undefined, config :: hocon:config() }). +%% the execuate context for session call +-record(exec_ctx, { config :: hocon:config(), + ctx :: emqx_gateway_ctx:context(), + clientinfo :: emqx_types:clientinfo() + }). + -type channel() :: #channel{}. -define(DISCONNECT_WAIT_TIME, timer:seconds(10)). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). @@ -98,13 +106,18 @@ init(ConnInfo = #{peername := {PeerHost, _}, #{ctx := Ctx} = Config) -> Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Config, undefined), + EnableAuth = maps:get(enable, maps:get(authentication, Config)), ClientInfo = set_peercert_infos( Peercert, #{ zone => default , protocol => 'coap' , peerhost => PeerHost , sockport => SockPort - , clientid => emqx_guid:to_base62(emqx_guid:gen()) + , clientid => if EnableAuth -> + undefined; + true -> + emqx_guid:to_base62(emqx_guid:gen()) + end , username => undefined , is_bridge => false , is_superuser => false @@ -116,48 +129,52 @@ init(ConnInfo = #{peername := {PeerHost, _}, , conninfo = ConnInfo , clientinfo = ClientInfo , timers = #{} + , config = Config , session = emqx_coap_session:new() - , config = Config#{clientinfo => ClientInfo, - ctx => Ctx} , keepalive = emqx_keepalive:init(maps:get(heartbeat, Config)) }. -auth_publish(Topic, - #{ctx := Ctx, - clientinfo := ClientInfo}) -> - emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic). +validator(Type, Topic, #exec_ctx{ctx = Ctx, + clientinfo = ClientInfo}) -> + emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic). -auth_subscribe(Topic, - #{ctx := Ctx, - clientinfo := ClientInfo}) -> - emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, Topic). +get_clientinfo(#exec_ctx{clientinfo = ClientInfo}) -> + ClientInfo. + +get_config(Key, Ctx) -> + get_config(Key, Ctx, undefined). + +get_config(Key, #exec_ctx{config = Cfg}, Def) -> + maps:get(Key, Cfg, Def). + +result_keys() -> + [out, reply, connection]. transfer_result(From, Value, Result) -> - ?TRANSFER_RESULT([out], From, Value, Result). + ?TRANSFER_RESULT(From, Value, Result). %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- -%% treat post to root path as a heartbeat -%% treat post to root path with query string as a command -handle_in(#coap_message{method = post, - options = Options} = Msg, ChannelT) -> - Channel = ensure_keepalive_timer(ChannelT), - case maps:get(uri_path, Options, <<>>) of - <<>> -> - handle_command(Msg, Channel); +handle_in(Msg, ChannleT) -> + Channel = ensure_keepalive_timer(ChannleT), + case convert_queries(Msg) of + {ok, Msg2} -> + case emqx_coap_message:is_request(Msg2) of + true -> + check_auth_state(Msg2, Channel); + _ -> + call_session(handle_response, Msg2, Channel) + end; _ -> - call_session(received, [Msg], Channel) - end; - -handle_in(Msg, Channel) -> - call_session(received, [Msg], ensure_keepalive_timer(Channel)). + response({error, bad_request}, <<"bad uri_query">>, Msg, Channel) + end. %%-------------------------------------------------------------------- %% Handle Delivers from broker to client %%-------------------------------------------------------------------- handle_deliver(Delivers, Channel) -> - call_session(deliver, [Delivers], Channel). + call_session(deliver, Delivers, Channel). %%-------------------------------------------------------------------- %% Handle timeout @@ -172,7 +189,7 @@ handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel end; handle_timeout(_, {transport, Msg}, Channel) -> - call_session(timeout, [Msg], Channel); + call_session(timeout, Msg, Channel); handle_timeout(_, disconnect, Channel) -> {shutdown, normal, Channel}; @@ -238,48 +255,123 @@ ensure_keepalive_timer(Fun, #channel{config = Cfg} = Channel) -> Interval = maps:get(heartbeat, Cfg), Fun(keepalive, Interval, keepalive, Channel). -handle_command(#coap_message{options = Options} = Msg, Channel) -> - case maps:get(uri_query, Options, []) of - [] -> - %% heartbeat - ack(Channel, {ok, valid}, <<>>, Msg); - QueryPairs -> - Queries = lists:foldl(fun(Pair, Acc) -> - [{K, V}] = cow_qs:parse_qs(Pair), - Acc#{K => V} - end, - #{}, - QueryPairs), - case maps:get(<<"action">>, Queries, undefined) of - undefined -> - ack(Channel, {error, bad_request}, <<"command without actions">>, Msg); - Action -> - handle_command(Action, Queries, Msg, Channel) - end +call_session(Fun, + Msg, + #channel{session = Session} = Channel) -> + Ctx = new_exec_ctx(Channel), + Result = erlang:apply(emqx_coap_session, Fun, [Msg, Ctx, Session]), + process_result([session, connection, out], Result, Msg, Channel). + +process_result([Key | T], Result, Msg, Channel) -> + case handle_result(Key, Result, Msg, Channel) of + {ok, Channel2} -> + process_result(T, Result, Msg, Channel2); + Other -> + Other + end; + +process_result(_, _, _, Channel) -> + {ok, Channel}. + +handle_result(session, #{session := Session}, _, Channel) -> + {ok, Channel#channel{session = Session}}; + +handle_result(connection, #{connection := open}, Msg, Channel) -> + do_connect(Msg, Channel); + +handle_result(connection, #{connection := close}, Msg, Channel) -> + Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), + {shutdown, close, {outgoing, Reply}, Channel}; + +handle_result(out, #{out := Out}, _, Channel) -> + {ok, {outgoing, Out}, Channel}; + +handle_result(_, _, _, Channel) -> + {ok, Channel}. + +check_auth_state(Method, #channel{config = Cfg} = Channel) -> + #{authentication := #{enable := Enable}} = Cfg, + check_token(Enable, Method, Channel). + +check_token(true, + #coap_message{options = Options} = Msg, + #channel{token = Token, + clientinfo = ClientInfo} = Channel) -> + #{clientid := ClientId} = ClientInfo, + case maps:get(uri_query, Options, undefined) of + #{<<"clientid">> := ClientId, + <<"token">> := Token} -> + call_session(handle_request, Msg, Channel); + #{<<"clientid">> := DesireId} -> + try_takeover(ClientId, DesireId, Msg, Channel); + _ -> + response({error, unauthorized}, Msg, Channel) + end; + +check_token(false, + #coap_message{options = Options} = Msg, + Channel) -> + case maps:get(uri_query, Options, undefined) of + #{<<"clientid">> := _} -> + response({error, unauthorized}, Msg, Channel); + #{<<"token">> := _} -> + response({error, unauthorized}, Msg, Channel); + _ -> + call_session(handle_request, Msg, Channel) end. -handle_command(<<"connect">>, Queries, Msg, Channel) -> +response(Method, Req, Channel) -> + response(Method, <<>>, Req, Channel). + +response(Method, Payload, Req, Channel) -> + Reply = emqx_coap_message:piggyback(Method, Payload, Req), + call_session(handle_out, Reply, Channel). + +try_takeover(undefined, + DesireId, + #coap_message{options = Opts} = Msg, + Channel) -> + case maps:get(uri_path, Opts, []) of + [<<"mqtt">>, <<"connection">> | _] -> + %% may be is a connect request + %% TODO need check repeat connect, unless we implement the + %% udp connection baseon the clientid + call_session(handle_request, Msg, Channel); + _ -> + do_takeover(DesireId, Msg, Channel) + end; + +try_takeover(_, DesireId, Msg, Channel) -> + do_takeover(DesireId, Msg, Channel). + +do_takeover(_DesireId, Msg, Channel) -> + %% TODO completed the takeover, now only reset the message + Reset = emqx_coap_message:reset(Msg), + call_session(handle_out, Reset, Channel). + +new_exec_ctx(#channel{config = Cfg, + ctx = Ctx, + clientinfo = ClientInfo}) -> + #exec_ctx{config = Cfg, + ctx = Ctx, + clientinfo = ClientInfo}. + +do_connect(#coap_message{options = Opts} = Req, Channel) -> + Queries = maps:get(uri_query, Opts), case emqx_misc:pipeline( [ fun run_conn_hooks/2 , fun enrich_clientinfo/2 , fun set_log_meta/2 , fun auth_connect/2 ], - {Queries, Msg}, + {Queries, Req}, Channel) of {ok, _Input, NChannel} -> - process_connect(ensure_connected(NChannel), Msg); + process_connect(ensure_connected(NChannel), Req); {error, ReasonCode, NChannel} -> ErrMsg = io_lib:format("Login Failed: ~s", [ReasonCode]), - ack(NChannel, {error, bad_request}, ErrMsg, Msg) - end; - -handle_command(<<"disconnect">>, _, Msg, Channel) -> - Channel2 = ensure_timer(disconnect, ?DISCONNECT_WAIT_TIME, disconnect, Channel), - ack(Channel2, {ok, deleted}, <<>>, Msg); - -handle_command(_, _, Msg, Channel) -> - ack(Channel, {error, bad_request}, <<"invalid action">>, Msg). + response({error, bad_request}, ErrMsg, Req, NChannel) + end. run_conn_hooks(Input, Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) -> @@ -291,8 +383,7 @@ run_conn_hooks(Input, Channel = #channel{ctx = Ctx, end. enrich_clientinfo({Queries, Msg}, - Channel = #channel{clientinfo = ClientInfo0, - config = Cfg}) -> + Channel = #channel{clientinfo = ClientInfo0}) -> case Queries of #{<<"username">> := UserName, <<"password">> := Password, @@ -301,8 +392,7 @@ enrich_clientinfo({Queries, Msg}, password => Password, clientid => ClientId}, {ok, NClientInfo} = fix_mountpoint(Msg, ClientInfo), - {ok, Channel#channel{clientinfo = NClientInfo, - config = Cfg#{clientinfo := NClientInfo}}}; + {ok, Channel#channel{clientinfo = NClientInfo}}; _ -> {error, "invalid queries", Channel} end. @@ -324,7 +414,8 @@ auth_connect(_Input, Channel = #channel{ctx = Ctx, {error, Reason} end. -fix_mountpoint(_Packet, #{mountpoint := undefined}) -> ok; +fix_mountpoint(_Packet, #{mountpoint := undefined} = ClientInfo) -> + {ok, ClientInfo}; fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> %% TODO: Enrich the varibale replacement???? %% i.e: ${ClientInfo.auth_result.productKey} @@ -334,27 +425,33 @@ fix_mountpoint(_Packet, ClientInfo = #{mountpoint := Mountpoint}) -> ensure_connected(Channel = #channel{ctx = Ctx, conninfo = ConnInfo, clientinfo = ClientInfo}) -> - NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, + NConnInfo = ConnInfo#{ connected_at => erlang:system_time(millisecond) + , proto_name => <<"COAP">> + , proto_ver => <<"1">> + }, ok = run_hooks(Ctx, 'client.connected', [ClientInfo, NConnInfo]), Channel#channel{conninfo = NConnInfo}. process_connect(Channel = #channel{ctx = Ctx, + session = Session, conninfo = ConnInfo, clientinfo = ClientInfo}, Msg) -> - SessFun = fun(_,_) -> emqx_coap_session:new() end, + %% inherit the old session + SessFun = fun(_,_) -> Session end, case emqx_gateway_ctx:open_session( Ctx, true, ClientInfo, ConnInfo, - SessFun + SessFun, + emqx_coap_session ) of {ok, _Sess} -> - ack(Channel, {ok, created}, <<"connected">>, Msg); + response({ok, created}, <<"connected">>, Msg, Channel); {error, Reason} -> ?LOG(error, "Failed to open session du to ~p", [Reason]), - ack(Channel, {error, bad_request}, <<>>, Msg) + response({error, bad_request}, Msg, Channel) end. run_hooks(Ctx, Name, Args) -> @@ -365,24 +462,20 @@ run_hooks(Ctx, Name, Args, Acc) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run_fold(Name, Args, Acc). -reply(Channel, Method, Payload, Req) -> - call_session(reply, [Req, Method, Payload], Channel). - -ack(Channel, Method, Payload, Req) -> - call_session(piggyback, [Req, Method, Payload], Channel). - -call_session(F, - A, - #channel{session = Session, - config = Cfg} = Channel) -> - case erlang:apply(emqx_coap_session, F, A ++ [Cfg, Session]) of - #{out := Out, - session := Session2} -> - {ok, {outgoing, Out}, Channel#channel{session = Session2}}; - #{out := Out} -> - {ok, {outgoing, Out}, Channel}; - #{session := Session2} -> - {ok, Channel#channel{session = Session2}}; - _ -> - {ok, Channel} +convert_queries(#coap_message{options = Opts} = Msg) -> + case maps:get(uri_query, Opts, undefined) of + undefined -> + {ok, Msg#coap_message{options = Opts#{uri_query => #{}}}}; + Queries -> + convert_queries(Queries, #{}, Msg) end. + +convert_queries([H | T], Queries, Msg) -> + case re:split(H, "=") of + [Key, Val] -> + convert_queries(T, Queries#{Key => Val}, Msg); + _ -> + error + end; +convert_queries([], Queries, #coap_message{options = Opts} = Msg) -> + {ok, Msg#coap_message{options = Opts#{uri_query => Queries}}}. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl index 9a53f3e01..c1bc08928 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl @@ -161,9 +161,7 @@ encode_option(location_query, OptVal) -> {?OPTION_LOCATION_QUERY, OptVal}; encode_option(proxy_uri, OptVal) -> {?OPTION_PROXY_URI, OptVal}; encode_option(proxy_scheme, OptVal) -> {?OPTION_PROXY_SCHEME, OptVal}; encode_option(size1, OptVal) -> {?OPTION_SIZE1, binary:encode_unsigned(OptVal)}; -%% draft-ietf-ore-observe-16 encode_option(observe, OptVal) -> {?OPTION_OBSERVE, binary:encode_unsigned(OptVal)}; -%% draft-ietf-ore-block-17 encode_option(block2, OptVal) -> {?OPTION_BLOCK2, encode_block(OptVal)}; encode_option(block1, OptVal) -> {?OPTION_BLOCK1, encode_block(OptVal)}; %% unknown opton diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index 6d27cd85a..09426a13d 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -57,17 +57,14 @@ init([]) -> %%-------------------------------------------------------------------- on_insta_create(_Insta = #{id := InstaId, - rawconf := #{resource := Resource} = RawConf + rawconf := RawConf }, Ctx, _GwState) -> - ResourceMod = get_resource_mod(Resource), Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), ListenerPids = lists:map(fun(Lis) -> - start_listener(InstaId, Ctx, ResourceMod, Lis) + start_listener(InstaId, Ctx, Lis) end, Listeners), - {ok, ResCtx} = ResourceMod:init(RawConf), - {ok, ListenerPids, #{ctx => Ctx, - res_ctx => ResCtx}}. + {ok, ListenerPids, #{ctx => Ctx}}. on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> InstaId = maps:get(id, NewInsta), @@ -85,12 +82,10 @@ on_insta_update(NewInsta, OldInsta, GwInstaState = #{ctx := Ctx}, GwState) -> end. on_insta_destroy(_Insta = #{ id := InstaId, - rawconf := #{resource := Resource} = RawConf + rawconf := RawConf }, - #{res_ctx := ResCtx} = _GwInstaState, + _GwInstaState, _GWState) -> - ResourceMod = get_resource_mod(Resource), - ok = ResourceMod:stop(ResCtx), Listeners = emqx_gateway_utils:normalize_rawconf(RawConf), lists:foreach(fun(Lis) -> stop_listener(InstaId, Lis) @@ -100,10 +95,9 @@ on_insta_destroy(_Insta = #{ id := InstaId, %% Internal funcs %%-------------------------------------------------------------------- -start_listener(InstaId, Ctx, ResourceMod, {Type, ListenOn, SocketOpts, Cfg}) -> +start_listener(InstaId, Ctx, {Type, ListenOn, SocketOpts, Cfg}) -> ListenOnStr = emqx_gateway_utils:format_listenon(ListenOn), - Cfg2 = Cfg#{resource => ResourceMod}, - case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg2) of + case start_listener(InstaId, Ctx, Type, ListenOn, SocketOpts, Cfg) of {ok, Pid} -> ?ULOG("Start coap ~s:~s listener on ~s successfully.~n", [InstaId, Type, ListenOnStr]), @@ -148,8 +142,3 @@ stop_listener(InstaId, {Type, ListenOn, SocketOpts, Cfg}) -> stop_listener(InstaId, Type, ListenOn, _SocketOpts, _Cfg) -> Name = name(InstaId, Type), esockd:close(Name, ListenOn). - -get_resource_mod(mqtt) -> - emqx_coap_mqtt_resource; -get_resource_mod(pubsub) -> - emqx_coap_pubsub_resource. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_message.erl b/apps/emqx_gateway/src/coap/emqx_coap_message.erl index 52a03c418..2e9fb144e 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_message.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_message.erl @@ -24,7 +24,13 @@ %% convenience functions for message construction -module(emqx_coap_message). --export([request/2, request/3, request/4, ack/1, response/1, response/2, response/3]). +-export([ request/2, request/3, request/4 + , ack/1, response/1, response/2 + , reset/1, piggyback/2, piggyback/3 + , response/3]). + +-export([is_request/1]). + -export([set/3, set_payload/2, get_content/1, set_content/2, set_content/3, get_option/2]). -include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). @@ -42,10 +48,13 @@ request(Type, Method, Content=#coap_content{}, Options) -> set_content(Content, #coap_message{type = Type, method = Method, options = Options}). -ack(Request = #coap_message{}) -> - #coap_message{type = ack, - id = Request#coap_message.id}. +ack(#coap_message{id = Id}) -> + #coap_message{type = ack, id = Id}. +reset(#coap_message{id = Id}) -> + #coap_message{type = reset, id = Id}. + +%% just make a response response(#coap_message{type = Type, id = Id, token = Token}) -> @@ -61,6 +70,19 @@ response(Method, Payload, Request) -> set_payload(Payload, response(Request))). +%% make a response which maybe is a piggyback ack +piggyback(Method, Request) -> + piggyback(Method, <<>>, Request). + +piggyback(Method, Payload, Request) -> + Reply = response(Method, Payload, Request), + case Reply of + #coap_message{type = con} -> + Reply#coap_message{type = ack}; + _ -> + Reply + end. + %% omit option for its default value set(max_age, ?DEFAULT_MAX_AGE, Msg) -> Msg; @@ -144,3 +166,9 @@ set_payload_block(Content, BlockId, {Num, _, Size}, Msg) -> set(BlockId, {Num, false, Size}, set_payload(binary:part(Content, OffsetBegin, ContentSize - OffsetBegin), Msg)) end. + +is_request(#coap_message{method = Method}) when is_atom(Method) -> + Method =/= undefined; + +is_request(_) -> + false. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl index 3cf925448..20473322e 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl @@ -18,7 +18,7 @@ %% API -export([ new_manager/0, insert/3, remove/2 - , res_changed/2, foreach/2]). + , res_changed/2, foreach/2, subscriptions/1]). -export_type([manager/0]). -define(MAX_SEQ_ID, 16777215). @@ -40,14 +40,15 @@ new_manager() -> #{}. --spec insert(topic(), token(), manager()) -> manager(). +-spec insert(topic(), token(), manager()) -> {seq_id(), manager()}. insert(Topic, Token, Manager) -> - case maps:get(Topic, Manager, undefined) of - undefined -> - Manager#{Topic => new_res(Token)}; - _ -> - Manager - end. + Res = case maps:get(Topic, Manager, undefined) of + undefined -> + new_res(Token); + Any -> + Any + end, + {maps:get(seq_id, Res), Manager#{Topic => Res}}. -spec remove(topic(), manager()) -> manager(). remove(Topic, Manager) -> @@ -72,6 +73,9 @@ foreach(F, Manager) -> Manager), ok. +subscriptions(Manager) -> + maps:keys(Manager). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 8b9eed14c..98e24f05c 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -23,11 +23,14 @@ %% API -export([new/0, transfer_result/3]). --export([ received/3 - , reply/4 - , reply/5 - , ack/3 - , piggyback/4 +-export([ info/1 + , info/2 + , stats/1 + ]). + +-export([ handle_request/3 + , handle_response/3 + , handle_out/3 , deliver/3 , timeout/3]). @@ -36,10 +39,31 @@ -record(session, { transport_manager :: emqx_coap_tm:manager() , observe_manager :: emqx_coap_observe_res:manager() , next_msg_id :: coap_message_id() + , created_at :: pos_integer() }). -type session() :: #session{}. +%% steal from emqx_session +-define(INFO_KEYS, [subscriptions, + upgrade_qos, + retry_interval, + await_rel_timeout, + created_at + ]). + +-define(STATS_KEYS, [subscriptions_cnt, + subscriptions_max, + inflight_cnt, + inflight_max, + mqueue_len, + mqueue_max, + mqueue_dropped, + next_pkt_id, + awaiting_rel_cnt, + awaiting_rel_max + ]). + %%%------------------------------------------------------------------- %%% API %%%------------------------------------------------------------------- @@ -48,125 +72,163 @@ new() -> _ = emqx_misc:rand_seed(), #session{ transport_manager = emqx_coap_tm:new() , observe_manager = emqx_coap_observe_res:new_manager() - , next_msg_id = rand:uniform(?MAX_MESSAGE_ID)}. + , next_msg_id = rand:uniform(?MAX_MESSAGE_ID) + , created_at = erlang:system_time(millisecond)}. + +%%-------------------------------------------------------------------- +%% Info, Stats +%%-------------------------------------------------------------------- +%% @doc Compatible with emqx_session +%% do we need use inflight and mqueue in here? +-spec(info(session()) -> emqx_types:infos()). +info(Session) -> + maps:from_list(info(?INFO_KEYS, Session)). + +info(Keys, Session) when is_list(Keys) -> + [{Key, info(Key, Session)} || Key <- Keys]; +info(subscriptions, #session{observe_manager = OM}) -> + emqx_coap_observe_res:subscriptions(OM); +info(subscriptions_cnt, #session{observe_manager = OM}) -> + erlang:length(emqx_coap_observe_res:subscriptions(OM)); +info(subscriptions_max, _) -> + infinity; +info(upgrade_qos, _) -> + ?QOS_0; +info(inflight, _) -> + emqx_inflight:new(); +info(inflight_cnt, _) -> + 0; +info(inflight_max, _) -> + 0; +info(retry_interval, _) -> + infinity; +info(mqueue, _) -> + emqx_mqueue:init(#{max_len => 0, store_qos0 => false}); +info(mqueue_len, #session{transport_manager = TM}) -> + maps:size(TM); +info(mqueue_max, _) -> + 0; +info(mqueue_dropped, _) -> + 0; +info(next_pkt_id, #session{next_msg_id = PacketId}) -> + PacketId; +info(awaiting_rel, _) -> + #{}; +info(awaiting_rel_cnt, _) -> + 0; +info(awaiting_rel_max, _) -> + infinity; +info(await_rel_timeout, _) -> + infinity; +info(created_at, #session{created_at = CreatedAt}) -> + CreatedAt. + +%% @doc Get stats of the session. +-spec(stats(session()) -> emqx_types:stats()). +stats(Session) -> info(?STATS_KEYS, Session). %%%------------------------------------------------------------------- %%% Process Message %%%------------------------------------------------------------------- -received(#coap_message{type = ack} = Msg, Cfg, Session) -> - handle_response(Msg, Cfg, Session); +handle_request(Msg, Ctx, Session) -> + call_transport_manager(?FUNCTION_NAME, + Msg, + Ctx, + [fun process_tm/3, fun process_subscribe/3], + Session). -received(#coap_message{type = reset} = Msg, Cfg, Session) -> - handle_response(Msg, Cfg, Session); +handle_response(Msg, Ctx, Session) -> + call_transport_manager(?FUNCTION_NAME, Msg, Ctx, [fun process_tm/3], Session). -received(#coap_message{method = Method} = Msg, Cfg, Session) when is_atom(Method) -> - handle_request(Msg, Cfg, Session); +handle_out(Msg, Ctx, Session) -> + call_transport_manager(?FUNCTION_NAME, Msg, Ctx, [fun process_tm/3], Session). -received(Msg, Cfg, Session) -> - handle_response(Msg, Cfg, Session). - -reply(Req, Method, Cfg, Session) -> - reply(Req, Method, <<>>, Cfg, Session). - -reply(Req, Method, Payload, Cfg, Session) -> - Response = emqx_coap_message:response(Method, Payload, Req), - handle_out(Response, Cfg, Session). - -ack(Req, Cfg, Session) -> - piggyback(Req, <<>>, Cfg, Session). - -piggyback(Req, Payload, Cfg, Session) -> - Response = emqx_coap_message:ack(Req), - Response2 = emqx_coap_message:set_payload(Payload, Response), - handle_out(Response2, Cfg, Session). - -deliver(Delivers, Cfg, Session) -> +deliver(Delivers, Ctx, Session) -> Fun = fun({_, Topic, Message}, #{out := OutAcc, session := #session{observe_manager = OM, - next_msg_id = MsgId} = SAcc} = Acc) -> + next_msg_id = MsgId, + transport_manager = TM} = SAcc} = Acc) -> case emqx_coap_observe_res:res_changed(Topic, OM) of undefined -> Acc; {Token, SeqId, OM2} -> - Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Cfg), - SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId), + Msg = mqtt_to_coap(Message, MsgId, Token, SeqId, Ctx), + SAcc2 = SAcc#session{next_msg_id = next_msg_id(MsgId, TM), observe_manager = OM2}, - #{out := Out} = Result = call_transport_manager(handle_out, Msg, Cfg, SAcc2), + #{out := Out} = Result = handle_out(Msg, Ctx, SAcc2), Result#{out := [Out | OutAcc]} end end, lists:foldl(Fun, - #{out => [], - session => Session}, + #{out => [], session => Session}, Delivers). -timeout(Timer, Cfg, Session) -> - call_transport_manager(?FUNCTION_NAME, Timer, Cfg, Session). +timeout(Timer, Ctx, Session) -> + call_transport_manager(?FUNCTION_NAME, Timer, Ctx, [fun process_tm/3], Session). + +result_keys() -> + [tm, subscribe] ++ emqx_coap_channel:result_keys(). transfer_result(From, Value, Result) -> - ?TRANSFER_RESULT([out, subscribe], From, Value, Result). + ?TRANSFER_RESULT(From, Value, Result). %%%------------------------------------------------------------------- %%% Internal functions %%%------------------------------------------------------------------- -handle_request(Msg, Cfg, Session) -> - call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session). - -handle_response(Msg, Cfg, Session) -> - call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session). - -handle_out(Msg, Cfg, Session) -> - call_transport_manager(?FUNCTION_NAME, Msg, Cfg, Session). - call_transport_manager(Fun, Msg, - Cfg, + Ctx, + Processor, #session{transport_manager = TM} = Session) -> try - Result = emqx_coap_tm:Fun(Msg, Cfg, TM), - {ok, _, Session2} = emqx_misc:pipeline([fun process_tm/2, - fun process_subscribe/2], - Result, - Session), - emqx_coap_channel:transfer_result(session, Session2, Result) + Result = emqx_coap_tm:Fun(Msg, Ctx, TM), + {ok, Result2, Session2} = pipeline(Processor, + Result, + Msg, + Session), + emqx_coap_channel:transfer_result(session, Session2, Result2) catch Type:Reason:Stack -> ?ERROR("process transmission with, message:~p failed~n Type:~p,Reason:~p~n,StackTrace:~p~n", [Msg, Type, Reason, Stack]), - #{out => emqx_coap_message:response({error, internal_server_error}, Msg)} + ?REPLY({error, internal_server_error}, Msg) end. -process_tm(#{tm := TM}, Session) -> +process_tm(#{tm := TM}, _, Session) -> {ok, Session#session{transport_manager = TM}}; -process_tm(_, Session) -> +process_tm(_, _, Session) -> {ok, Session}. -process_subscribe(#{subscribe := Sub}, #session{observe_manager = OM} = Session) -> +process_subscribe(#{subscribe := Sub} = Result, + Msg, + #session{observe_manager = OM} = Session) -> case Sub of undefined -> - {ok, Session}; + {ok, Result, Session}; {Topic, Token} -> - OM2 = emqx_coap_observe_res:insert(Topic, Token, OM), - {ok, Session#session{observe_manager = OM2}}; + {SeqId, OM2} = emqx_coap_observe_res:insert(Topic, Token, OM), + Replay = emqx_coap_message:piggyback({ok, content}, Msg), + Replay2 = Replay#coap_message{options = #{observe => SeqId}}, + {ok, Result#{reply => Replay2}, Session#session{observe_manager = OM2}}; Topic -> OM2 = emqx_coap_observe_res:remove(Topic, OM), - {ok, Session#session{observe_manager = OM2}} + Replay = emqx_coap_message:piggyback({ok, nocontent}, Msg), + {ok, Result#{reply => Replay}, Session#session{observe_manager = OM2}} end; -process_subscribe(_, Session) -> - {ok, Session}. +process_subscribe(Result, _, Session) -> + {ok, Result, Session}. -mqtt_to_coap(MQTT, MsgId, Token, SeqId, Cfg) -> +mqtt_to_coap(MQTT, MsgId, Token, SeqId, Ctx) -> #message{payload = Payload} = MQTT, - #coap_message{type = get_notify_type(MQTT, Cfg), + #coap_message{type = get_notify_type(MQTT, Ctx), method = {ok, content}, id = MsgId, token = Token, payload = Payload, - options = #{observe => SeqId, - max_age => get_max_age(MQTT)}}. + options = #{observe => SeqId}}. -get_notify_type(#message{qos = Qos}, #{notify_type := Type}) -> - case Type of +get_notify_type(#message{qos = Qos}, Ctx) -> + case emqx_coap_channel:get_config(notify_type, Ctx) of qos -> case Qos of ?QOS_0 -> @@ -178,18 +240,31 @@ get_notify_type(#message{qos = Qos}, #{notify_type := Type}) -> Other end. --spec get_max_age(#message{}) -> max_age(). -get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) -> - ?MAXIMUM_MAX_AGE; -get_max_age(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, - timestamp = Ts}) -> - Now = erlang:system_time(millisecond), - Diff = (Now - Ts + Interval * 1000) / 1000, - erlang:max(1, erlang:floor(Diff)); -get_max_age(_) -> - ?DEFAULT_MAX_AGE. +next_msg_id(MsgId, TM) -> + next_msg_id(MsgId + 1, MsgId, TM). -next_msg_id(MsgId) when MsgId >= ?MAX_MESSAGE_ID -> - 1; -next_msg_id(MsgId) -> - MsgId + 1. +next_msg_id(MsgId, MsgId, _) -> + erlang:throw("too many message in delivering"); +next_msg_id(MsgId, BeginId, TM) when MsgId >= ?MAX_MESSAGE_ID -> + check_is_inused(1, BeginId, TM); +next_msg_id(MsgId, BeginId, TM) -> + check_is_inused(MsgId, BeginId, TM). + +check_is_inused(NewMsgId, BeginId, TM) -> + case emqx_coap_tm:is_inused(out, NewMsgId, TM) of + false -> + NewMsgId; + _ -> + next_msg_id(NewMsgId + 1, BeginId, TM) + end. + +pipeline([Fun | T], Result, Msg, Session) -> + case Fun(Result, Msg, Session) of + {ok, Session2} -> + pipeline(T, Result, Msg, Session2); + {ok, Result2, Session2} -> + pipeline(T, Result2, Msg, Session2) + end; + +pipeline([], Result, _, Session) -> + {ok, Result, Session}. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl index 8830d7447..5a664b0f2 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl @@ -21,7 +21,8 @@ , handle_request/3 , handle_response/3 , handle_out/3 - , timeout/3]). + , timeout/3 + , is_inused/3]). -export_type([manager/0, event_result/1]). @@ -60,18 +61,18 @@ new() -> #{}. -handle_request(#coap_message{id = MsgId} = Msg, Cfg, TM) -> +handle_request(#coap_message{id = MsgId} = Msg, Ctx, TM) -> Id = {in, MsgId}, case maps:get(Id, TM, undefined) of undefined -> Transport = emqx_coap_transport:new(), Machine = new_state_machine(Id, Transport), - process_event(in, Msg, TM, Machine, Cfg); + process_event(in, Msg, TM, Ctx, Machine); Machine -> - process_event(in, Msg, TM, Machine, Cfg) + process_event(in, Msg, TM, Ctx, Machine) end. -handle_response(#coap_message{type = Type, id = MsgId} = Msg, Cfg, TM) -> +handle_response(#coap_message{type = Type, id = MsgId} = Msg, Ctx, TM) -> Id = {out, MsgId}, case maps:get(Id, TM, undefined) of undefined -> @@ -79,26 +80,25 @@ handle_response(#coap_message{type = Type, id = MsgId} = Msg, Cfg, TM) -> reset -> ?EMPTY_RESULT; _ -> - #{out => #coap_message{type = reset, - id = MsgId}} + ?RESET(Msg) end; Machine -> - process_event(in, Msg, TM, Machine, Cfg) + process_event(in, Msg, TM, Ctx, Machine) end. -handle_out(#coap_message{id = MsgId} = Msg, Cfg, TM) -> +handle_out(#coap_message{id = MsgId} = Msg, Ctx, TM) -> Id = {out, MsgId}, case maps:get(Id, TM, undefined) of undefined -> Transport = emqx_coap_transport:new(), Machine = new_state_machine(Id, Transport), - process_event(out, Msg, TM, Machine, Cfg); + process_event(out, Msg, TM, Ctx, Machine); _ -> - ?WARN("Repeat sending message with id:~p~n", [Id]), + %% ignore repeat send ?EMPTY_RESULT end. -timeout({Id, Type, Msg}, Cfg, TM) -> +timeout({Id, Type, Msg}, Ctx, TM) -> case maps:get(Id, TM, undefined) of undefined -> ?EMPTY_RESULT; @@ -106,12 +106,16 @@ timeout({Id, Type, Msg}, Cfg, TM) -> %% maybe timer has been canceled case maps:is_key(Type, Timers) of true -> - process_event(Type, Msg, TM, Machine, Cfg); + process_event(Type, Msg, TM, Ctx, Machine); _ -> ?EMPTY_RESULT end end. +-spec is_inused(direction(), message_id(), manager()) -> boolean(). +is_inused(Dir, Msg, Manager) -> + maps:is_key({Dir, Msg}, Manager). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -124,9 +128,9 @@ new_state_machine(Id, Transport) -> process_event(stop_timeout, _, TM, + _, #state_machine{id = Id, - timers = Timers}, - _) -> + timers = Timers}) -> lists:foreach(fun({_, Ref}) -> emqx_misc:cancel_timer(Ref) end, @@ -136,11 +140,11 @@ process_event(stop_timeout, process_event(Event, Msg, TM, + Ctx, #state_machine{id = Id, state = State, - transport = Transport} = Machine, - Cfg) -> - Result = emqx_coap_transport:State(Event, Msg, Transport, Cfg), + transport = Transport} = Machine) -> + Result = emqx_coap_transport:State(Event, Msg, Ctx, Transport), {ok, _, Machine2} = emqx_misc:pipeline([fun process_state_change/2, fun process_transport_change/2, fun process_timeouts/2], diff --git a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl index b4c8ae333..2c2aaab2e 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl @@ -21,6 +21,8 @@ -export_type([transport/0]). +-import(emqx_coap_message, [reset/1]). + -spec new() -> transport(). new() -> #transport{cache = undefined, @@ -28,54 +30,33 @@ new() -> retry_count = 0}. idle(in, - #coap_message{type = non, id = MsgId, method = Method} = Msg, - _, - #{resource := Resource} = Cfg) -> + #coap_message{type = non, method = Method} = Msg, + Ctx, + _) -> Ret = #{next => until_stop, timeouts => [{stop_timeout, ?NON_LIFETIME}]}, case Method of undefined -> - Ret#{out => #coap_message{type = reset, id = MsgId}}; + ?RESET(Msg); _ -> - case erlang:apply(Resource, Method, [Msg, Cfg]) of - #coap_message{} = Result -> - Ret#{out => Result}; - {has_sub, Result, Sub} -> - Ret#{out => Result, subscribe => Sub}; - error -> - Ret#{out => - emqx_coap_message:response({error, internal_server_error}, Msg)} - end + Result = call_handler(Msg, Ctx), + maps:merge(Ret, Result) end; idle(in, - #coap_message{id = MsgId, - type = con, - method = Method} = Msg, - Transport, - #{resource := Resource} = Cfg) -> + #coap_message{type = con, method = Method} = Msg, + Ctx, + Transport) -> Ret = #{next => maybe_resend, timeouts =>[{stop_timeout, ?EXCHANGE_LIFETIME}]}, case Method of undefined -> - ResetMsg = #coap_message{type = reset, id = MsgId}, + ResetMsg = reset(Msg), Ret#{transport => Transport#transport{cache = ResetMsg}, out => ResetMsg}; _ -> - {RetMsg, SubInfo} = - case erlang:apply(Resource, Method, [Msg, Cfg]) of - #coap_message{} = Result -> - {Result, undefined}; - {has_sub, Result, Sub} -> - {Result, Sub}; - error -> - {emqx_coap_message:response({error, internal_server_error}, Msg), - undefined} - end, - RetMsg2 = RetMsg#coap_message{type = ack}, - Ret#{out => RetMsg2, - transport => Transport#transport{cache = RetMsg2}, - subscribe => SubInfo} + Result = call_handler(Msg, Ctx), + maps:merge(Ret, Result) end; idle(out, #coap_message{type = non} = Msg, _, _) -> @@ -83,7 +64,7 @@ idle(out, #coap_message{type = non} = Msg, _, _) -> out => Msg, timeouts => [{stop_timeout, ?NON_LIFETIME}]}; -idle(out, Msg, Transport, _) -> +idle(out, Msg, _, Transport) -> _ = emqx_misc:rand_seed(), Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR), #{next => wait_ack, @@ -133,3 +114,13 @@ wait_ack(state_timeout, until_stop(_, _, _, _) -> ?EMPTY_RESULT. + +call_handler(#coap_message{options = Opts} = Msg, Ctx) -> + case maps:get(uri_path, Opts, undefined) of + [<<"ps">> | RestPath] -> + emqx_coap_pubsub_handler:handle_request(RestPath, Msg, Ctx); + [<<"mqtt">> | RestPath] -> + emqx_coap_mqtt_handler:handle_request(RestPath, Msg, Ctx); + _ -> + ?REPLY({error, bad_request}, Msg) + end. diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_mqtt_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_mqtt_handler.erl new file mode 100644 index 000000000..88a4a2310 --- /dev/null +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_mqtt_handler.erl @@ -0,0 +1,40 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_coap_mqtt_handler). + +-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). + +-export([handle_request/3]). +-import(emqx_coap_message, [response/2, response/3]). + +handle_request([<<"connection">>], #coap_message{method = Method} = Msg, _) -> + handle_method(Method, Msg); + +handle_request(_, Msg, _) -> + ?REPLY({error, bad_request}, Msg). + +handle_method(put, Msg) -> + ?REPLY({ok, changed}, Msg); + +handle_method(post, _) -> + #{connection => open}; + +handle_method(delete, _) -> + #{connection => close}; + +handle_method(_, Msg) -> + ?REPLY({error, method_not_allowed}, Msg). diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl new file mode 100644 index 000000000..e6886a559 --- /dev/null +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -0,0 +1,155 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% a coap to mqtt adapter with a retained topic message database +-module(emqx_coap_pubsub_handler). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). + +-export([handle_request/3]). + +-import(emqx_coap_message, [response/2, response/3]). + +-define(UNSUB(Topic), #{subscribe => Topic}). +-define(SUB(Topic, Token), #{subscribe => {Topic, Token}}). +-define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}). + +handle_request(Path, #coap_message{method = Method} = Msg, Ctx) -> + case check_topic(Path) of + {ok, Topic} -> + handle_method(Method, Topic, Msg, Ctx); + _ -> + ?REPLY({error, bad_request}, <<"invalid topic">>, Msg) + end. + +handle_method(get, Topic, #coap_message{options = Opts} = Msg, Ctx) -> + case maps:get(observe, Opts, undefined) of + 0 -> + subscribe(Msg, Topic, Ctx); + 1 -> + unsubscribe(Topic, Ctx); + _ -> + ?REPLY({error, bad_request}, <<"invalid observe value">>, Msg) + end; + +handle_method(post, Topic, #coap_message{payload = Payload} = Msg, Ctx) -> + case emqx_coap_channel:validator(publish, Topic, Ctx) of + allow -> + ClientInfo = emqx_coap_channel:get_clientinfo(Ctx), + #{clientid := ClientId} = ClientInfo, + QOS = get_publish_qos(Msg, Ctx), + MQTTMsg = emqx_message:make(ClientId, QOS, Topic, Payload), + MQTTMsg2 = apply_publish_opts(Msg, MQTTMsg), + _ = emqx_broker:publish(MQTTMsg2), + ?REPLY({ok, changed}, Msg); + _ -> + ?REPLY({error, unauthorized}, Msg) + end; + +handle_method(_, _, Msg, _) -> + ?REPLY({error, method_not_allowed}, Msg). + +check_topic([]) -> + error; + +check_topic(Path) -> + Sep = <<"/">>, + {ok, + emqx_http_lib:uri_decode( + lists:foldl(fun(Part, Acc) -> + <> + end, + <<>>, + Path))}. + +get_sub_opts(#coap_message{options = Opts} = Msg, Ctx) -> + SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts), + case SubOpts of + #{qos := _} -> + maps:merge(SubOpts, ?SUBOPTS); + _ -> + CfgType = emqx_coap_channel:get_config(subscribe_qos, Ctx), + maps:merge(SubOpts, ?SUBOPTS#{qos => type_to_qos(CfgType, Msg)}) + end. + +parse_sub_opts(<<"qos">>, V, Opts) -> + Opts#{qos => erlang:binary_to_integer(V)}; +parse_sub_opts(<<"nl">>, V, Opts) -> + Opts#{nl => erlang:binary_to_integer(V)}; +parse_sub_opts(<<"rh">>, V, Opts) -> + Opts#{rh => erlang:binary_to_integer(V)}; +parse_sub_opts(_, _, Opts) -> + Opts. + +type_to_qos(qos0, _) -> ?QOS_0; +type_to_qos(qos1, _) -> ?QOS_1; +type_to_qos(qos2, _) -> ?QOS_2; +type_to_qos(coap, #coap_message{type = Type}) -> + case Type of + non -> + ?QOS_0; + _ -> + ?QOS_1 + end. + +get_publish_qos(#coap_message{options = Opts} = Msg, Ctx) -> + case maps:get(uri_query, Opts) of + #{<<"qos">> := QOS} -> + erlang:binary_to_integer(QOS); + _ -> + CfgType = emqx_coap_channel:get_config(publish_qos, Ctx), + type_to_qos(CfgType, Msg) + end. + +apply_publish_opts(#coap_message{options = Opts}, MQTTMsg) -> + maps:fold(fun(<<"retain">>, V, Acc) -> + Val = erlang:binary_to_atom(V), + emqx_message:set_flag(retain, Val, Acc); + (<<"expiry">>, V, Acc) -> + Val = erlang:binary_to_integer(V), + Props = emqx_message:get_header(properties, Acc), + emqx_message:set_header(properties, + Props#{'Message-Expiry-Interval' => Val}, + Acc); + (_, _, Acc) -> + Acc + end, + MQTTMsg, + maps:get(uri_query, Opts)). + +subscribe(#coap_message{token = <<>>} = Msg, _, _) -> + ?REPLY({error, bad_request}, <<"observe without token">>, Msg); + +subscribe(#coap_message{token = Token} = Msg, Topic, Ctx) -> + case emqx_coap_channel:validator(subscribe, Topic, Ctx) of + allow -> + ClientInfo = emqx_coap_channel:get_clientinfo(Ctx), + #{clientid := ClientId} = ClientInfo, + SubOpts = get_sub_opts(Msg, Ctx), + emqx_broker:subscribe(Topic, ClientId, SubOpts), + emqx_hooks:run('session.subscribed', + [ClientInfo, Topic, SubOpts]), + ?SUB(Topic, Token); + _ -> + ?REPLY({error, unauthorized}, Msg) + end. + +unsubscribe(Topic, Ctx) -> + ClientInfo = emqx_coap_channel:get_clientinfo(Ctx), + emqx_broker:unsubscribe(Topic), + emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, ?SUBOPTS]), + ?UNSUB(Topic). diff --git a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl index 911d10a22..3b0268abb 100644 --- a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl +++ b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl @@ -23,12 +23,17 @@ -define(MAXIMUM_MAX_AGE, 4294967295). -define(EMPTY_RESULT, #{}). --define(TRANSFER_RESULT(Keys, From, Value, R1), +-define(TRANSFER_RESULT(From, Value, R1), begin + Keys = result_keys(), R2 = maps:with(Keys, R1), R2#{From => Value} end). +-define(RESET(Msg), #{out => emqx_coap_message:reset(Msg)}). +-define(REPLY(Resp, Payload, Msg), #{out => emqx_coap_message:piggyback(Resp, Payload, Msg)}). +-define(REPLY(Resp, Msg), ?REPLY(Resp, <<>>, Msg)). + -type coap_message_id() :: 1 .. ?MAX_MESSAGE_ID. -type message_type() :: con | non | ack | reset. -type max_age() :: 1 .. ?MAXIMUM_MAX_AGE. diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl deleted file mode 100644 index 1fd3d7b8e..000000000 --- a/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl +++ /dev/null @@ -1,153 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% a coap to mqtt adapter --module(emqx_coap_mqtt_resource). - --behaviour(emqx_coap_resource). - --include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). - - --export([ init/1 - , stop/1 - , get/2 - , put/2 - , post/2 - , delete/2 - ]). - --export([ check_topic/1 - , publish/3 - , subscribe/3 - , unsubscribe/3]). - --define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, is_new => false}). - -init(_) -> - {ok, undefined}. - -stop(_) -> - ok. - -%% get: subscribe, ignore observe option -get(#coap_message{token = Token} = Msg, Cfg) -> - case check_topic(Msg) of - {ok, Topic} -> - case Token of - <<>> -> - emqx_coap_message:response({error, bad_request}, <<"observer without token">>, Msg); - _ -> - Ret = subscribe(Msg, Topic, Cfg), - RetMsg = emqx_coap_message:response(Ret, Msg), - case Ret of - {ok, _} -> - {has_sub, RetMsg, {Topic, Token}}; - _ -> - RetMsg - end - end; - Any -> - Any - end. - -%% put: equal post -put(Msg, Cfg) -> - post(Msg, Cfg). - -%% post: publish a message -post(Msg, Cfg) -> - case check_topic(Msg) of - {ok, Topic} -> - emqx_coap_message:response(publish(Msg, Topic, Cfg), Msg); - Any -> - Any - end. - -%% delete: ubsubscribe -delete(Msg, Cfg) -> - case check_topic(Msg) of - {ok, Topic} -> - unsubscribe(Msg, Topic, Cfg), - {has_sub, emqx_coap_message:response({ok, deleted}, Msg), Topic}; - Any -> - Any - end. - -check_topic(#coap_message{options = Options} = Msg) -> - case maps:get(uri_path, Options, []) of - [] -> - emqx_coap_message:response({error, bad_request}, <<"invalid topic">> , Msg); - UriPath -> - Sep = <<"/">>, - {ok, lists:foldl(fun(Part, Acc) -> - <> - end, - <<>>, - UriPath)} - end. - -publish(#coap_message{payload = Payload} = Msg, - Topic, - #{clientinfo := ClientInfo, - publish_qos := QOS} = Cfg) -> - case emqx_coap_channel:auth_publish(Topic, Cfg) of - allow -> - #{clientid := ClientId} = ClientInfo, - MQTTMsg = emqx_message:make(ClientId, type_to_qos(QOS, Msg), Topic, Payload), - MQTTMsg2 = emqx_message:set_flag(retain, false, MQTTMsg), - _ = emqx_broker:publish(MQTTMsg2), - {ok, changed}; - _ -> - {error, unauthorized} - end. - -subscribe(Msg, Topic, #{clientinfo := ClientInfo}= Cfg) -> - case emqx_topic:wildcard(Topic) of - false -> - case emqx_coap_channel:auth_subscribe(Topic, Cfg) of - allow -> - #{clientid := ClientId} = ClientInfo, - SubOpts = get_sub_opts(Msg, Cfg), - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), - {ok, created}; - _ -> - {error, unauthorized} - end; - _ -> - %% now, we don't support wildcard in subscribe topic - {error, bad_request, <<"">>} - end. - -unsubscribe(Msg, Topic, #{clientinfo := ClientInfo} = Cfg) -> - emqx_broker:unsubscribe(Topic), - emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, get_sub_opts(Msg, Cfg)]). - -get_sub_opts(Msg, #{subscribe_qos := Type}) -> - ?SUBOPTS#{qos => type_to_qos(Type, Msg)}. - -type_to_qos(qos0, _) -> ?QOS_0; -type_to_qos(qos1, _) -> ?QOS_1; -type_to_qos(qos2, _) -> ?QOS_2; -type_to_qos(coap, #coap_message{type = Type}) -> - case Type of - non -> - ?QOS_0; - _ -> - ?QOS_1 - end. diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl deleted file mode 100644 index c750f66dd..000000000 --- a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl +++ /dev/null @@ -1,219 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% a coap to mqtt adapter with a retained topic message database --module(emqx_coap_pubsub_resource). - --behaviour(emqx_coap_resource). - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). - - --export([ init/1 - , stop/1 - , get/2 - , put/2 - , post/2 - , delete/2 - ]). --import(emqx_coap_mqtt_resource, [ check_topic/1, subscribe/3, unsubscribe/3 - , publish/3]). - --import(emqx_coap_message, [response/2, response/3, set_content/2]). -%%-------------------------------------------------------------------- -%% Resource Callbacks -%%-------------------------------------------------------------------- -init(_) -> - emqx_coap_pubsub_topics:start_link(). - -stop(Pid) -> - emqx_coap_pubsub_topics:stop(Pid). - -%% get: read last publish message -%% get with observe 0: subscribe -%% get with observe 1: unsubscribe -get(#coap_message{token = Token} = Msg, Cfg) -> - case check_topic(Msg) of - {ok, Topic} -> - case emqx_coap_message:get_option(observe, Msg) of - undefined -> - Content = emqx_coap_message:get_content(Msg), - read_last_publish_message(emqx_topic:wildcard(Topic), Msg, Topic, Content); - 0 -> - case Token of - <<>> -> - response({error, bad_reuqest}, <<"observe without token">>, Msg); - _ -> - Ret = subscribe(Msg, Topic, Cfg), - RetMsg = response(Ret, Msg), - case Ret of - {ok, _} -> - {has_sub, RetMsg, {Topic, Token}}; - _ -> - RetMsg - end - end; - 1 -> - unsubscribe(Msg, Topic, Cfg), - {has_sub, response({ok, deleted}, Msg), Topic} - end; - Any -> - Any - end. - -%% put: insert a message into topic database -put(Msg, _) -> - case check_topic(Msg) of - {ok, Topic} -> - Content = emqx_coap_message:get_content(Msg), - #coap_content{payload = Payload, - format = Format, - max_age = MaxAge} = Content, - handle_received_create(Msg, Topic, MaxAge, Format, Payload); - Any -> - Any - end. - -%% post: like put, but will publish the inserted message -post(Msg, Cfg) -> - case check_topic(Msg) of - {ok, Topic} -> - Content = emqx_coap_message:get_content(Msg), - #coap_content{max_age = MaxAge, - format = Format, - payload = Payload} = Content, - handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg); - Any -> - Any - end. - -%% delete: delete a message from topic database -delete(Msg, _) -> - case check_topic(Msg) of - {ok, Topic} -> - delete_topic_info(Msg, Topic); - Any -> - Any - end. - -%%-------------------------------------------------------------------- -%% Internal Functions -%%-------------------------------------------------------------------- -add_topic_info(Topic, MaxAge, Format, Payload) when is_binary(Topic), Topic =/= <<>> -> - case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of - [{_, StoredMaxAge, StoredCT, _, _}] -> - ?LOG(debug, "publish topic=~p already exists, need reset the topic info", [Topic]), - %% check whether the ct value stored matches the ct option in this POST message - case Format =:= StoredCT of - true -> - {ok, Ret} = - case StoredMaxAge =:= MaxAge of - true -> - emqx_coap_pubsub_topics:reset_topic_info(Topic, Payload); - false -> - emqx_coap_pubsub_topics:reset_topic_info(Topic, MaxAge, Payload) - end, - {changed, Ret}; - false -> - ?LOG(debug, "ct values of topic=~p do not match, stored ct=~p, new ct=~p, ignore the PUBLISH", [Topic, StoredCT, Format]), - {changed, false} - end; - [] -> - ?LOG(debug, "publish topic=~p will be created", [Topic]), - {ok, Ret} = emqx_coap_pubsub_topics:add_topic_info(Topic, MaxAge, Format, Payload), - {created, Ret} - end; - -add_topic_info(Topic, _MaxAge, _Format, _Payload) -> - ?LOG(debug, "create topic=~p info failed", [Topic]), - {badarg, false}. - -format_string_to_int(<<"application/octet-stream">>) -> - <<"42">>; -format_string_to_int(<<"application/exi">>) -> - <<"47">>; -format_string_to_int(<<"application/json">>) -> - <<"50">>; -format_string_to_int(_) -> - <<"42">>. - -handle_received_publish(Msg, Topic, MaxAge, Format, Payload, Cfg) -> - case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of - {_, true} -> - response(publish(Msg, Topic, Cfg), Msg); - {_, false} -> - ?LOG(debug, "add_topic_info failed, will return bad_request", []), - response({error, bad_request}, Msg) - end. - -handle_received_create(Msg, Topic, MaxAge, Format, Payload) -> - case add_topic_info(Topic, MaxAge, format_string_to_int(Format), Payload) of - {Ret, true} -> - response({ok, Ret}, Msg); - {_, false} -> - ?LOG(debug, "add_topic_info failed, will return bad_request", []), - response({error, bad_request}, Msg) - end. - -return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content) -> - TimeElapsed = trunc((erlang:system_time(millisecond) - TimeStamp) / 1000), - case TimeElapsed < MaxAge of - true -> - LeftTime = (MaxAge - TimeElapsed), - ?LOG(debug, "topic=~p has max age left time is ~p", [Topic, LeftTime]), - set_content(Content#coap_content{max_age = LeftTime, payload = Payload}, - response({ok, content}, Msg)); - false -> - ?LOG(debug, "topic=~p has been timeout, will return empty content", [Topic]), - response({ok, nocontent}, Msg) - end. - -read_last_publish_message(false, Msg, Topic, Content=#coap_content{format = QueryFormat}) when is_binary(QueryFormat)-> - ?LOG(debug, "the QueryFormat=~p", [QueryFormat]), - case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of - [] -> - response({error, not_found}, Msg); - [{_, MaxAge, CT, Payload, TimeStamp}] -> - case CT =:= format_string_to_int(QueryFormat) of - true -> - return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content); - false -> - ?LOG(debug, "format value does not match, the queried format=~p, the stored format=~p", [QueryFormat, CT]), - response({error, bad_request}, Msg) - end - end; - -read_last_publish_message(false, Msg, Topic, Content) -> - case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of - [] -> - response({error, not_found}, Msg); - [{_, MaxAge, _, Payload, TimeStamp}] -> - return_resource(Msg, Topic, Payload, MaxAge, TimeStamp, Content) - end; - -read_last_publish_message(true, Msg, Topic, _Content) -> - ?LOG(debug, "the topic=~p is illegal wildcard topic", [Topic]), - response({error, bad_request}, Msg). - -delete_topic_info(Msg, Topic) -> - case emqx_coap_pubsub_topics:lookup_topic_info(Topic) of - [] -> - response({error, not_found}, Msg); - [{_, _, _, _, _}] -> - emqx_coap_pubsub_topics:delete_sub_topics(Topic), - response({ok, deleted}, Msg) - end. diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl deleted file mode 100644 index 328d1df04..000000000 --- a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl +++ /dev/null @@ -1,185 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_coap_pubsub_topics). - --behaviour(gen_server). - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). - - --export([ start_link/0 - , stop/1 - ]). - --export([ add_topic_info/4 - , delete_topic_info/1 - , delete_sub_topics/1 - , is_topic_existed/1 - , is_topic_timeout/1 - , reset_topic_info/2 - , reset_topic_info/3 - , reset_topic_info/4 - , lookup_topic_info/1 - , lookup_topic_payload/1 - ]). - -%% gen_server. --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --record(state, {}). - --define(COAP_TOPIC_TABLE, coap_topic). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -stop(Pid) -> - gen_server:stop(Pid). - -add_topic_info(Topic, MaxAge, CT, Payload) when is_binary(Topic), is_integer(MaxAge), is_binary(CT), is_binary(Payload) -> - gen_server:call(?MODULE, {add_topic, {Topic, MaxAge, CT, Payload}}). - -delete_topic_info(Topic) when is_binary(Topic) -> - gen_server:call(?MODULE, {remove_topic, Topic}). - -delete_sub_topics(Topic) when is_binary(Topic) -> - gen_server:cast(?MODULE, {remove_sub_topics, Topic}). - -reset_topic_info(Topic, Payload) -> - gen_server:call(?MODULE, {reset_topic, {Topic, Payload}}). - -reset_topic_info(Topic, MaxAge, Payload) -> - gen_server:call(?MODULE, {reset_topic, {Topic, MaxAge, Payload}}). - -reset_topic_info(Topic, MaxAge, CT, Payload) -> - gen_server:call(?MODULE, {reset_topic, {Topic, MaxAge, CT, Payload}}). - -is_topic_existed(Topic) -> - ets:member(?COAP_TOPIC_TABLE, Topic). - -is_topic_timeout(Topic) when is_binary(Topic) -> - [{Topic, MaxAge, _, _, TimeStamp}] = ets:lookup(?COAP_TOPIC_TABLE, Topic), - %% MaxAge: x seconds - MaxAge < ((erlang:system_time(millisecond) - TimeStamp) / 1000). - -lookup_topic_info(Topic) -> - ets:lookup(?COAP_TOPIC_TABLE, Topic). - -lookup_topic_payload(Topic) -> - try ets:lookup_element(?COAP_TOPIC_TABLE, Topic, 4) - catch - error:badarg -> undefined - end. - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([]) -> - _ = ets:new(?COAP_TOPIC_TABLE, [set, named_table, protected]), - ?LOG(debug, "Create the coap_topic table", []), - {ok, #state{}}. - -handle_call({add_topic, {Topic, MaxAge, CT, Payload}}, _From, State) -> - Ret = create_table_element(Topic, MaxAge, CT, Payload), - {reply, {ok, Ret}, State, hibernate}; - -handle_call({reset_topic, {Topic, Payload}}, _From, State) -> - Ret = update_table_element(Topic, Payload), - {reply, {ok, Ret}, State, hibernate}; - -handle_call({reset_topic, {Topic, MaxAge, Payload}}, _From, State) -> - Ret = update_table_element(Topic, MaxAge, Payload), - {reply, {ok, Ret}, State, hibernate}; - -handle_call({reset_topic, {Topic, MaxAge, CT, Payload}}, _From, State) -> - Ret = update_table_element(Topic, MaxAge, CT, Payload), - {reply, {ok, Ret}, State, hibernate}; - -handle_call({remove_topic, {Topic, _Content}}, _From, State) -> - ets:delete(?COAP_TOPIC_TABLE, Topic), - ?LOG(debug, "Remove topic ~p in the coap_topic table", [Topic]), - {reply, ok, State, hibernate}; - -handle_call(Request, _From, State) -> - ?LOG(error, "adapter unexpected call ~p", [Request]), - {reply, ignored, State, hibernate}. - -handle_cast({remove_sub_topics, TopicPrefix}, State) -> - DeletedTopicNum = ets:foldl(fun ({Topic, _, _, _, _}, AccIn) -> - case binary:match(Topic, TopicPrefix) =/= nomatch of - true -> - ?LOG(debug, "Remove topic ~p in the coap_topic table", [Topic]), - ets:delete(?COAP_TOPIC_TABLE, Topic), - AccIn + 1; - false -> - AccIn - end - end, 0, ?COAP_TOPIC_TABLE), - ?LOG(debug, "Remove number of ~p topics with prefix=~p in the coap_topic table", [DeletedTopicNum, TopicPrefix]), - {noreply, State, hibernate}; - -handle_cast(Msg, State) -> - ?LOG(error, "broker_api unexpected cast ~p", [Msg]), - {noreply, State, hibernate}. - -handle_info(Info, State) -> - ?LOG(error, "adapter unexpected info ~p", [Info]), - {noreply, State, hibernate}. - -terminate(Reason, #state{}) -> - ets:delete(?COAP_TOPIC_TABLE), - Level = case Reason =:= normal orelse Reason =:= shutdown of - true -> debug; - false -> error - end, - ?SLOG(Level, #{terminate_reason => Reason}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - -%%-------------------------------------------------------------------- -%% Internal Functions -%%-------------------------------------------------------------------- -create_table_element(Topic, MaxAge, CT, Payload) -> - TopicInfo = {Topic, MaxAge, CT, Payload, erlang:system_time(millisecond)}, - ?LOG(debug, "Insert ~p in the coap_topic table", [TopicInfo]), - ets:insert_new(?COAP_TOPIC_TABLE, TopicInfo). - -update_table_element(Topic, Payload) -> - ?LOG(debug, "Update the topic=~p only with Payload", [Topic]), - ets:update_element(?COAP_TOPIC_TABLE, Topic, [{4, Payload}, {5, erlang:system_time(millisecond)}]). - -update_table_element(Topic, MaxAge, Payload) -> - ?LOG(debug, "Update the topic=~p info of MaxAge=~p and Payload", [Topic, MaxAge]), - ets:update_element(?COAP_TOPIC_TABLE, Topic, [{2, MaxAge}, {4, Payload}, {5, erlang:system_time(millisecond)}]). - -update_table_element(Topic, MaxAge, CT, <<>>) -> - ?LOG(debug, "Update the topic=~p info of MaxAge=~p, CT=~p, payload=<<>>", [Topic, MaxAge, CT]), - ets:update_element(?COAP_TOPIC_TABLE, Topic, [{2, MaxAge}, {3, CT}, {5, erlang:system_time(millisecond)}]). diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 546640a90..83142abb1 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -31,6 +31,7 @@ -export([start_link/1]). -export([ open_session/5 + , open_session/6 , kick_session/2 , kick_session/3 , register_channel/4 @@ -225,28 +226,32 @@ connection_closed(Type, ClientId) -> }} | {error, any()}. -open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> +open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> + open_session(Type, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session). + +open_session(Type, true = _CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> Self = self(), ClientId = maps:get(clientid, ClientInfo), Fun = fun(_) -> - ok = discard_session(Type, ClientId), - Session = create_session(Type, - ClientInfo, - ConnInfo, - CreateSessionFun - ), - register_channel(Type, ClientId, Self, ConnInfo), - {ok, #{session => Session, present => false}} + ok = discard_session(Type, ClientId), + Session = create_session(Type, + ClientInfo, + ConnInfo, + CreateSessionFun, + SessionMod + ), + register_channel(Type, ClientId, Self, ConnInfo), + {ok, #{session => Session, present => false}} end, locker_trans(Type, ClientId, Fun); open_session(_Type, false = _CleanStart, - _ClientInfo, _ConnInfo, _CreateSessionFun) -> + _ClientInfo, _ConnInfo, _CreateSessionFun, _SessionMod) -> %% TODO: {error, not_supported_now}. %% @private -create_session(Type, ClientInfo, ConnInfo, CreateSessionFun) -> +create_session(Type, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> try Session = emqx_gateway_utils:apply( CreateSessionFun, @@ -255,7 +260,7 @@ create_session(Type, ClientInfo, ConnInfo, CreateSessionFun) -> ok = emqx_gateway_metrics:inc(Type, 'session.created'), SessionInfo = case is_tuple(Session) andalso element(1, Session) == session of - true -> emqx_session:info(Session); + true -> SessionMod:info(Session); _ -> case is_map(Session) of false -> diff --git a/apps/emqx_gateway/src/emqx_gateway_ctx.erl b/apps/emqx_gateway/src/emqx_gateway_ctx.erl index 406de7767..d9517b53f 100644 --- a/apps/emqx_gateway/src/emqx_gateway_ctx.erl +++ b/apps/emqx_gateway/src/emqx_gateway_ctx.erl @@ -40,6 +40,7 @@ %% Authentication circle -export([ authenticate/2 , open_session/5 + , open_session/6 , insert_channel_info/4 , set_chan_info/3 , set_chan_stats/3 @@ -96,15 +97,18 @@ authenticate(_Ctx, ClientInfo) -> pendings => list() }} | {error, any()}. -open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun) -> +open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> + open_session(Ctx, CleanStart, ClientInfo, ConnInfo, CreateSessionFun, emqx_session). + +open_session(Ctx, false, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> logger:warning("clean_start=false is not supported now, " "fallback to clean_start mode"), - open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun); + open_session(Ctx, true, ClientInfo, ConnInfo, CreateSessionFun, SessionMod); open_session(_Ctx = #{type := Type}, - CleanStart, ClientInfo, ConnInfo, CreateSessionFun) -> + CleanStart, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) -> emqx_gateway_cm:open_session(Type, CleanStart, - ClientInfo, ConnInfo, CreateSessionFun). + ClientInfo, ConnInfo, CreateSessionFun, SessionMod). -spec insert_channel_info(context(), emqx_types:clientid(), @@ -132,7 +136,7 @@ connection_closed(_Ctx = #{type := Type}, ClientId) -> -spec authorize(context(), emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic()) - -> allow | deny. + -> allow | deny. authorize(_Ctx, ClientInfo, PubSub, Topic) -> emqx_access_control:authorize(ClientInfo, PubSub, Topic). diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 5cb958701..5c98e1f34 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -215,8 +215,7 @@ fields(coap) -> fields(coap_structs) -> [ {enable_stats, t(boolean(), undefined, true)} , {authentication, t(ref(authentication))} - , {heartbeat, t(duration(), undefined, "15s")} - , {resource, t(union([mqtt, pubsub]), undefined, mqtt)} + , {heartbeat, t(duration(), undefined, "30s")} , {notify_type, t(union([non, con, qos]), undefined, qos)} , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}