diff --git a/.gitignore b/.gitignore index aaec950d4..b23eeafe3 100644 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,4 @@ erlang.mk *.coverdata etc/emqx.conf.rendered Mnesia.*/ +*.DS_Store diff --git a/apps/emqx_exproto/.gitignore b/apps/emqx_exproto/.gitignore new file mode 100644 index 000000000..791a6e94e --- /dev/null +++ b/apps/emqx_exproto/.gitignore @@ -0,0 +1,5 @@ +src/emqx_exproto_pb.erl +src/emqx_exproto_v_1_connection_adapter_bhvr.erl +src/emqx_exproto_v_1_connection_adapter_client.erl +src/emqx_exproto_v_1_connection_handler_bhvr.erl +src/emqx_exproto_v_1_connection_handler_client.erl diff --git a/apps/emqx_exproto/README.md b/apps/emqx_exproto/README.md new file mode 100644 index 000000000..4b59dcae3 --- /dev/null +++ b/apps/emqx_exproto/README.md @@ -0,0 +1,28 @@ +# emqx-exproto + +The `emqx_exproto` extremly enhance the extensibility for EMQ X. It allow using an others programming language to **replace the protocol handling layer in EMQ X Broker**. + +## Feature + +- [x] Based on gRPC, it brings a very wide range of applicability +- [x] Allows you to use the return value to extend emqx behavior. + +## Architecture + +![EMQ X ExProto Arch](./docs/images/exproto-arch.jpg) + +## Usage + +### gRPC service + +See: `priv/protos/exproto.proto` + +## Example + +## Recommended gRPC Framework + +See: https://github.com/grpc-ecosystem/awesome-grpc + +## Thanks + +- [grpcbox](https://github.com/tsloughter/grpcbox) diff --git a/apps/emqx_exproto/docs/design.md b/apps/emqx_exproto/docs/design.md new file mode 100644 index 000000000..0a6a082e2 --- /dev/null +++ b/apps/emqx_exproto/docs/design.md @@ -0,0 +1,127 @@ +# 多语言 - 协议接入 + +`emqx-exproto` 插件用于协议解析的多语言支持。它能够允许其他编程语言(例如:Python,Java 等)直接处理数据流实现协议的解析,并提供 Pub/Sub 接口以实现与系统其它组件的通信。 + +该插件给 EMQ X 带来的扩展性十分的强大,它能以你熟悉语言处理任何的私有协议,并享受由 EMQ X 系统带来的高连接,和高并发的优点。 + +## 特性 + +- 极强的扩展能力。使用 gRPC 作为 RPC 通信框架,支持各个主流编程语言 +- 高吞吐。连接层以完全的异步非阻塞式 I/O 的方式实现 +- 连接层透明。完全的支持 TCP\TLS UDP\DTLS 类型的连接管理,并对上层提供统一个 API +- 连接层的管理能力。例如,最大连接数,连接和吞吐的速率限制,IP 黑名单 等 + +## 架构 + +![Extension-Protocol Arch](images/exproto-arch.jpg) + +该插件主要需要处理的内容包括: + +1. **连接层:** 该部分主要**维持 Socket 的生命周期,和数据的收发**。它的功能要求包括: + - 监听某个端口。当有新的 TCP/UDP 连接到达后,启动一个连接进程,来维持连接的状态。 + - 调用 `OnSocketCreated` 回调。用于通知外部模块**已新建立了一个连接**。 + - 调用 `OnScoektClosed` 回调。用于通知外部模块连接**已关闭**。 + - 调用 `OnReceivedBytes` 回调。用于通知外部模块**该连接新收到的数据包**。 + - 提供 `Send` 接口。供外部模块调用,**用于发送数据包**。 + - 提供 `Close` 接口。供外部模块调用,**用于主动关闭连接**。 + +2. **协议/会话层:**该部分主要**提供 PUB/SUB 接口**,以实现与 EMQ X Broker 系统的消息互通。包括: + + - 提供 `Authenticate` 接口。供外部模块调用,用于向集群注册客户端。 + - 提供 `StartTimer` 接口。供外部模块调用,用于为该连接进程启动心跳等定时器。 + - 提供 `Publish` 接口。供外部模块调用,用于发布消息 EMQ X Broker 中。 + - 提供 `Subscribe` 接口。供外部模块调用,用于订阅某主题,以实现从 EMQ X Broker 中接收某些下行消息。 + - 提供 `Unsubscribe` 接口。供外部模块调用,用于取消订阅某主题。 + - 调用 `OnTimerTimeout` 回调。用于处理定时器超时的事件。 + - 调用 `OnReceivedMessages` 回调。用于接收下行消息(在订阅主题成功后,如果主题上有消息,便会回调该方法) + +## 接口设计 + +从 gRPC 上的逻辑来说,emqx-exproto 会作为客户端向用户的 `ProtocolHandler` 服务发送回调请求。同时,它也会作为服务端向用户提供 `ConnectionAdapter` 服务,以提供 emqx-exproto 各个接口的访问。如图: + +![Extension Protocol gRPC Arch](images/exproto-grpc-arch.jpg) + + +详情参见:`priv/protos/exproto.proto`,例如接口的定义有: + +```protobuff +syntax = "proto3"; + +package emqx.exproto.v1; + +// The Broker side serivce. It provides a set of APIs to +// handle a protcol access +service ConnectionAdapter { + + // -- socket layer + + rpc Send(SendBytesRequest) returns (CodeResponse) {}; + + rpc Close(CloseSocketRequest) returns (CodeResponse) {}; + + // -- protocol layer + + rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {}; + + rpc StartTimer(TimerRequest) returns (CodeResponse) {}; + + // -- pub/sub layer + + rpc Publish(PublishRequest) returns (CodeResponse) {}; + + rpc Subscribe(SubscribeRequest) returns (CodeResponse) {}; + + rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {}; +} + +service ConnectionHandler { + + // -- socket layer + + rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {}; + + rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {}; + + // -- pub/sub layer + + rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {}; + + rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {}; +} +``` + +## 配置项设计 + +1. 以 **监听器( Listener)** 为基础,提供 TCP/UDP 的监听。 + - Listener 目前仅支持:TCP、TLS、UDP、DTLS。(ws、wss、quic 暂不支持) +2. 每个监听器,会指定一个 `ProtocolHandler` 的服务地址,用于调用外部模块的接口。 +3. emqx-exproto 还会监听一个 gRPC 端口用于提供对 `ConnectionAdapter` 服务的访问。 + +例如: + +``` properties +## gRPC 服务监听地址 (HTTP) +## +exproto.server.http.url = http://127.0.0.1:9002 + +## gRPC 服务监听地址 (HTTPS) +## +exproto.server.https.url = https://127.0.0.1:9002 +exproto.server.https.cacertfile = ca.pem +exproto.server.https.certfile = cert.pem +exproto.server.https.keyfile = key.pem + +## Listener 配置 +## 例如,名称为 protoname 协议的 TCP 监听器配置 +exproto.listener.protoname = tcp://0.0.0.0:7993 + +## ProtocolHandler 服务地址及 https 的证书配置 +exproto.listener.protoname.proto_handler_url = http://127.0.0.1:9001 +#exproto.listener.protoname.proto_handler_certfile = +#exproto.listener.protoname.proto_handler_cacertfile = +#exproto.listener.protoname.proto_handler_keyfile = + +# ... +``` diff --git a/apps/emqx_exproto/docs/images/exproto-arch.jpg b/apps/emqx_exproto/docs/images/exproto-arch.jpg new file mode 100644 index 000000000..dddf7996b Binary files /dev/null and b/apps/emqx_exproto/docs/images/exproto-arch.jpg differ diff --git a/apps/emqx_exproto/docs/images/exproto-grpc-arch.jpg b/apps/emqx_exproto/docs/images/exproto-grpc-arch.jpg new file mode 100644 index 000000000..71efa76f9 Binary files /dev/null and b/apps/emqx_exproto/docs/images/exproto-grpc-arch.jpg differ diff --git a/apps/emqx_exproto/include/emqx_exproto.hrl b/apps/emqx_exproto/include/emqx_exproto.hrl new file mode 100644 index 000000000..079a1e60f --- /dev/null +++ b/apps/emqx_exproto/include/emqx_exproto.hrl @@ -0,0 +1,37 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(APP, emqx_exproto). + +-define(TCP_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true}, + {backlog, 512}, {nodelay, true}]). + +%% TODO: +-define(UDP_SOCKOPTS, []). + +%%-------------------------------------------------------------------- +%% gRPC result code + +-define(RESP_UNKNOWN, 'UNKNOWN'). +-define(RESP_SUCCESS, 'SUCCESS'). +-define(RESP_CONN_PROCESS_NOT_ALIVE, 'CONN_PROCESS_NOT_ALIVE'). +-define(RESP_PARAMS_TYPE_ERROR, 'PARAMS_TYPE_ERROR'). +-define(RESP_REQUIRED_PARAMS_MISSED, 'REQUIRED_PARAMS_MISSED'). +-define(RESP_PERMISSION_DENY, 'PERMISSION_DENY'). +-define(IS_GRPC_RESULT_CODE(C), ( C =:= ?RESP_SUCCESS + orelse C =:= ?RESP_CONN_PROCESS_NOT_ALIVE + orelse C =:= ?RESP_REQUIRED_PARAMS_MISSED + orelse C =:= ?RESP_PERMISSION_DENY)). diff --git a/apps/emqx_exproto/priv/emqx_exproto.schema b/apps/emqx_exproto/priv/emqx_exproto.schema new file mode 100644 index 000000000..fb114dc77 --- /dev/null +++ b/apps/emqx_exproto/priv/emqx_exproto.schema @@ -0,0 +1,364 @@ +%% -*-: erlang -*- + +%%-------------------------------------------------------------------- +%% Services + +{mapping, "exproto.server.http.port", "emqx_exproto.servers", [ + {datatype, integer} +]}. + +{mapping, "exproto.server.https.port", "emqx_exproto.servers", [ + {datatype, integer} +]}. + +{mapping, "exproto.server.https.cacertfile", "emqx_exproto.servers", [ + {datatype, string} +]}. + +{mapping, "exproto.server.https.certfile", "emqx_exproto.servers", [ + {datatype, string} +]}. + +{mapping, "exproto.server.https.keyfile", "emqx_exproto.servers", [ + {datatype, string} +]}. + +{translation, "emqx_exproto.servers", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + Http = case cuttlefish:conf_get("exproto.server.http.port", Conf, undefined) of + undefined -> []; + P1 -> [{http, P1, []}] + end, + Https = case cuttlefish:conf_get("exproto.server.https.port", Conf, undefined) of + undefined -> []; + P2 -> + [{https, P2, + Filter([{ssl, true}, + {certfile, cuttlefish:conf_get("exproto.server.https.certfile", Conf)}, + {keyfile, cuttlefish:conf_get("exproto.server.https.keyfile", Conf)}, + {cacertfile, cuttlefish:conf_get("exproto.server.https.cacertfile", Conf)}])}] + end, + Http ++ Https +end}. + +%%-------------------------------------------------------------------- +%% Listeners + +{mapping, "exproto.listener.$proto", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.connection_handler_url", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.connection_handler_certfile", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.connection_handler_cacertfile", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.connection_handler_keyfile", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.acceptors", "emqx_exproto.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "exproto.listener.$proto.max_connections", "emqx_exproto.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "exproto.listener.$proto.max_conn_rate", "emqx_exproto.listeners", [ + {datatype, integer} +]}. + +{mapping, "exproto.listener.$proto.active_n", "emqx_exproto.listeners", [ + {default, 100}, + {datatype, integer} +]}. + +{mapping, "exproto.listener.$proto.idle_timeout", "emqx_exproto.listeners", [ + {default, "30s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "exproto.listener.$proto.access.$id", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.proxy_protocol", "emqx_exproto.listeners", [ + {datatype, flag} +]}. + +{mapping, "exproto.listener.$proto.proxy_protocol_timeout", "emqx_exproto.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "exproto.listener.$proto.backlog", "emqx_exproto.listeners", [ + {datatype, integer}, + {default, 1024} +]}. + +{mapping, "exproto.listener.$proto.send_timeout", "emqx_exproto.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "exproto.listener.$proto.send_timeout_close", "emqx_exproto.listeners", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "exproto.listener.$proto.recbuf", "emqx_exproto.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "exproto.listener.$proto.sndbuf", "emqx_exproto.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "exproto.listener.$proto.buffer", "emqx_exproto.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "exproto.listener.$proto.tune_buffer", "emqx_exproto.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "exproto.listener.$proto.nodelay", "emqx_exproto.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "exproto.listener.$proto.reuseaddr", "emqx_exproto.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% TLS Options + +{mapping, "exproto.listener.$proto.tls_versions", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.ciphers", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.psk_ciphers", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.dhfile", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.keyfile", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.certfile", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.cacertfile", "emqx_exproto.listeners", [ + {datatype, string} +]}. + +{mapping, "exproto.listener.$proto.verify", "emqx_exproto.listeners", [ + {datatype, atom} +]}. + +{mapping, "exproto.listener.$proto.fail_if_no_peer_cert", "emqx_exproto.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "exproto.listener.$proto.secure_renegotiate", "emqx_exproto.listeners", [ + {datatype, flag} +]}. + +{mapping, "exproto.listener.$proto.reuse_sessions", "emqx_exproto.listeners", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "exproto.listener.$proto.honor_cipher_order", "emqx_exproto.listeners", [ + {datatype, flag} +]}. + +{translation, "emqx_exproto.listeners", fun(Conf) -> + + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + + Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end, + + Access = fun(S) -> + [A, CIDR] = string:tokens(S, " "), + {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} + end, + + AccOpts = fun(Prefix) -> + case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of + [] -> []; + Rules -> [{access_rules, [Access(Rule) || {_, Rule} <- Rules]}] + end + end, + + RateLimit = fun(undefined) -> + undefined; + (Val) -> + [L, D] = string:tokens(Val, ", "), + Limit = case cuttlefish_bytesize:parse(L) of + Sz when is_integer(Sz) -> Sz; + {error, Reason} -> error(Reason) + end, + Duration = case cuttlefish_duration:parse(D, s) of + Secs when is_integer(Secs) -> Secs; + {error, Reason1} -> error(Reason1) + end, + Rate = Limit / Duration, + {Rate, Limit} + end, + + HandlerOpts = fun(Prefix) -> + Opts = + case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".connection_handler_url", Conf)) of + {ok, {http, _, Host, Port, _, _}} -> + [{scheme, http}, {host, Host}, {port, Port}]; + {ok, {https, _, Host, Port, _, _}} -> + [{scheme, https}, {host, Host}, {port, Port}, + {ssl_options, + Filter([{certfile, cuttlefish:conf_get(Prefix ++ ".connection_handler_certfile", Conf)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".connection_handler_keyfile", Conf)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".connection_handler_cacertfile", Conf)} + ])}]; + _ -> + error(invaild_connection_handler_url) + end, + [{handler, Opts}] + end, + + ConnOpts = fun(Prefix) -> + Filter([{active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)}, + {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}]) + end, + + LisOpts = fun(Prefix) -> + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, + {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, + {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, + {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, + {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)} | AccOpts(Prefix)]) + end, + + TcpOpts = fun(Prefix) -> + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, + {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) + end, + SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, + MapPSKCiphers = fun(PSKCiphers) -> + lists:map( + fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; + ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; + ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; + ("PSK-RC4-SHA") -> {psk, rc4_128, sha} + end, PSKCiphers) + end, + SslOpts = fun(Prefix) -> + Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of + undefined -> undefined; + L -> [list_to_atom(V) || V <- L] + end, + TLSCiphers = cuttlefish:conf_get(Prefix++".ciphers", Conf, undefined), + PSKCiphers = cuttlefish:conf_get(Prefix++".psk_ciphers", Conf, undefined), + Ciphers = + case {TLSCiphers, PSKCiphers} of + {undefined, undefined} -> + cuttlefish:invalid(Prefix++".ciphers or "++Prefix++".psk_ciphers is absent"); + {TLSCiphers, undefined} -> + SplitFun(TLSCiphers); + {undefined, PSKCiphers} -> + MapPSKCiphers(SplitFun(PSKCiphers)); + {_TLSCiphers, _PSKCiphers} -> + cuttlefish:invalid(Prefix++".ciphers and "++Prefix++".psk_ciphers cannot be configured at the same time") + end, + UserLookupFun = + case PSKCiphers of + undefined -> undefined; + _ -> {fun emqx_psk:lookup/3, <<>>} + end, + Filter([{versions, Versions}, + {ciphers, Ciphers}, + {user_lookup_fun, UserLookupFun}, + %{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, + {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, + {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, + {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, + {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}, + {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)}, + {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)}, + {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}]) + end, + + UdpOpts = fun(Prefix) -> + Filter([{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) + end, + + ParseListenOn = fun(ListenOn) -> + case string:tokens(ListenOn, "://") of + [Port] -> {tcp, list_to_integer(Port)}; + [T, Ip, Port] + when T =:= "tcp"; T =:= "ssl"; + T =:= "udp"; T =:= "dtls" -> + {Atom(T), {Ip, list_to_integer(Port)}} + end + end, + + Listeners = fun(Proto) -> + Prefix = string:join(["exproto","listener", Proto], "."), + Opts = HandlerOpts(Prefix) ++ ConnOpts(Prefix) ++ LisOpts(Prefix), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> []; + ListenOn0 -> + case ParseListenOn(ListenOn0) of + {tcp, ListenOn} -> + [{Proto, tcp, ListenOn, [{tcp_options, TcpOpts(Prefix)} | Opts]}]; + {ssl, ListenOn} -> + [{Proto, ssl, ListenOn, [{tcp_options, TcpOpts(Prefix)}, + {ssl_options, SslOpts(Prefix)} | Opts]}]; + {udp, ListenOn} -> + [{Proto, udp, ListenOn, [{udp_options, UdpOpts(Prefix)} | Opts]}]; + {dtls, ListenOn} -> + [{Proto, dtls, ListenOn, [{udp_options, UdpOpts(Prefix)}, + {dtls_options, SslOpts(Prefix)} | Opts]}]; + {_, _} -> + cuttlefish:invalid("Not supported listener type") + end + end + end, + lists:flatten([Listeners(Proto) || {[_, "listener", Proto], ListenOn} + <- cuttlefish_variable:filter_by_prefix("exproto.listener", Conf)]) +end}. diff --git a/apps/emqx_exproto/priv/protos/exproto.proto b/apps/emqx_exproto/priv/protos/exproto.proto new file mode 100644 index 000000000..633cc1758 --- /dev/null +++ b/apps/emqx_exproto/priv/protos/exproto.proto @@ -0,0 +1,259 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//------------------------------------------------------------------------------ + +syntax = "proto3"; + +package emqx.exproto.v1; + +// The Broker side serivce. It provides a set of APIs to +// handle a protcol access +service ConnectionAdapter { + + // -- socket layer + + rpc Send(SendBytesRequest) returns (CodeResponse) {}; + + rpc Close(CloseSocketRequest) returns (CodeResponse) {}; + + // -- protocol layer + + rpc Authenticate(AuthenticateRequest) returns (CodeResponse) {}; + + rpc StartTimer(TimerRequest) returns (CodeResponse) {}; + + // -- pub/sub layer + + rpc Publish(PublishRequest) returns (CodeResponse) {}; + + rpc Subscribe(SubscribeRequest) returns (CodeResponse) {}; + + rpc Unsubscribe(UnsubscribeRequest) returns (CodeResponse) {}; +} + +service ConnectionHandler { + + // -- socket layer + + rpc OnSocketCreated(SocketCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSocketClosed(SocketClosedRequest) returns (EmptySuccess) {}; + + rpc OnReceivedBytes(ReceivedBytesRequest) returns (EmptySuccess) {}; + + // -- pub/sub layer + + rpc OnTimerTimeout(TimerTimeoutRequest) returns (EmptySuccess) {}; + + rpc OnReceivedMessages(ReceivedMessagesRequest) returns (EmptySuccess) {}; +} + +message EmptySuccess { } + +enum ResultCode { + + // Operation successfully + SUCCESS = 0; + + // Unknown Error + UNKNOWN = 1; + + // Connection process is not alive + CONN_PROCESS_NOT_ALIVE = 2; + + // Miss the required parameter + REQUIRED_PARAMS_MISSED = 3; + + // Params type or values incorrect + PARAMS_TYPE_ERROR = 4; + + // No permission or Pre-conditions not fulfilled + PERMISSION_DENY = 5; +} + +message CodeResponse { + + ResultCode code = 1; + + // The reason message if result is false + string message = 2; +} + +message SendBytesRequest { + + string conn = 1; + + bytes bytes = 2; +} + +message CloseSocketRequest { + + string conn = 1; +} + +message AuthenticateRequest { + + string conn = 1; + + ClientInfo clientinfo = 2; + + string password = 3; +} + +message TimerRequest { + + string conn = 1; + + TimerType type = 2; + + uint32 interval = 3; +} + +enum TimerType { + + KEEPALIVE = 0; +} + +message PublishRequest { + + string conn = 1; + + string topic = 2; + + uint32 qos = 3; + + bytes payload = 4; +} + +message SubscribeRequest { + + string conn = 1; + + string topic = 2; + + uint32 qos = 3; +} + +message UnsubscribeRequest { + + string conn = 1; + + string topic = 2; +} + +message SocketCreatedRequest { + + string conn = 1; + + ConnInfo conninfo = 2; +} + +message ReceivedBytesRequest { + + string conn = 1; + + bytes bytes = 2; +} + +message TimerTimeoutRequest { + + string conn = 1; + + TimerType type = 2; +} + +message SocketClosedRequest { + + string conn = 1; + + string reason = 2; +} + +message ReceivedMessagesRequest { + + string conn = 1; + + repeated Message messages = 2; +} + +//-------------------------------------------------------------------- +// Basic data types +//-------------------------------------------------------------------- + +message ConnInfo { + + SocketType socktype = 1; + + Address peername = 2; + + Address sockname = 3; + + CertificateInfo peercert = 4; +} + +enum SocketType { + + TCP = 0; + + SSL = 1; + + UDP = 2; + + DTLS = 3; +} + +message Address { + + string host = 1; + + uint32 port = 2; +} + +message CertificateInfo { + + string cn = 1; + + string dn = 2; +} + +message ClientInfo { + + string proto_name = 1; + + string proto_ver = 2; + + string clientid = 3; + + string username = 4; + + string mountpoint = 5; +} + +message Message { + + string node = 1; + + string id = 2; + + uint32 qos = 3; + + string from = 4; + + string topic = 5; + + bytes payload = 6; + + uint64 timestamp = 7; +} diff --git a/apps/emqx_exproto/rebar.config b/apps/emqx_exproto/rebar.config new file mode 100644 index 000000000..e989b5aeb --- /dev/null +++ b/apps/emqx_exproto/rebar.config @@ -0,0 +1,50 @@ +%%-*- mode: erlang -*- +{edoc_opts, [{preprocess, true}]}. + +{erl_opts, [warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_obsolete_guard, + debug_info, + {parse_transform}]}. + +{plugins, + [{grpcbox_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {branch, "master"}}} + ]}. + +{deps, + [{grpcbox, {git, "https://github.com/tsloughter/grpcbox", {branch, "master"}}} + ]}. + +{grpc, + [{type, all}, + {protos, ["priv/protos"]}, + {gpb_opts, [{module_name_prefix, "emqx_"}, + {module_name_suffix, "_pb"}]} + ]}. + +{provider_hooks, + [{pre, [{compile, {grpc, gen}}]}]}. + +{xref_checks, [undefined_function_calls, undefined_functions, + locals_not_used, deprecated_function_calls, + warnings_as_errors, deprecated_functions]}. + +{xref_ignores, [emqx_exproto_pb]}. + +{cover_enabled, true}. +{cover_opts, [verbose]}. +{cover_export_enabled, true}. +{cover_excl_mods, [emqx_exproto_pb, + emqx_exproto_v_1_connection_adapter_client, + emqx_exproto_v_1_connection_adapter_bhvr, + emqx_exproto_v_1_connection_handler_client, + emqx_exproto_v_1_connection_handler_bhvr]}. + +{profiles, + [{test, [ + {deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.3.0"}}} + , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + ]} + ]} +]}. diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src new file mode 100644 index 000000000..2824202d7 --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -0,0 +1,12 @@ +{application, emqx_exproto, + [{description, "EMQ X Extension for Protocol"}, + {vsn, "git"}, + {modules, []}, + {registered, []}, + {mod, {emqx_exproto_app, []}}, + {applications, [kernel,stdlib,grpcbox]}, + {env,[]}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, [{"Homepage", "https://emqx.io/"}]} + ]}. diff --git a/apps/emqx_exproto/src/emqx_exproto.erl b/apps/emqx_exproto/src/emqx_exproto.erl new file mode 100644 index 000000000..8ec382901 --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto.erl @@ -0,0 +1,183 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exproto). + +-include("emqx_exproto.hrl"). + +-export([ start_listeners/0 + , stop_listeners/0 + , start_listener/1 + , start_listener/4 + , stop_listener/4 + , stop_listener/1 + ]). + +-export([ start_servers/0 + , stop_servers/0 + , start_server/1 + , stop_server/1 + ]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-spec(start_listeners() -> ok). +start_listeners() -> + Listeners = application:get_env(?APP, listeners, []), + NListeners = [start_connection_handler_instance(Listener) + || Listener <- Listeners], + lists:foreach(fun start_listener/1, NListeners). + +-spec(stop_listeners() -> ok). +stop_listeners() -> + Listeners = application:get_env(?APP, listeners, []), + lists:foreach(fun stop_connection_handler_instance/1, Listeners), + lists:foreach(fun stop_listener/1, Listeners). + +-spec(start_servers() -> ok). +start_servers() -> + lists:foreach(fun start_server/1, application:get_env(?APP, servers, [])). + +-spec(stop_servers() -> ok). +stop_servers() -> + lists:foreach(fun stop_server/1, application:get_env(?APP, servers, [])). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +start_connection_handler_instance({_Proto, _LisType, _ListenOn, Opts}) -> + Name = name(_Proto, _LisType), + {value, {_, HandlerOpts}, LisOpts} = lists:keytake(handler, 1, Opts), + {Endpoints, ChannelOptions} = handler_opts(HandlerOpts), + case emqx_exproto_sup:start_grpc_client_channel(Name, Endpoints, ChannelOptions) of + {ok, _ClientChannelPid} -> + {_Proto, _LisType, _ListenOn, [{handler, Name} | LisOpts]}; + {error, Reason} -> + io:format(standard_error, "Failed to start ~s's connection handler - ~0p~n!", + [Name, Reason]), + error(Reason) + end. + +stop_connection_handler_instance({_Proto, _LisType, _ListenOn, _Opts}) -> + Name = name(_Proto, _LisType), + _ = emqx_exproto_sup:stop_grpc_client_channel(Name), + ok. + +start_server({Name, Port, SSLOptions}) -> + case emqx_exproto_sup:start_grpc_server(Name, Port, SSLOptions) of + {ok, _} -> + io:format("Start ~s gRPC server on ~w successfully.~n", + [Name, Port]); + {error, Reason} -> + io:format(standard_error, "Failed to start ~s gRPC server on ~w - ~0p~n!", + [Name, Port, Reason]), + error({failed_start_server, Reason}) + end. + +stop_server({Name, Port, _SSLOptions}) -> + ok = emqx_exproto_sup:stop_grpc_server(Name), + io:format("Stop ~s gRPC server on ~w successfully.~n", [Name, Port]). + +start_listener({Proto, LisType, ListenOn, Opts}) -> + Name = name(Proto, LisType), + case start_listener(LisType, Name, ListenOn, Opts) of + {ok, _} -> + io:format("Start ~s listener on ~s successfully.~n", + [Name, format(ListenOn)]); + {error, Reason} -> + io:format(standard_error, "Failed to start ~s listener on ~s - ~0p~n!", + [Name, format(ListenOn), Reason]), + error(Reason) + end. + +%% @private +start_listener(LisType, Name, ListenOn, LisOpts) + when LisType =:= tcp; + LisType =:= ssl -> + SockOpts = esockd:parse_opt(LisOpts), + esockd:open(Name, ListenOn, merge_tcp_default(SockOpts), + {emqx_exproto_conn, start_link, [LisOpts-- SockOpts]}); + +start_listener(udp, Name, ListenOn, LisOpts) -> + SockOpts = esockd:parse_opt(LisOpts), + esockd:open_udp(Name, ListenOn, merge_udp_default(SockOpts), + {emqx_exproto_conn, start_link, [LisOpts-- SockOpts]}); + +start_listener(dtls, Name, ListenOn, LisOpts) -> + SockOpts = esockd:parse_opt(LisOpts), + esockd:open_dtls(Name, ListenOn, merge_udp_default(SockOpts), + {emqx_exproto_conn, start_link, [LisOpts-- SockOpts]}). + +stop_listener({Proto, LisType, ListenOn, Opts}) -> + Name = name(Proto, LisType), + StopRet = stop_listener(LisType, Name, ListenOn, Opts), + case StopRet of + ok -> + io:format("Stop ~s listener on ~s successfully.~n", + [Name, format(ListenOn)]); + {error, Reason} -> + io:format(standard_error, "Failed to stop ~s listener on ~s - ~p~n.", + [Name, format(ListenOn), Reason]) + end, + StopRet. + +%% @private +stop_listener(_LisType, Name, ListenOn, _Opts) -> + esockd:close(Name, ListenOn). + +%% @private +name(Proto, LisType) -> + list_to_atom(lists:flatten(io_lib:format("~s:~s", [Proto, LisType]))). + +%% @private +format(Port) when is_integer(Port) -> + io_lib:format("0.0.0.0:~w", [Port]); +format({Addr, Port}) when is_list(Addr) -> + io_lib:format("~s:~w", [Addr, Port]); +format({Addr, Port}) when is_tuple(Addr) -> + io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). + +%% @private +merge_tcp_default(Opts) -> + case lists:keytake(tcp_options, 1, Opts) of + {value, {tcp_options, TcpOpts}, Opts1} -> + [{tcp_options, emqx_misc:merge_opts(?TCP_SOCKOPTS, TcpOpts)} | Opts1]; + false -> + [{tcp_options, ?TCP_SOCKOPTS} | Opts] + end. + +merge_udp_default(Opts) -> + case lists:keytake(udp_options, 1, Opts) of + {value, {udp_options, TcpOpts}, Opts1} -> + [{udp_options, emqx_misc:merge_opts(?UDP_SOCKOPTS, TcpOpts)} | Opts1]; + false -> + [{udp_options, ?UDP_SOCKOPTS} | Opts] + end. + +%% @private +handler_opts(Opts) -> + Scheme = proplists:get_value(scheme, Opts), + Host = proplists:get_value(host, Opts), + Port = proplists:get_value(port, Opts), + Options = proplists:get_value(options, Opts, []), + SslOpts = case Scheme of + https -> proplists:get_value(ssl_options, Opts, []); + _ -> [] + end, + {[{Scheme, Host, Port, SslOpts}], maps:from_list(Options)}. diff --git a/apps/emqx_exproto/src/emqx_exproto_app.erl b/apps/emqx_exproto/src/emqx_exproto_app.erl new file mode 100644 index 000000000..73e8a65bc --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto_app.erl @@ -0,0 +1,37 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exproto_app). + +-behaviour(application). + +-emqx_plugin(extension). + +-export([start/2, prep_stop/1, stop/1]). + +start(_StartType, _StartArgs) -> + {ok, Sup} = emqx_exproto_sup:start_link(), + emqx_exproto:start_servers(), + emqx_exproto:start_listeners(), + {ok, Sup}. + +prep_stop(State) -> + emqx_exproto:stop_servers(), + emqx_exproto:stop_listeners(), + State. + +stop(_State) -> + ok. diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl new file mode 100644 index 000000000..ee76cfd93 --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -0,0 +1,599 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exproto_channel). + +-include("emqx_exproto.hrl"). +-include_lib("emqx_libs/include/emqx.hrl"). +-include_lib("emqx_libs/include/emqx_mqtt.hrl"). +-include_lib("emqx_libs/include/types.hrl"). +-include_lib("emqx_libs/include/logger.hrl"). + +-logger_header("[ExProto Channel]"). + +-export([ info/1 + , info/2 + , stats/1 + ]). + +-export([ init/2 + , handle_in/2 + , handle_deliver/2 + , handle_timeout/3 + , handle_call/2 + , handle_cast/2 + , handle_info/2 + , terminate/2 + ]). + +-export_type([channel/0]). + +-record(channel, { + %% gRPC channel options + gcli :: map(), + %% Conn info + conninfo :: emqx_types:conninfo(), + %% Client info from `register` function + clientinfo :: maybe(map()), + %% Connection state + conn_state :: conn_state(), + %% Subscription + subscriptions = #{}, + %% Request queue + rqueue = queue:new(), + %% Inflight function name + inflight = undefined, + %% Keepalive + keepalive :: maybe(emqx_keepalive:keepalive()), + %% Timers + timers :: #{atom() => disabled | maybe(reference())}, + %% Closed reason + closed_reason = undefined + }). + +-opaque(channel() :: #channel{}). + +-type(conn_state() :: idle | connecting | connected | disconnected). + +-type(reply() :: {outgoing, binary()} + | {outgoing, [binary()]} + | {close, Reason :: atom()}). + +-type(replies() :: emqx_types:packet() | reply() | [reply()]). + +-define(TIMER_TABLE, #{ + alive_timer => keepalive, + force_timer => force_close + }). + +-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). + +-define(SESSION_STATS_KEYS, + [subscriptions_cnt, + subscriptions_max, + inflight_cnt, + inflight_max, + mqueue_len, + mqueue_max, + mqueue_dropped, + next_pkt_id, + awaiting_rel_cnt, + awaiting_rel_max + ]). + +%%-------------------------------------------------------------------- +%% Info, Attrs and Caps +%%-------------------------------------------------------------------- + +%% @doc Get infos of the channel. +-spec(info(channel()) -> emqx_types:infos()). +info(Channel) -> + maps:from_list(info(?INFO_KEYS, Channel)). + +-spec(info(list(atom())|atom(), channel()) -> term()). +info(Keys, Channel) when is_list(Keys) -> + [{Key, info(Key, Channel)} || Key <- Keys]; +info(conninfo, #channel{conninfo = ConnInfo}) -> + ConnInfo; +info(clientid, #channel{clientinfo = ClientInfo}) -> + maps:get(clientid, ClientInfo, undefined); +info(clientinfo, #channel{clientinfo = ClientInfo}) -> + ClientInfo; +info(session, #channel{subscriptions = Subs, + conninfo = ConnInfo}) -> + #{subscriptions => Subs, + upgrade_qos => false, + retry_interval => 0, + await_rel_timeout => 0, + created_at => maps:get(connected_at, ConnInfo)}; +info(conn_state, #channel{conn_state = ConnState}) -> + ConnState; +info(will_msg, _) -> + undefined. + +-spec(stats(channel()) -> emqx_types:stats()). +stats(#channel{subscriptions = Subs}) -> + [{subscriptions_cnt, maps:size(Subs)}, + {subscriptions_max, 0}, + {inflight_cnt, 0}, + {inflight_max, 0}, + {mqueue_len, 0}, + {mqueue_max, 0}, + {mqueue_dropped, 0}, + {next_pkt_id, 0}, + {awaiting_rel_cnt, 0}, + {awaiting_rel_max, 0}]. + +%%-------------------------------------------------------------------- +%% Init the channel +%%-------------------------------------------------------------------- + +-spec(init(emqx_exproto_types:conninfo(), proplists:proplist()) -> channel()). +init(ConnInfo = #{socktype := Socktype, + peername := Peername, + sockname := Sockname, + peercert := Peercert}, Options) -> + GRpcChann = proplists:get_value(handler, Options), + NConnInfo = default_conninfo(ConnInfo), + ClientInfo = default_clientinfo(ConnInfo), + Channel = #channel{gcli = #{channel => GRpcChann}, + conninfo = NConnInfo, + clientinfo = ClientInfo, + conn_state = connecting, + timers = #{} + }, + + Req = #{conninfo => + peercert(Peercert, + #{socktype => socktype(Socktype), + peername => address(Peername), + sockname => address(Sockname)})}, + try_dispatch(on_socket_created, wrap(Req), Channel). + +%% @private +peercert(nossl, ConnInfo) -> + ConnInfo; +peercert(Peercert, ConnInfo) -> + ConnInfo#{peercert => + #{cn => esockd_peercert:common_name(Peercert), + dn => esockd_peercert:subject(Peercert)}}. + +%% @private +socktype(tcp) -> 'TCP'; +socktype(ssl) -> 'SSL'; +socktype(udp) -> 'UDP'; +socktype(dtls) -> 'DTLS'. + +%% @private +address({Host, Port}) -> + #{host => inet:ntoa(Host), port => Port}. + +%%-------------------------------------------------------------------- +%% Handle incoming packet +%%-------------------------------------------------------------------- + +-spec(handle_in(binary(), channel()) + -> {ok, channel()} + | {shutdown, Reason :: term(), channel()}). +handle_in(Data, Channel) -> + Req = #{bytes => Data}, + {ok, try_dispatch(on_received_bytes, wrap(Req), Channel)}. + +-spec(handle_deliver(list(emqx_types:deliver()), channel()) + -> {ok, channel()} + | {shutdown, Reason :: term(), channel()}). +handle_deliver(Delivers, Channel = #channel{clientinfo = ClientInfo}) -> + %% XXX: ?? Nack delivers from shared subscriptions + Mountpoint = maps:get(mountpoint, ClientInfo), + NodeStr = atom_to_binary(node(), utf8), + Msgs = lists:map(fun({_, _, Msg}) -> + ok = emqx_metrics:inc('messages.delivered'), + Msg1 = emqx_hooks:run_fold('message.delivered', + [ClientInfo], Msg), + NMsg = emqx_mountpoint:unmount(Mountpoint, Msg1), + #{node => NodeStr, + id => hexstr(emqx_message:id(NMsg)), + qos => emqx_message:qos(NMsg), + from => fmt_from(emqx_message:from(NMsg)), + topic => emqx_message:topic(NMsg), + payload => emqx_message:payload(NMsg), + timestamp => emqx_message:timestamp(NMsg) + } + end, Delivers), + Req = #{messages => Msgs}, + {ok, try_dispatch(on_received_messages, wrap(Req), Channel)}. + +-spec(handle_timeout(reference(), Msg :: term(), channel()) + -> {ok, channel()} + | {shutdown, Reason :: term(), channel()}). +handle_timeout(_TRef, {keepalive, _StatVal}, + Channel = #channel{keepalive = undefined}) -> + {ok, Channel}; +handle_timeout(_TRef, {keepalive, StatVal}, + Channel = #channel{keepalive = Keepalive}) -> + case emqx_keepalive:check(StatVal, Keepalive) of + {ok, NKeepalive} -> + NChannel = Channel#channel{keepalive = NKeepalive}, + {ok, reset_timer(alive_timer, NChannel)}; + {error, timeout} -> + Req = #{type => 'KEEPALIVE'}, + {ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)} + end; + +handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> + {shutdown, {error, {force_close, Reason}}, Channel}; + +handle_timeout(_TRef, Msg, Channel) -> + ?WARN("Unexpected timeout: ~p", [Msg]), + {ok, Channel}. + +-spec(handle_call(any(), channel()) + -> {reply, Reply :: term(), channel()} + | {reply, Reply :: term(), replies(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), channel()}). + +handle_call({send, Data}, Channel) -> + {reply, ok, [{outgoing, Data}], Channel}; + +handle_call(close, Channel = #channel{conn_state = connected}) -> + {reply, ok, [{event, disconnected}, {close, normal}], Channel}; +handle_call(close, Channel) -> + {reply, ok, [{close, normal}], Channel}; + +handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) -> + ?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]), + {ok, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel}; +handle_call({auth, ClientInfo0, Password}, + Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo), + NConnInfo = enrich_conninfo(ClientInfo1, ConnInfo), + + Channel1 = Channel#channel{conninfo = NConnInfo, + clientinfo = ClientInfo1}, + + #{clientid := ClientId, username := Username} = ClientInfo1, + + case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of + {ok, AuthResult} -> + emqx_logger:set_metadata_clientid(ClientId), + is_anonymous(AuthResult) andalso + emqx_metrics:inc('client.auth.anonymous'), + NClientInfo = maps:merge(ClientInfo1, AuthResult), + NChannel = Channel1#channel{clientinfo = NClientInfo}, + case emqx_cm:open_session(true, NClientInfo, NConnInfo) of + {ok, _Session} -> + ?LOG(debug, "Client ~s (Username: '~s') authorized successfully!", + [ClientId, Username]), + {reply, ok, [{event, connected}], ensure_connected(NChannel)}; + {error, Reason} -> + ?LOG(warning, "Client ~s (Username: '~s') open session failed for ~0p", + [ClientId, Username, Reason]), + {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel} + end; + {error, Reason} -> + ?LOG(warning, "Client ~s (Username: '~s') login failed for ~0p", + [ClientId, Username, Reason]), + {reply, {error, ?RESP_PERMISSION_DENY, Reason}, Channel} + end; + +handle_call({start_timer, keepalive, Interval}, + Channel = #channel{ + conninfo = ConnInfo, + clientinfo = ClientInfo + }) -> + NConnInfo = ConnInfo#{keepalive => Interval}, + NClientInfo = ClientInfo#{keepalive => Interval}, + NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, + {reply, ok, ensure_keepalive(NChannel)}; + +handle_call({subscribe, TopicFilter, Qos}, + Channel = #channel{ + conn_state = connected, + clientinfo = ClientInfo}) -> + case is_acl_enabled(ClientInfo) andalso + emqx_access_control:check_acl(ClientInfo, subscribe, TopicFilter) of + deny -> + {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; + _ -> + {ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), + {reply, ok, NChannel} + end; + +handle_call({unsubscribe, TopicFilter}, + Channel = #channel{conn_state = connected}) -> + {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel), + {reply, ok, NChannel}; + +handle_call({publish, Topic, Qos, Payload}, + Channel = #channel{ + conn_state = connected, + clientinfo = ClientInfo + = #{clientid := From, + mountpoint := Mountpoint}}) -> + case is_acl_enabled(ClientInfo) andalso + emqx_access_control:check_acl(ClientInfo, publish, Topic) of + deny -> + {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; + _ -> + Msg = emqx_message:make(From, Qos, Topic, Payload), + NMsg = emqx_mountpoint:mount(Mountpoint, Msg), + emqx:publish(NMsg), + {reply, ok, Channel} + end; + +handle_call(kick, Channel) -> + {shutdown, kicked, ok, Channel}; + +handle_call(Req, Channel) -> + ?LOG(warning, "Unexpected call: ~p", [Req]), + {reply, {error, unexpected_call}, Channel}. + +-spec(handle_cast(any(), channel()) + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()}). +handle_cast(Req, Channel) -> + ?WARN("Unexpected call: ~p", [Req]), + {ok, Channel}. + +-spec(handle_info(any(), channel()) + -> {ok, channel()} + | {shutdown, Reason :: term(), channel()}). +handle_info({subscribe, TopicFilters}, Channel) -> + do_subscribe(TopicFilters, Channel); + +handle_info({unsubscribe, TopicFilters}, Channel) -> + do_unsubscribe(TopicFilters, Channel); + +handle_info({sock_closed, Reason}, + Channel = #channel{rqueue = Queue, inflight = Inflight}) -> + case queue:len(Queue) =:= 0 + andalso Inflight =:= undefined of + true -> + {shutdown, {sock_closed, Reason}, Channel}; + _ -> + %% delayed close process for flushing all callback funcs to gRPC server + Channel1 = Channel#channel{closed_reason = {sock_closed, Reason}}, + Channel2 = ensure_timer(force_timer, Channel1), + {ok, ensure_disconnected({sock_closed, Reason}, Channel2)} + end; + +handle_info({hreply, on_socket_created, {ok, _}}, Channel) -> + dispatch_or_close_process(Channel#channel{inflight = undefined}); +handle_info({hreply, FunName, {ok, _}}, Channel) + when FunName == on_socket_closed; + FunName == on_received_bytes; + FunName == on_received_messages; + FunName == on_timer_timeout -> + dispatch_or_close_process(Channel#channel{inflight = undefined}); +handle_info({hreply, FunName, {error, Reason}}, Channel) -> + {shutdown, {error, {FunName, Reason}}, Channel}; + +handle_info(Info, Channel) -> + ?LOG(warning, "Unexpected info: ~p", [Info]), + {ok, Channel}. + +-spec(terminate(any(), channel()) -> channel()). +terminate(Reason, Channel) -> + Req = #{reason => stringfy(Reason)}, + try_dispatch(on_socket_closed, wrap(Req), Channel). + +is_anonymous(#{anonymous := true}) -> true; +is_anonymous(_AuthResult) -> false. + +%%-------------------------------------------------------------------- +%% Sub/UnSub +%%-------------------------------------------------------------------- + +do_subscribe(TopicFilters, Channel) -> + NChannel = lists:foldl( + fun({TopicFilter, SubOpts}, ChannelAcc) -> + do_subscribe(TopicFilter, SubOpts, ChannelAcc) + end, Channel, parse_topic_filters(TopicFilters)), + {ok, NChannel}. + +%% @private +do_subscribe(TopicFilter, SubOpts, Channel = + #channel{clientinfo = ClientInfo = #{mountpoint := Mountpoint}, + subscriptions = Subs}) -> + %% Mountpoint first + NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), + NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), + SubId = maps:get(clientid, ClientInfo, undefined), + IsNew = not maps:is_key(NTopicFilter, Subs), + case IsNew of + true -> + ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), + ok = emqx_hooks:run('session.subscribed', + [ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]), + Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}; + _ -> + %% Update subopts + ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), + Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}} + end. + +do_unsubscribe(TopicFilters, Channel) -> + NChannel = lists:foldl( + fun({TopicFilter, SubOpts}, ChannelAcc) -> + do_unsubscribe(TopicFilter, SubOpts, ChannelAcc) + end, Channel, parse_topic_filters(TopicFilters)), + {ok, NChannel}. + +%% @private +do_unsubscribe(TopicFilter, UnSubOpts, Channel = + #channel{clientinfo = ClientInfo = #{mountpoint := Mountpoint}, + subscriptions = Subs}) -> + NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), + case maps:find(NTopicFilter, Subs) of + {ok, SubOpts} -> + ok = emqx:unsubscribe(NTopicFilter), + ok = emqx_hooks:run('session.unsubscribed', + [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), + Channel#channel{subscriptions = maps:remove(NTopicFilter, Subs)}; + _ -> + Channel + end. + +%% @private +parse_topic_filters(TopicFilters) -> + lists:map(fun emqx_topic:parse/1, TopicFilters). + +-compile({inline, [is_acl_enabled/1]}). +is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> + (not IsSuperuser) andalso emqx_zone:enable_acl(Zone). + +%%-------------------------------------------------------------------- +%% Ensure & Hooks +%%-------------------------------------------------------------------- + +ensure_connected(Channel = #channel{conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)}, + ok = run_hooks('client.connected', [ClientInfo, NConnInfo]), + Channel#channel{conninfo = NConnInfo, + conn_state = connected + }. + +ensure_disconnected(Reason, Channel = #channel{ + conn_state = connected, + conninfo = ConnInfo, + clientinfo = ClientInfo}) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + ok = run_hooks('client.disconnected', [ClientInfo, Reason, NConnInfo]), + Channel#channel{conninfo = NConnInfo, conn_state = disconnected}; + +ensure_disconnected(_Reason, Channel = #channel{conninfo = ConnInfo}) -> + NConnInfo = ConnInfo#{disconnected_at => erlang:system_time(millisecond)}, + Channel#channel{conninfo = NConnInfo, conn_state = disconnected}. + +run_hooks(Name, Args) -> + ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args). + +%%-------------------------------------------------------------------- +%% Enrich Keepalive + +ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) -> + ensure_keepalive_timer(maps:get(keepalive, ClientInfo, 0), Channel). + +ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> + Channel; +ensure_keepalive_timer(Interval, Channel) -> + Keepalive = emqx_keepalive:init(timer:seconds(Interval)), + ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). + +ensure_timer(Name, Channel = #channel{timers = Timers}) -> + TRef = maps:get(Name, Timers, undefined), + Time = interval(Name, Channel), + case TRef == undefined andalso Time > 0 of + true -> ensure_timer(Name, Time, Channel); + false -> Channel %% Timer disabled or exists + end. + +ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> + Msg = maps:get(Name, ?TIMER_TABLE), + TRef = emqx_misc:start_timer(Time, Msg), + Channel#channel{timers = Timers#{Name => TRef}}. + +reset_timer(Name, Channel) -> + ensure_timer(Name, clean_timer(Name, Channel)). + +clean_timer(Name, Channel = #channel{timers = Timers}) -> + Channel#channel{timers = maps:remove(Name, Timers)}. + +interval(force_timer, _) -> + 15000; +interval(alive_timer, #channel{keepalive = Keepalive}) -> + emqx_keepalive:info(interval, Keepalive). + +%%-------------------------------------------------------------------- +%% Dispatch +%%-------------------------------------------------------------------- + +wrap(Req) -> + Req#{conn => pid_to_list(self())}. + +dispatch_or_close_process(Channel = #channel{ + rqueue = Queue, + inflight = undefined, + gcli = GClient}) -> + case queue:out(Queue) of + {empty, _} -> + case Channel#channel.conn_state of + disconnected -> + {shutdown, Channel#channel.closed_reason, Channel}; + _ -> + {ok, Channel} + end; + {{value, {FunName, Req}}, NQueue} -> + emqx_exproto_gcli:async_call(FunName, Req, GClient), + {ok, Channel#channel{inflight = FunName, rqueue = NQueue}} + end. + +try_dispatch(FunName, Req, Channel = #channel{inflight = undefined, gcli = GClient}) -> + emqx_exproto_gcli:async_call(FunName, Req, GClient), + Channel#channel{inflight = FunName}; +try_dispatch(FunName, Req, Channel = #channel{rqueue = Queue}) -> + Channel#channel{rqueue = queue:in({FunName, Req}, Queue)}. + +%%-------------------------------------------------------------------- +%% Format +%%-------------------------------------------------------------------- + +enrich_conninfo(InClientInfo, ConnInfo) -> + Ks = [proto_name, proto_ver, clientid, username], + maps:merge(ConnInfo, maps:with(Ks, InClientInfo)). + +enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) -> + Ks = [clientid, username, mountpoint], + NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)), + NClientInfo#{protocol => ProtoName}. + +default_conninfo(ConnInfo) -> + ConnInfo#{proto_name => undefined, + proto_ver => undefined, + clean_start => true, + clientid => undefined, + username => undefined, + conn_props => [], + connected => true, + connected_at => erlang:system_time(millisecond), + keepalive => undefined, + receive_maximum => 0, + expiry_interval => 0}. + +default_clientinfo(#{peername := {PeerHost, _}, + sockname := {_, SockPort}}) -> + #{zone => external, + protocol => undefined, + peerhost => PeerHost, + sockport => SockPort, + clientid => undefined, + username => undefined, + is_bridge => false, + is_superuser => false, + mountpoint => undefined}. + +stringfy(Reason) -> + unicode:characters_to_binary((io_lib:format("~0p", [Reason]))). + +hexstr(Bin) -> + [io_lib:format("~2.16.0B",[X]) || <> <= Bin]. + +fmt_from(undefined) -> <<>>; +fmt_from(Bin) when is_binary(Bin) -> Bin; +fmt_from(T) -> stringfy(T). diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl new file mode 100644 index 000000000..6b3b83ba8 --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -0,0 +1,685 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% TCP/TLS/UDP/DTLS Connection +-module(emqx_exproto_conn). + +-include_lib("emqx_libs/include/types.hrl"). +-include_lib("emqx_libs/include/logger.hrl"). + +-logger_header("[ExProto Conn]"). + +%% API +-export([ start_link/3 + , stop/1 + ]). + +-export([ info/1 + , stats/1 + ]). + +-export([ call/2 + , cast/2 + ]). + +%% Callback +-export([init/4]). + +%% Sys callbacks +-export([ system_continue/3 + , system_terminate/4 + , system_code_change/4 + , system_get_state/1 + ]). + +%% Internal callback +-export([wakeup_from_hib/2]). + +-import(emqx_misc, [start_timer/2]). + +-record(state, { + %% TCP/SSL/UDP/DTLS Wrapped Socket + socket :: esockd:socket(), + %% Peername of the connection + peername :: emqx_types:peername(), + %% Sockname of the connection + sockname :: emqx_types:peername(), + %% Sock State + sockstate :: emqx_types:sockstate(), + %% The {active, N} option + active_n :: pos_integer(), + %% Send function + sendfun :: function(), + %% Limiter + limiter :: maybe(emqx_limiter:limiter()), + %% Limit Timer + limit_timer :: maybe(reference()), + %% Channel State + channel :: emqx_exproto_channel:channel(), + %% GC State + gc_state :: maybe(emqx_gc:gc_state()), + %% Stats Timer + stats_timer :: disabled | maybe(reference()), + %% Idle Timeout + idle_timeout :: integer(), + %% Idle Timer + idle_timer :: maybe(reference()) + }). + +-type(state() :: #state{}). + +-define(ACTIVE_N, 100). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). +-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). + +-define(ENABLED(X), (X =/= undefined)). + +-dialyzer({nowarn_function, + [ system_terminate/4 + , handle_call/3 + , handle_msg/2 + , shutdown/3 + , stop/3 + ]}). + +%% udp +start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) -> + Args = [self(), Socket, Peername, Options], + {ok, proc_lib:spawn_link(?MODULE, init, Args)}; + +%% tcp/ssl/dtls +start_link(esockd_transport, Sock, Options) -> + Socket = {esockd_transport, Sock}, + case esockd_transport:peername(Sock) of + {ok, Peername} -> + Args = [self(), Socket, Peername, Options], + {ok, proc_lib:spawn_link(?MODULE, init, Args)}; + R = {error, _} -> R + end. + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +%% @doc Get infos of the connection/channel. +-spec(info(pid()|state()) -> emqx_types:infos()). +info(CPid) when is_pid(CPid) -> + call(CPid, info); +info(State = #state{channel = Channel}) -> + ChanInfo = emqx_exproto_channel:info(Channel), + SockInfo = maps:from_list( + info(?INFO_KEYS, State)), + ChanInfo#{sockinfo => SockInfo}. + +info(Keys, State) when is_list(Keys) -> + [{Key, info(Key, State)} || Key <- Keys]; +info(socktype, #state{socket = Socket}) -> + esockd_type(Socket); +info(peername, #state{peername = Peername}) -> + Peername; +info(sockname, #state{sockname = Sockname}) -> + Sockname; +info(sockstate, #state{sockstate = SockSt}) -> + SockSt; +info(active_n, #state{active_n = ActiveN}) -> + ActiveN. + +-spec(stats(pid()|state()) -> emqx_types:stats()). +stats(CPid) when is_pid(CPid) -> + call(CPid, stats); +stats(#state{socket = Socket, + channel = Channel}) -> + SockStats = case esockd_getstat(Socket, ?SOCK_STATS) of + {ok, Ss} -> Ss; + {error, _} -> [] + end, + ConnStats = emqx_pd:get_counters(?CONN_STATS), + ChanStats = emqx_exproto_channel:stats(Channel), + ProcStats = emqx_misc:proc_stats(), + lists:append([SockStats, ConnStats, ChanStats, ProcStats]). + +call(Pid, Req) -> + gen_server:call(Pid, Req, infinity). + +cast(Pid, Req) -> + gen_server:cast(Pid, Req). + +stop(Pid) -> + gen_server:stop(Pid). + +%%-------------------------------------------------------------------- +%% Wrapped funcs +%%-------------------------------------------------------------------- + +esockd_wait(Socket = {udp, _SockPid, _Sock}) -> + {ok, Socket}; +esockd_wait({esockd_transport, Sock}) -> + case esockd_transport:wait(Sock) of + {ok, NSock} -> {ok, {esockd_transport, NSock}}; + R = {error, _} -> R + end. + +esockd_close({udp, _SockPid, _Sock}) -> + %% nothing to do for udp socket + %%gen_udp:close(Sock); + ok; +esockd_close({esockd_transport, Sock}) -> + esockd_transport:fast_close(Sock). + +esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) -> + nossl; +esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) -> + esockd_transport:ensure_ok_or_exit(Fun, [Sock]); +esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) -> + esockd_transport:ensure_ok_or_exit(Fun, [Socket]). + +esockd_type({udp, _, _}) -> + udp; +esockd_type({esockd_transport, Socket}) -> + esockd_transport:type(Socket). + +esockd_setopts({udp, _, _}, _) -> + ok; +esockd_setopts({esockd_transport, Socket}, Opts) -> + %% FIXME: DTLS works?? + esockd_transport:setopts(Socket, Opts). + +esockd_getstat({udp, _SockPid, Sock}, Stats) -> + inet:getstat(Sock, Stats); +esockd_getstat({esockd_transport, Sock}, Stats) -> + esockd_transport:getstat(Sock, Stats). + +sendfun({udp, _SockPid, Sock}, {Ip, Port}) -> + fun(Data) -> + gen_udp:send(Sock, Ip, Port, Data) + end; +sendfun({esockd_transport, Sock}, _) -> + fun(Data) -> + esockd_transport:async_send(Sock, Data) + end. + +%%-------------------------------------------------------------------- +%% callbacks +%%-------------------------------------------------------------------- + +-define(DEFAULT_GC_OPTS, #{count => 1000, bytes => 1024*1024}). +-define(DEFAULT_IDLE_TIMEOUT, 30000). +-define(DEFAULT_OOM_POLICY, #{max_heap_size => 4194304,message_queue_len => 32000}). + +init(Parent, WrappedSock, Peername, Options) -> + case esockd_wait(WrappedSock) of + {ok, NWrappedSock} -> + run_loop(Parent, init_state(NWrappedSock, Peername, Options)); + {error, Reason} -> + ok = esockd_close(WrappedSock), + exit_on_sock_error(Reason) + end. + +init_state(WrappedSock, Peername, Options) -> + {ok, Sockname} = esockd_ensure_ok_or_exit(sockname, WrappedSock), + Peercert = esockd_ensure_ok_or_exit(peercert, WrappedSock), + ConnInfo = #{socktype => esockd_type(WrappedSock), + peername => Peername, + sockname => Sockname, + peercert => Peercert, + conn_mod => ?MODULE + }, + + ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), + + %% FIXME: + %%Limiter = emqx_limiter:init(Options), + + Channel = emqx_exproto_channel:init(ConnInfo, Options), + + GcState = emqx_gc:init(?DEFAULT_GC_OPTS), + + IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT), + IdleTimer = start_timer(IdleTimeout, idle_timeout), + #state{socket = WrappedSock, + peername = Peername, + sockname = Sockname, + sockstate = idle, + active_n = ActiveN, + sendfun = sendfun(WrappedSock, Peername), + limiter = undefined, + channel = Channel, + gc_state = GcState, + stats_timer = undefined, + idle_timeout = IdleTimeout, + idle_timer = IdleTimer + }. + +run_loop(Parent, State = #state{socket = Socket, + peername = Peername}) -> + emqx_logger:set_metadata_peername(esockd:format(Peername)), + emqx_misc:tune_heap_size(?DEFAULT_OOM_POLICY), + case activate_socket(State) of + {ok, NState} -> + hibernate(Parent, NState); + {error, Reason} -> + ok = esockd_close(Socket), + exit_on_sock_error(Reason) + end. + +exit_on_sock_error(Reason) when Reason =:= einval; + Reason =:= enotconn; + Reason =:= closed -> + erlang:exit(normal); +exit_on_sock_error(timeout) -> + erlang:exit({shutdown, ssl_upgrade_timeout}); +exit_on_sock_error(Reason) -> + erlang:exit({shutdown, Reason}). + +%%-------------------------------------------------------------------- +%% Recv Loop + +recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> + receive + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); + {'EXIT', Parent, Reason} -> + terminate(Reason, State); + Msg -> + process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State)) + after + IdleTimeout -> + hibernate(Parent, cancel_stats_timer(State)) + end. + +hibernate(Parent, State) -> + proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]). + +%% Maybe do something here later. +wakeup_from_hib(Parent, State) -> recvloop(Parent, State). + +%%-------------------------------------------------------------------- +%% Ensure/cancel stats timer + +-compile({inline, [ensure_stats_timer/2]}). +ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) -> + State#state{stats_timer = start_timer(Timeout, emit_stats)}; +ensure_stats_timer(_Timeout, State) -> State. + +-compile({inline, [cancel_stats_timer/1]}). +cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -> + ok = emqx_misc:cancel_timer(TRef), + State#state{stats_timer = undefined}; +cancel_stats_timer(State) -> State. + +%%-------------------------------------------------------------------- +%% Process next Msg + +process_msg([], Parent, State) -> recvloop(Parent, State); + +process_msg([Msg|More], Parent, State) -> + case catch handle_msg(Msg, State) of + ok -> + process_msg(More, Parent, State); + {ok, NState} -> + process_msg(More, Parent, NState); + {ok, Msgs, NState} -> + process_msg(append_msg(More, Msgs), Parent, NState); + {stop, Reason} -> + terminate(Reason, State); + {stop, Reason, NState} -> + terminate(Reason, NState); + {'EXIT', Reason} -> + terminate(Reason, State) + end. + +-compile({inline, [append_msg/2]}). +append_msg([], Msgs) when is_list(Msgs) -> + Msgs; +append_msg([], Msg) -> [Msg]; +append_msg(Q, Msgs) when is_list(Msgs) -> + lists:append(Q, Msgs); +append_msg(Q, Msg) -> + lists:append(Q, [Msg]). + +%%-------------------------------------------------------------------- +%% Handle a Msg + +handle_msg({'$gen_call', From, Req}, State) -> + case handle_call(From, Req, State) of + {reply, Reply, NState} -> + gen_server:reply(From, Reply), + {ok, NState}; + {reply, Reply, Msgs, NState} -> + gen_server:reply(From, Reply), + {ok, next_msgs(Msgs), NState}; + {stop, Reason, Reply, NState} -> + gen_server:reply(From, Reply), + stop(Reason, NState) + end; + +handle_msg({'$gen_cast', Req}, State) -> + with_channel(handle_cast, [Req], State); + +handle_msg({datagram, _SockPid, Data}, State) -> + process_incoming(Data, State); + +handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> + process_incoming(Data, State); + +handle_msg({outgoing, Data}, State) -> + handle_outgoing(Data, State); + +handle_msg({Error, _Sock, Reason}, State) + when Error == tcp_error; Error == ssl_error -> + handle_info({sock_error, Reason}, State); + +handle_msg({Closed, _Sock}, State) + when Closed == tcp_closed; Closed == ssl_closed -> + handle_info({sock_closed, Closed}, close_socket(State)); + +%% TODO: udp_passive??? +handle_msg({Passive, _Sock}, State) + when Passive == tcp_passive; Passive == ssl_passive -> + %% In Stats + Bytes = emqx_pd:reset_counter(incoming_bytes), + Pubs = emqx_pd:reset_counter(incoming_pkt), + InStats = #{cnt => Pubs, oct => Bytes}, + %% Ensure Rate Limit + NState = ensure_rate_limit(InStats, State), + %% Run GC and Check OOM + NState1 = check_oom(run_gc(InStats, NState)), + handle_info(activate_socket, NState1); + +handle_msg(Deliver = {deliver, _Topic, _Msg}, + State = #state{active_n = ActiveN}) -> + Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + with_channel(handle_deliver, [Delivers], State); + +%% Something sent +%% TODO: Who will deliver this message? +handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> + case emqx_pd:get_counter(outgoing_pkt) > ActiveN of + true -> + Pubs = emqx_pd:reset_counter(outgoing_pkt), + Bytes = emqx_pd:reset_counter(outgoing_bytes), + OutStats = #{cnt => Pubs, oct => Bytes}, + {ok, check_oom(run_gc(OutStats, State))}; + false -> ok + end; + +handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> + handle_info({sock_error, Reason}, State); + +handle_msg({close, Reason}, State) -> + ?LOG(debug, "Force to close the socket due to ~p", [Reason]), + handle_info({sock_closed, Reason}, close_socket(State)); + +handle_msg({event, connected}, State = #state{channel = Channel}) -> + ClientId = emqx_exproto_channel:info(clientid, Channel), + emqx_cm:register_channel(ClientId, info(State), stats(State)); + +handle_msg({event, disconnected}, State = #state{channel = Channel}) -> + ClientId = emqx_exproto_channel:info(clientid, Channel), + emqx_cm:set_chan_info(ClientId, info(State)), + emqx_cm:connection_closed(ClientId), + {ok, State}; + +%handle_msg({event, _Other}, State = #state{channel = Channel}) -> +% ClientId = emqx_exproto_channel:info(clientid, Channel), +% emqx_cm:set_chan_info(ClientId, info(State)), +% emqx_cm:set_chan_stats(ClientId, stats(State)), +% {ok, State}; + +handle_msg({timeout, TRef, TMsg}, State) -> + handle_timeout(TRef, TMsg, State); + +handle_msg(Shutdown = {shutdown, _Reason}, State) -> + stop(Shutdown, State); + +handle_msg(Msg, State) -> + handle_info(Msg, State). + +%%-------------------------------------------------------------------- +%% Terminate + +terminate(Reason, State = #state{channel = Channel}) -> + ?LOG(debug, "Terminated due to ~p", [Reason]), + emqx_exproto_channel:terminate(Reason, Channel), + close_socket(State), + exit(Reason). + +%%-------------------------------------------------------------------- +%% Sys callbacks + +system_continue(Parent, _Debug, State) -> + recvloop(Parent, State). + +system_terminate(Reason, _Parent, _Debug, State) -> + terminate(Reason, State). + +system_code_change(State, _Mod, _OldVsn, _Extra) -> + {ok, State}. + +system_get_state(State) -> {ok, State}. + +%%-------------------------------------------------------------------- +%% Handle call + +handle_call(_From, info, State) -> + {reply, info(State), State}; + +handle_call(_From, stats, State) -> + {reply, stats(State), State}; + +handle_call(_From, Req, State = #state{channel = Channel}) -> + case emqx_exproto_channel:handle_call(Req, Channel) of + {reply, Reply, NChannel} -> + {reply, Reply, State#state{channel = NChannel}}; + {reply, Reply, Replies, NChannel} -> + {reply, Reply, Replies, State#state{channel = NChannel}}; + {shutdown, Reason, Reply, NChannel} -> + shutdown(Reason, Reply, State#state{channel = NChannel}) + end. + +%%-------------------------------------------------------------------- +%% Handle timeout + +handle_timeout(_TRef, idle_timeout, State) -> + shutdown(idle_timeout, State); + +handle_timeout(_TRef, limit_timeout, State) -> + NState = State#state{sockstate = idle, + limit_timer = undefined + }, + handle_info(activate_socket, NState); +handle_timeout(TRef, keepalive, State = #state{socket = Socket, + channel = Channel})-> + case emqx_exproto_channel:info(conn_state, Channel) of + disconnected -> {ok, State}; + _ -> + case esockd_getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> + handle_timeout(TRef, {keepalive, RecvOct}, State); + {error, Reason} -> + handle_info({sock_error, Reason}, State) + end + end; +handle_timeout(_TRef, emit_stats, State = + #state{channel = Channel}) -> + ClientId = emqx_exproto_channel:info(clientid, Channel), + emqx_cm:set_chan_stats(ClientId, stats(State)), + {ok, State#state{stats_timer = undefined}}; + +handle_timeout(TRef, Msg, State) -> + with_channel(handle_timeout, [TRef, Msg], State). + +%%-------------------------------------------------------------------- +%% Parse incoming data + +-compile({inline, [process_incoming/2]}). +process_incoming(Data, State = #state{idle_timer = IdleTimer}) -> + ?LOG(debug, "RECV ~0p", [Data]), + Oct = iolist_size(Data), + emqx_pd:inc_counter(incoming_bytes, Oct), + emqx_pd:inc_counter(incoming_pkt, 1), + emqx_pd:inc_counter(recv_pkt, 1), + emqx_pd:inc_counter(recv_msg, 1), + % TODO: + %ok = emqx_metrics:inc('bytes.received', Oct), + + ok = emqx_misc:cancel_timer(IdleTimer), + NState = State#state{idle_timer = undefined}, + + with_channel(handle_in, [Data], NState). + +%%-------------------------------------------------------------------- +%% With Channel + +with_channel(Fun, Args, State = #state{channel = Channel}) -> + case erlang:apply(emqx_exproto_channel, Fun, Args ++ [Channel]) of + ok -> {ok, State}; + {ok, NChannel} -> + {ok, State#state{channel = NChannel}}; + {ok, Replies, NChannel} -> + {ok, next_msgs(Replies), State#state{channel = NChannel}}; + {shutdown, Reason, NChannel} -> + shutdown(Reason, State#state{channel = NChannel}) + end. + +%%-------------------------------------------------------------------- +%% Handle outgoing packets + +handle_outgoing(IoData, #state{socket = Socket, sendfun = SendFun}) -> + ?LOG(debug, "SEND ~0p", [IoData]), + + Oct = iolist_size(IoData), + + emqx_pd:inc_counter(send_pkt, 1), + emqx_pd:inc_counter(send_msg, 1), + emqx_pd:inc_counter(outgoing_pkt, 1), + emqx_pd:inc_counter(outgoing_bytes, Oct), + + %% FIXME: + %%ok = emqx_metrics:inc('bytes.sent', Oct), + case SendFun(IoData) of + ok -> ok; + Error = {error, _Reason} -> + %% Send an inet_reply to postpone handling the error + self() ! {inet_reply, Socket, Error}, + ok + end. + +%%-------------------------------------------------------------------- +%% Handle Info + +handle_info(activate_socket, State = #state{sockstate = OldSst}) -> + case activate_socket(State) of + {ok, NState = #state{sockstate = NewSst}} -> + if OldSst =/= NewSst -> + {ok, {event, NewSst}, NState}; + true -> {ok, NState} + end; + {error, Reason} -> + handle_info({sock_error, Reason}, State) + end; + +handle_info({sock_error, Reason}, State) -> + ?LOG(debug, "Socket error: ~p", [Reason]), + handle_info({sock_closed, Reason}, close_socket(State)); + +handle_info(Info, State) -> + with_channel(handle_info, [Info], State). + +%%-------------------------------------------------------------------- +%% Ensure rate limit + +ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> + case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of + false -> State; + {ok, Limiter1} -> + State#state{limiter = Limiter1}; + {pause, Time, Limiter1} -> + ?LOG(warning, "Pause ~pms due to rate limit", [Time]), + TRef = start_timer(Time, limit_timeout), + State#state{sockstate = blocked, + limiter = Limiter1, + limit_timer = TRef + } + end. + +%%-------------------------------------------------------------------- +%% Run GC and Check OOM + +run_gc(Stats, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of + false -> State; + {_IsGC, GcSt1} -> + State#state{gc_state = GcSt1} + end. + +check_oom(State) -> + OomPolicy = ?DEFAULT_OOM_POLICY, + case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of + Shutdown = {shutdown, _Reason} -> + erlang:send(self(), Shutdown); + _Other -> ok + end, + State. + +%%-------------------------------------------------------------------- +%% Activate Socket + +-compile({inline, [activate_socket/1]}). +activate_socket(State = #state{sockstate = closed}) -> + {ok, State}; +activate_socket(State = #state{sockstate = blocked}) -> + {ok, State}; +activate_socket(State = #state{socket = Socket, + active_n = N}) -> + %% FIXME: Works on dtls/udp ??? + %% How to hanlde buffer? + case esockd_setopts(Socket, [{active, N}]) of + ok -> {ok, State#state{sockstate = running}}; + Error -> Error + end. + +%%-------------------------------------------------------------------- +%% Close Socket + +close_socket(State = #state{sockstate = closed}) -> State; +close_socket(State = #state{socket = Socket}) -> + ok = esockd_close(Socket), + State#state{sockstate = closed}. + +%%-------------------------------------------------------------------- +%% Helper functions + +-compile({inline, [next_msgs/1]}). +next_msgs(Event) when is_tuple(Event) -> + Event; +next_msgs(More) when is_list(More) -> + More. + +-compile({inline, [shutdown/2, shutdown/3]}). +shutdown(Reason, State) -> + stop({shutdown, Reason}, State). + +shutdown(Reason, Reply, State) -> + stop({shutdown, Reason}, Reply, State). + +-compile({inline, [stop/2, stop/3]}). +stop(Reason, State) -> + {stop, Reason, State}. + +stop(Reason, Reply, State) -> + {stop, Reason, Reply, State}. diff --git a/apps/emqx_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_exproto/src/emqx_exproto_gcli.erl new file mode 100644 index 000000000..b4e654bd5 --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto_gcli.erl @@ -0,0 +1,110 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% the gRPC client worker for ConnectionHandler service +-module(emqx_exproto_gcli). + +-behaviour(gen_server). + +-include_lib("emqx_libs/include/logger.hrl"). + +-logger_header("[ExProto gClient]"). + +%% APIs +-export([async_call/3]). + +-export([start_link/2]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-define(CONN_ADAPTER_MOD, emqx_exproto_v_1_connection_handler_client). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +start_link(Pool, Id) -> + gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + ?MODULE, [Pool, Id], []). + +async_call(FunName, Req = #{conn := Conn}, Options) -> + cast(pick(Conn), {rpc, FunName, Req, Options, self()}). + +%%-------------------------------------------------------------------- +%% cast, pick +%%-------------------------------------------------------------------- + +-compile({inline, [cast/2, pick/1]}). + +cast(Deliver, Msg) -> + gen_server:cast(Deliver, Msg). + +pick(Conn) -> + gproc_pool:pick_worker(exproto_gcli_pool, Conn). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id]) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #{pool => Pool, id => Id}}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast({rpc, Fun, Req, Options, From}, State) -> + case catch apply(?CONN_ADAPTER_MOD, Fun, [Req, Options]) of + {ok, Resp, _Metadata} -> + ?LOG(debug, "~p got {ok, ~0p, ~0p}", [Fun, Resp, _Metadata]), + reply(From, Fun, {ok, Resp}); + {error, {Code, Msg}, _Metadata} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", + [?CONN_ADAPTER_MOD, Fun, Req, Options, Code, Msg]), + reply(From, Fun, {error, {Code, Msg}}); + {error, Reason} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", + [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason]), + reply(From, Fun, {error, Reason}); + {'EXIT', Reason, Stk} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~p", + [?CONN_ADAPTER_MOD, Fun, Req, Options, Reason, Stk]), + reply(From, Fun, {error, Reason}) + end, + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +reply(Pid, Fun, Result) -> + Pid ! {hreply, Fun, Result}. diff --git a/apps/emqx_exproto/src/emqx_exproto_gsvr.erl b/apps/emqx_exproto/src/emqx_exproto_gsvr.erl new file mode 100644 index 000000000..0c0dc3a25 --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto_gsvr.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% The gRPC server for ConnectionAdapter +-module(emqx_exproto_gsvr). + +-behavior(emqx_exproto_v_1_connection_adapter_bhvr). + +-include("emqx_exproto.hrl"). +-include_lib("emqx_libs/include/logger.hrl"). + +-logger_header("[ExProto gServer]"). + +-define(IS_QOS(X), (X =:= 0 orelse X =:= 1 orelse X =:= 2)). + +%% gRPC server callbacks +-export([ send/2 + , close/2 + , authenticate/2 + , start_timer/2 + , publish/2 + , subscribe/2 + , unsubscribe/2 + ]). + +%%-------------------------------------------------------------------- +%% gRPC ConnectionAdapter service +%%-------------------------------------------------------------------- + +-spec send(ctx:ctx(), emqx_exproto_pb:send_bytes_request()) + -> {ok, emqx_exproto_pb:code_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +send(Ctx, Req = #{conn := Conn, bytes := Bytes}) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response(call(Conn, {send, Bytes})), Ctx}. + +-spec close(ctx:ctx(), emqx_exproto_pb:close_socket_request()) + -> {ok, emqx_exproto_pb:code_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +close(Ctx, Req = #{conn := Conn}) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response(call(Conn, close)), Ctx}. + +-spec authenticate(ctx:ctx(), emqx_exproto_pb:authenticate_request()) + -> {ok, emqx_exproto_pb:code_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +authenticate(Ctx, Req = #{conn := Conn, + password := Password, + clientinfo := ClientInfo}) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + case validate(clientinfo, ClientInfo) of + false -> + {ok, response({error, ?RESP_REQUIRED_PARAMS_MISSED}), Ctx}; + _ -> + {ok, response(call(Conn, {auth, ClientInfo, Password})), Ctx} + end. + +-spec start_timer(ctx:ctx(), emqx_exproto_pb:publish_request()) + -> {ok, emqx_exproto_pb:code_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +start_timer(Ctx, Req = #{conn := Conn, type := Type, interval := Interval}) + when Type =:= 'KEEPALIVE' andalso Interval > 0 -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response(call(Conn, {start_timer, keepalive, Interval})), Ctx}; +start_timer(Ctx, Req) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Ctx}. + +-spec publish(ctx:ctx(), emqx_exproto_pb:publish_request()) + -> {ok, emqx_exproto_pb:code_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +publish(Ctx, Req = #{conn := Conn, topic := Topic, qos := Qos, payload := Payload}) + when ?IS_QOS(Qos) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response(call(Conn, {publish, Topic, Qos, Payload})), Ctx}; + +publish(Ctx, Req) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Ctx}. + +-spec subscribe(ctx:ctx(), emqx_exproto_pb:subscribe_request()) + -> {ok, emqx_exproto_pb:code_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +subscribe(Ctx, Req = #{conn := Conn, topic := Topic, qos := Qos}) + when ?IS_QOS(Qos) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response(call(Conn, {subscribe, Topic, Qos})), Ctx}; + +subscribe(Ctx, Req) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response({error, ?RESP_PARAMS_TYPE_ERROR}), Ctx}. + +-spec unsubscribe(ctx:ctx(), emqx_exproto_pb:unsubscribe_request()) + -> {ok, emqx_exproto_pb:code_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +unsubscribe(Ctx, Req = #{conn := Conn, topic := Topic}) -> + ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), + {ok, response(call(Conn, {unsubscribe, Topic})), Ctx}. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +to_pid(ConnStr) -> + list_to_pid(binary_to_list(ConnStr)). + +call(ConnStr, Req) -> + case catch to_pid(ConnStr) of + {'EXIT', {badarg, _}} -> + {error, ?RESP_PARAMS_TYPE_ERROR, + <<"The conn type error">>}; + Pid when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + true -> + emqx_exproto_conn:call(Pid, Req); + false -> + {error, ?RESP_CONN_PROCESS_NOT_ALIVE, + <<"Connection process is not alive">>} + end + end. + +%%-------------------------------------------------------------------- +%% Data types + +stringfy(Reason) -> + unicode:characters_to_binary((io_lib:format("~0p", [Reason]))). + +validate(clientinfo, M) -> + Required = [proto_name, proto_ver, clientid], + lists:all(fun(K) -> maps:is_key(K, M) end, Required). + +response(ok) -> + #{code => ?RESP_SUCCESS}; +response({error, Code, Reason}) + when ?IS_GRPC_RESULT_CODE(Code) -> + #{code => Code, message => stringfy(Reason)}; +response({error, Code}) + when ?IS_GRPC_RESULT_CODE(Code) -> + #{code => Code}; +response(Other) -> + #{code => ?RESP_UNKNOWN, message => stringfy(Other)}. diff --git a/apps/emqx_exproto/src/emqx_exproto_sup.erl b/apps/emqx_exproto/src/emqx_exproto_sup.erl new file mode 100644 index 000000000..1ff4b0575 --- /dev/null +++ b/apps/emqx_exproto/src/emqx_exproto_sup.erl @@ -0,0 +1,86 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exproto_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([ start_grpc_server/3 + , stop_grpc_server/1 + , start_grpc_client_channel/3 + , stop_grpc_client_channel/1 + ]). + +-export([init/1]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec start_grpc_server(atom(), inet:port_number(), list()) + -> {ok, pid()} | {error, term()}. +start_grpc_server(Name, Port, SSLOptions) -> + ServerOpts = #{}, + GrpcOpts = #{service_protos => [emqx_exproto_pb], + services => #{'emqx.exproto.v1.ConnectionAdapter' => emqx_exproto_gsvr}}, + ListenOpts = #{port => Port, socket_options => [{reuseaddr, true}]}, + PoolOpts = #{size => 8}, + TransportOpts = maps:from_list(SSLOptions), + Spec = #{id => Name, + start => {grpcbox_services_sup, start_link, + [ServerOpts, GrpcOpts, ListenOpts, + PoolOpts, TransportOpts]}, + type => supervisor, + restart => permanent, + shutdown => infinity}, + supervisor:start_child(?MODULE, Spec). + +-spec stop_grpc_server(atom()) -> ok. +stop_grpc_server(Name) -> + ok = supervisor:terminate_child(?MODULE, Name), + ok = supervisor:delete_child(?MODULE, Name). + +-spec start_grpc_client_channel( + atom(), + [grpcbox_channel:endpoint()], + grpcbox_channel:options()) -> {ok, pid()} | {error, term()}. +start_grpc_client_channel(Name, Endpoints, Options0) -> + Options = Options0#{sync_start => true}, + Spec = #{id => Name, + start => {grpcbox_channel, start_link, [Name, Endpoints, Options]}, + type => worker}, + supervisor:start_child(?MODULE, Spec). + +-spec stop_grpc_client_channel(atom()) -> ok. +stop_grpc_client_channel(Name) -> + ok = supervisor:terminate_child(?MODULE, Name), + ok = supervisor:delete_child(?MODULE, Name). + +%%-------------------------------------------------------------------- +%% Supervisor callbacks +%%-------------------------------------------------------------------- + +init([]) -> + %% gRPC Client Pool + PoolSize = emqx_vm:schedulers() * 2, + Pool = emqx_pool_sup:spec([exproto_gcli_pool, hash, PoolSize, + {emqx_exproto_gcli, start_link, []}]), + {ok, {{one_for_one, 10, 5}, [Pool]}}. diff --git a/apps/emqx_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl new file mode 100644 index 000000000..dc6a25c06 --- /dev/null +++ b/apps/emqx_exproto/test/emqx_exproto_SUITE.erl @@ -0,0 +1,454 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exproto_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-import(emqx_exproto_echo_svr, + [ frame_connect/2 + , frame_connack/1 + , frame_publish/3 + , frame_puback/1 + , frame_subscribe/2 + , frame_suback/1 + , frame_unsubscribe/1 + , frame_unsuback/1 + , frame_disconnect/0 + ]). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +-define(TCPOPTS, [binary, {active, false}]). +-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> + [{group, Name} || Name <- metrics()]. + +groups() -> + Cases = emqx_ct:all(?MODULE), + [{Name, Cases} || Name <- metrics()]. + +%% @private +metrics() -> + [tcp, ssl, udp, dtls]. + +init_per_group(GrpName, Cfg) -> + put(grpname, GrpName), + Svrs = emqx_exproto_echo_svr:start(), + emqx_ct_helpers:start_apps([emqx_exproto], fun set_sepecial_cfg/1), + emqx_logger:set_log_level(debug), + [{servers, Svrs}, {listener_type, GrpName} | Cfg]. + +end_per_group(_, Cfg) -> + emqx_ct_helpers:stop_apps([emqx_exproto]), + emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)). + +set_sepecial_cfg(emqx_exproto) -> + LisType = get(grpname), + Listeners = application:get_env(emqx_exproto, listeners, []), + SockOpts = socketopts(LisType), + UpgradeOpts = fun(Opts) -> + Opts2 = lists:keydelete(tcp_options, 1, Opts), + Opts3 = lists:keydelete(ssl_options, 1, Opts2), + Opts4 = lists:keydelete(udp_options, 1, Opts3), + Opts5 = lists:keydelete(dtls_options, 1, Opts4), + SockOpts ++ Opts5 + end, + NListeners = [{Proto, LisType, LisOn, UpgradeOpts(Opts)} + || {Proto, _Type, LisOn, Opts} <- Listeners], + application:set_env(emqx_exproto, listeners, NListeners); +set_sepecial_cfg(emqx) -> + application:set_env(emqx, allow_anonymous, true), + application:set_env(emqx, enable_acl_cache, false), + ok. + +%%-------------------------------------------------------------------- +%% Tests cases +%%-------------------------------------------------------------------- + +t_start_stop(_) -> + ok. + +t_mountpoint_echo(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + mountpoint => <<"ct/">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + SubBin = frame_subscribe(<<"t/#">>, 1), + SubAckBin = frame_suback(0), + + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), + + emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)), + PubBin1 = frame_publish(<<"t/dn">>, 0, <<"echo">>), + {ok, PubBin1} = recv(Sock, 5000), + + PubBin2 = frame_publish(<<"t/up">>, 0, <<"echo">>), + PubAckBin = frame_puback(0), + + emqx:subscribe(<<"ct/t/up">>), + + send(Sock, PubBin2), + {ok, PubAckBin} = recv(Sock, 5000), + + receive + {deliver, _, _} -> ok + after 1000 -> + error(echo_not_running) + end, + close(Sock). + +t_auth_deny(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, authenticate, + fun(_) -> {error, ?RC_NOT_AUTHORIZED} end), + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(1), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + SockType =/= udp andalso begin + {error, closed} = recv(Sock, 5000) + end, + meck:unload([emqx_access_control]). + +t_acl_deny(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]), + ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> deny end), + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + SubBin = frame_subscribe(<<"t/#">>, 1), + SubAckBin = frame_suback(1), + + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), + + emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)), + + PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>), + PubBinFailedAck = frame_puback(1), + PubBinSuccesAck = frame_puback(0), + + send(Sock, PubBin), + {ok, PubBinFailedAck} = recv(Sock, 5000), + + meck:unload([emqx_access_control]), + + send(Sock, PubBin), + {ok, PubBinSuccesAck} = recv(Sock, 5000), + close(Sock). + +t_keepalive_timeout(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">>, + keepalive => 2 + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + DisconnectBin = frame_disconnect(), + {ok, DisconnectBin} = recv(Sock, 10000), + + SockType =/= udp andalso begin + {error, closed} = recv(Sock, 5000) + end, ok. + +t_hook_connected_disconnected(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + Parent = self(), + HookFun1 = fun(_, _) -> Parent ! connected, ok end, + HookFun2 = fun(_, _, _) -> Parent ! disconnected, ok end, + emqx:hook('client.connected', HookFun1), + emqx:hook('client.disconnected', HookFun2), + + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + receive + connected -> ok + after 1000 -> + error(hook_is_not_running) + end, + + DisconnectBin = frame_disconnect(), + send(Sock, DisconnectBin), + + receive + disconnected -> ok + after 1000 -> + error(hook_is_not_running) + end, + + SockType =/= udp andalso begin + {error, closed} = recv(Sock, 5000) + end, + emqx:unhook('client.connected', HookFun1), + emqx:unhook('client.disconnected', HookFun2). + +t_hook_session_subscribed_unsubscribed(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + Parent = self(), + HookFun1 = fun(_, _, _) -> Parent ! subscribed, ok end, + HookFun2 = fun(_, _, _) -> Parent ! unsubscribed, ok end, + emqx:hook('session.subscribed', HookFun1), + emqx:hook('session.unsubscribed', HookFun2), + + SubBin = frame_subscribe(<<"t/#">>, 1), + SubAckBin = frame_suback(0), + + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), + + receive + subscribed -> ok + after 1000 -> + error(hook_is_not_running) + end, + + UnsubBin = frame_unsubscribe(<<"t/#">>), + UnsubAckBin = frame_unsuback(0), + + send(Sock, UnsubBin), + {ok, UnsubAckBin} = recv(Sock, 5000), + + receive + unsubscribed -> ok + after 1000 -> + error(hook_is_not_running) + end, + + close(Sock), + emqx:unhook('session.subscribed', HookFun1), + emqx:unhook('session.unsubscribed', HookFun2). + +t_hook_message_delivered(Cfg) -> + SockType = proplists:get_value(listener_type, Cfg), + Sock = open(SockType), + + Client = #{proto_name => <<"demo">>, + proto_ver => <<"v0.1">>, + clientid => <<"test_client_1">> + }, + Password = <<"123456">>, + + ConnBin = frame_connect(Client, Password), + ConnAckBin = frame_connack(0), + + send(Sock, ConnBin), + {ok, ConnAckBin} = recv(Sock, 5000), + + SubBin = frame_subscribe(<<"t/#">>, 1), + SubAckBin = frame_suback(0), + + send(Sock, SubBin), + {ok, SubAckBin} = recv(Sock, 5000), + + HookFun1 = fun(_, Msg) -> {ok, Msg#message{payload = <<"2">>}} end, + emqx:hook('message.delivered', HookFun1), + + emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)), + PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>), + {ok, PubBin1} = recv(Sock, 5000), + + close(Sock), + emqx:unhook('message.delivered', HookFun1). + +%%-------------------------------------------------------------------- +%% Utils + +rand_bytes() -> + crypto:strong_rand_bytes(rand:uniform(256)). + +%%-------------------------------------------------------------------- +%% Sock funcs + +open(tcp) -> + {ok, Sock} = gen_tcp:connect("127.0.0.1", 7993, ?TCPOPTS), + {tcp, Sock}; +open(udp) -> + {ok, Sock} = gen_udp:open(0, ?TCPOPTS), + {udp, Sock}; +open(ssl) -> + SslOpts = client_ssl_opts(), + {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?TCPOPTS ++ SslOpts), + {ssl, SslSock}; +open(dtls) -> + SslOpts = client_ssl_opts(), + {ok, SslSock} = ssl:connect("127.0.0.1", 7993, ?DTLSOPTS ++ SslOpts), + {dtls, SslSock}. + +send({tcp, Sock}, Bin) -> + gen_tcp:send(Sock, Bin); +send({udp, Sock}, Bin) -> + gen_udp:send(Sock, "127.0.0.1", 7993, Bin); +send({ssl, Sock}, Bin) -> + ssl:send(Sock, Bin); +send({dtls, Sock}, Bin) -> + ssl:send(Sock, Bin). + +recv({tcp, Sock}, Ts) -> + gen_tcp:recv(Sock, 0, Ts); +recv({udp, Sock}, Ts) -> + {ok, {_, _, Bin}} = gen_udp:recv(Sock, 0, Ts), + {ok, Bin}; +recv({ssl, Sock}, Ts) -> + ssl:recv(Sock, 0, Ts); +recv({dtls, Sock}, Ts) -> + ssl:recv(Sock, 0, Ts). + +close({tcp, Sock}) -> + gen_tcp:close(Sock); +close({udp, Sock}) -> + gen_udp:close(Sock); +close({ssl, Sock}) -> + ssl:close(Sock); +close({dtls, Sock}) -> + ssl:close(Sock). + +%%-------------------------------------------------------------------- +%% Server-Opts + +socketopts(tcp) -> + [{tcp_options, tcp_opts()}]; +socketopts(ssl) -> + [{tcp_options, tcp_opts()}, + {ssl_options, ssl_opts()}]; +socketopts(udp) -> + [{udp_options, udp_opts()}]; +socketopts(dtls) -> + [{udp_options, udp_opts()}, + {dtls_options, dtls_opts()}]. + +tcp_opts() -> + [{send_timeout, 15000}, + {send_timeout_close, true}, + {backlog, 100}, + {nodelay, true} | udp_opts()]. + +udp_opts() -> + [{recbuf, 1024}, + {sndbuf, 1024}, + {buffer, 1024}, + {reuseaddr, true}]. + +ssl_opts() -> + Path = emqx_ct_helpers:deps_path(emqx, "etc/certs"), + [{versions, ['tlsv1.2','tlsv1.1',tlsv1]}, + {ciphers, ciphers()}, + {keyfile, Path ++ "/key.pem"}, + {certfile, Path ++ "/cert.pem"}, + {cacertfile, Path ++ "/cacert.pem"}, + {verify, verify_peer}, + {fail_if_no_peer_cert, true}, + {secure_renegotiate, false}, + {reuse_sessions, true}, + {honor_cipher_order, true}]. + +dtls_opts() -> + Opts = ssl_opts(), + lists:keyreplace(versions, 1, Opts, {versions, ['dtlsv1.2', 'dtlsv1']}). + +ciphers() -> + proplists:get_value(ciphers, emqx_ct_helpers:client_ssl()). + +%%-------------------------------------------------------------------- +%% Client-Opts + +client_ssl_opts() -> + Path = emqx_ct_helpers:deps_path(emqx, "etc/certs"), + [{keyfile, Path ++ "/client-key.pem"}, + {certfile, Path ++ "/client-cert.pem"}, + {cacertfile, Path ++ "/cacert.pem"}]. diff --git a/apps/emqx_exproto/test/emqx_exproto_echo_svr.erl b/apps/emqx_exproto/test/emqx_exproto_echo_svr.erl new file mode 100644 index 000000000..8d9963fb4 --- /dev/null +++ b/apps/emqx_exproto/test/emqx_exproto_echo_svr.erl @@ -0,0 +1,238 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exproto_echo_svr). + +-behavior(emqx_exproto_v_1_connection_handler_bhvr). + +-export([ start/0 + , stop/1 + ]). + +-export([ frame_connect/2 + , frame_connack/1 + , frame_publish/3 + , frame_puback/1 + , frame_subscribe/2 + , frame_suback/1 + , frame_unsubscribe/1 + , frame_unsuback/1 + , frame_disconnect/0 + ]). + +-export([ on_socket_created/2 + , on_received_bytes/2 + , on_socket_closed/2 + , on_timer_timeout/2 + , on_received_messages/2 + ]). + +-define(HTTP, #{grpc_opts => #{service_protos => [emqx_exproto_pb], + services => #{'emqx.exproto.v1.ConnectionHandler' => ?MODULE}}, + listen_opts => #{port => 9001, + socket_options => []}, + pool_opts => #{size => 8}, + transport_opts => #{ssl => false}}). + +-define(CLIENT, emqx_exproto_v_1_connection_adapter_client). +-define(send(Req), ?CLIENT:send(Req, #{channel => ct_test_channel})). +-define(close(Req), ?CLIENT:close(Req, #{channel => ct_test_channel})). +-define(authenticate(Req), ?CLIENT:authenticate(Req, #{channel => ct_test_channel})). +-define(start_timer(Req), ?CLIENT:start_timer(Req, #{channel => ct_test_channel})). +-define(publish(Req), ?CLIENT:publish(Req, #{channel => ct_test_channel})). +-define(subscribe(Req), ?CLIENT:subscribe(Req, #{channel => ct_test_channel})). +-define(unsubscribe(Req), ?CLIENT:unsubscribe(Req, #{channel => ct_test_channel})). + +-define(TYPE_CONNECT, 1). +-define(TYPE_CONNACK, 2). +-define(TYPE_PUBLISH, 3). +-define(TYPE_PUBACK, 4). +-define(TYPE_SUBSCRIBE, 5). +-define(TYPE_SUBACK, 6). +-define(TYPE_UNSUBSCRIBE, 7). +-define(TYPE_UNSUBACK, 8). +-define(TYPE_DISCONNECT, 9). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +start() -> + application:ensure_all_started(grpcbox), + [start_channel(), start_server()]. + +start_channel() -> + grpcbox_channel_sup:start_child(ct_test_channel, [{http, "localhost", 9100, []}], #{}). + +start_server() -> + grpcbox:start_server(?HTTP). + +stop([ChannPid, SvrPid]) -> + supervisor:terminate_child(grpcbox_channel_sup, ChannPid), + supervisor:terminate_child(grpcbox_services_simple_sup, SvrPid). + +%%-------------------------------------------------------------------- +%% Protocol Adapter callbacks +%%-------------------------------------------------------------------- + +-spec on_socket_created(ctx:ctx(), emqx_exproto_pb:created_socket_request()) -> + {ok, emqx_exproto_pb:empty_success(), ctx:ctx()} | grpcbox_stream:grpc_error_response(). +on_socket_created(Ctx, Req) -> + io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_received_bytes(ctx:ctx(), emqx_exproto_pb:received_bytes_request()) -> + {ok, emqx_exproto_pb:empty_success(), ctx:ctx()} | grpcbox_stream:grpc_error_response(). +on_received_bytes(Ctx, Req = #{conn := Conn, bytes := Bytes}) -> + io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), + #{<<"type">> := Type} = Params = emqx_json:decode(Bytes, [return_maps]), + _ = handle_in(Conn, Type, Params), + {ok, #{}, Ctx}. + +-spec on_socket_closed(ctx:ctx(), emqx_exproto_pb:socket_closed_request()) -> + {ok, emqx_exproto_pb:empty_success(), ctx:ctx()} | grpcbox_stream:grpc_error_response(). +on_socket_closed(Ctx, Req) -> + io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +on_timer_timeout(Ctx, Req = #{conn := Conn, type := 'KEEPALIVE'}) -> + io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), + handle_out(Conn, ?TYPE_DISCONNECT), + ?close(#{conn => Conn}), + {ok, #{}, Ctx}. + +-spec on_received_messages(ctx:ctx(), emqx_exproto_pb:received_messages_request()) -> + {ok, emqx_exproto_pb:empty_success(), ctx:ctx()} | grpcbox_stream:grpc_error_response(). +on_received_messages(Ctx, Req = #{conn := Conn, messages := Messages}) -> + io:format("~p: ~0p~n", [?FUNCTION_NAME, Req]), + lists:foreach(fun(Message) -> + handle_out(Conn, ?TYPE_PUBLISH, Message) + end, Messages), + {ok, #{}, Ctx}. + +%%-------------------------------------------------------------------- +%% The Protocol Example: +%% CONN: +%% {"type": 1, "clientinfo": {...}} +%% +%% CONNACK: +%% {"type": 2, "code": 0} +%% +%% PUBLISH: +%% {"type": 3, "topic": "xxx", "payload": "", "qos": 0} +%% +%% PUBACK: +%% {"type": 4, "code": 0} +%% +%% SUBSCRIBE: +%% {"type": 5, "topic": "xxx", "qos": 1} +%% +%% SUBACK: +%% {"type": 6, "code": 0} +%% +%% DISCONNECT: +%% {"type": 7, "code": 1} +%%-------------------------------------------------------------------- + +handle_in(Conn, ?TYPE_CONNECT, #{<<"clientinfo">> := ClientInfo, <<"password">> := Password}) -> + NClientInfo = maps:from_list([{binary_to_atom(K, utf8), V} || {K, V} <- maps:to_list(ClientInfo)]), + case ?authenticate(#{conn => Conn, clientinfo => NClientInfo, password => Password}) of + {ok, #{code := 'SUCCESS'}, _} -> + case maps:get(keepalive, NClientInfo, 0) of + 0 -> ok; + Intv -> + io:format("Try call start_timer with ~ps", [Intv]), + ?start_timer(#{conn => Conn, type => 'KEEPALIVE', interval => Intv}) + end, + handle_out(Conn, ?TYPE_CONNACK, 0); + _ -> + handle_out(Conn, ?TYPE_CONNACK, 1), + ?close(#{conn => Conn}) + end; +handle_in(Conn, ?TYPE_PUBLISH, #{<<"topic">> := Topic, + <<"qos">> := Qos, + <<"payload">> := Payload}) -> + case ?publish(#{conn => Conn, topic => Topic, qos => Qos, payload => Payload}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_PUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_PUBACK, 1) + end; +handle_in(Conn, ?TYPE_SUBSCRIBE, #{<<"qos">> := Qos, <<"topic">> := Topic}) -> + case ?subscribe(#{conn => Conn, topic => Topic, qos => Qos}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_SUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_SUBACK, 1) + end; +handle_in(Conn, ?TYPE_UNSUBSCRIBE, #{<<"topic">> := Topic}) -> + case ?unsubscribe(#{conn => Conn, topic => Topic}) of + {ok, #{code := 'SUCCESS'}, _} -> + handle_out(Conn, ?TYPE_UNSUBACK, 0); + _ -> + handle_out(Conn, ?TYPE_UNSUBACK, 1) + end; + +handle_in(Conn, ?TYPE_DISCONNECT, _) -> + ?close(#{conn => Conn}). + +handle_out(Conn, ?TYPE_CONNACK, Code) -> + ?send(#{conn => Conn, bytes => frame_connack(Code)}); +handle_out(Conn, ?TYPE_PUBACK, Code) -> + ?send(#{conn => Conn, bytes => frame_puback(Code)}); +handle_out(Conn, ?TYPE_SUBACK, Code) -> + ?send(#{conn => Conn, bytes => frame_suback(Code)}); +handle_out(Conn, ?TYPE_UNSUBACK, Code) -> + ?send(#{conn => Conn, bytes => frame_unsuback(Code)}); +handle_out(Conn, ?TYPE_PUBLISH, #{qos := Qos, topic := Topic, payload := Payload}) -> + ?send(#{conn => Conn, bytes => frame_publish(Topic, Qos, Payload)}). + +handle_out(Conn, ?TYPE_DISCONNECT) -> + ?send(#{conn => Conn, bytes => frame_disconnect()}). + +%%-------------------------------------------------------------------- +%% Frame + +frame_connect(ClientInfo, Password) -> + emqx_json:encode(#{type => ?TYPE_CONNECT, + clientinfo => ClientInfo, + password => Password}). +frame_connack(Code) -> + emqx_json:encode(#{type => ?TYPE_CONNACK, code => Code}). + +frame_publish(Topic, Qos, Payload) -> + emqx_json:encode(#{type => ?TYPE_PUBLISH, + topic => Topic, + qos => Qos, + payload => Payload}). + +frame_puback(Code) -> + emqx_json:encode(#{type => ?TYPE_PUBACK, code => Code}). + +frame_subscribe(Topic, Qos) -> + emqx_json:encode(#{type => ?TYPE_SUBSCRIBE, topic => Topic, qos => Qos}). + +frame_suback(Code) -> + emqx_json:encode(#{type => ?TYPE_SUBACK, code => Code}). + +frame_unsubscribe(Topic) -> + emqx_json:encode(#{type => ?TYPE_UNSUBSCRIBE, topic => Topic}). + +frame_unsuback(Code) -> + emqx_json:encode(#{type => ?TYPE_UNSUBACK, code => Code}). + +frame_disconnect() -> + emqx_json:encode(#{type => ?TYPE_DISCONNECT}). diff --git a/etc/emqx_cloud.d/emqx_exproto.conf b/etc/emqx_cloud.d/emqx_exproto.conf new file mode 100644 index 000000000..a64153791 --- /dev/null +++ b/etc/emqx_cloud.d/emqx_exproto.conf @@ -0,0 +1,252 @@ +##==================================================================== +## EMQ X ExProto +##==================================================================== + +exproto.server.http.port = 9100 + +exproto.server.https.port = 9101 +exproto.server.https.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem +exproto.server.https.certfile = {{ platform_etc_dir }}/certs/cert.pem +exproto.server.https.keyfile = {{ platform_etc_dir }}/certs/key.pem + +##-------------------------------------------------------------------- +## Listeners +##-------------------------------------------------------------------- + +##-------------------------------------------------------------------- +## MQTT/TCP - External TCP Listener for MQTT Protocol + +## The IP address and port that the listener will bind. +## +## Value: ://: +## +## Examples: tcp://0.0.0.0:7993 | ssl://127.0.0.1:7994 +exproto.listener.protoname = tcp://0.0.0.0:7993 + +## The ConnectionHandler server address +## +exproto.listener.protoname.connection_handler_url = http://127.0.0.1:9001 + +#exproto.listener.protoname.connection_handler_certfile = +#exproto.listener.protoname.connection_handler_cacertfile = +#exproto.listener.protoname.connection_handler_keyfile = + +## The acceptor pool for external MQTT/TCP listener. +## +## Value: Number +exproto.listener.protoname.acceptors = 8 + +## Maximum number of concurrent MQTT/TCP connections. +## +## Value: Number +exproto.listener.protoname.max_connections = 1024000 + +## Maximum external connections per second. +## +## Value: Number +exproto.listener.protoname.max_conn_rate = 1000 + +## Specify the {active, N} option for the external MQTT/TCP Socket. +## +## Value: Number +exproto.listener.protoname.active_n = 100 + +## Idle timeout +## +## Value: Duration +exproto.listener.protoname.idle_timeout = 30s + +## The access control rules for the MQTT/TCP listener. +## +## See: https://github.com/emqtt/esockd#allowdeny +## +## Value: ACL Rule +## +## Example: allow 192.168.0.0/24 +exproto.listener.protoname.access.1 = allow all + +## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed +## behind HAProxy or Nginx. +## +## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ +## +## Value: on | off +## exproto.listener.protoname.proxy_protocol = on + +## Sets the timeout for proxy protocol. EMQ X will close the TCP connection +## if no proxy protocol packet recevied within the timeout. +## +## Value: Duration +#exproto.listener.protoname.proxy_protocol_timeout = 3s + +## The TCP backlog defines the maximum length that the queue of pending +## connections can grow to. +## +## Value: Number >= 0 +exproto.listener.protoname.backlog = 1024 + +## The TCP send timeout for external MQTT connections. +## +## Value: Duration +exproto.listener.protoname.send_timeout = 15s + +## Close the TCP connection if send timeout. +## +## Value: on | off +exproto.listener.protoname.send_timeout_close = on + +## The TCP receive buffer(os kernel) for MQTT connections. +## +## See: http://erlang.org/doc/man/inet.html +## +## Value: Bytes +#exproto.listener.protoname.recbuf = 2KB + +## The TCP send buffer(os kernel) for MQTT connections. +## +## See: http://erlang.org/doc/man/inet.html +## +## Value: Bytes +#exproto.listener.protoname.sndbuf = 2KB + +## The size of the user-level software buffer used by the driver. +## Not to be confused with options sndbuf and recbuf, which correspond +## to the Kernel socket buffers. It is recommended to have val(buffer) +## >= max(val(sndbuf),val(recbuf)) to avoid performance issues because +## of unnecessary copying. val(buffer) is automatically set to the above +## maximum when values sndbuf or recbuf are set. +## +## See: http://erlang.org/doc/man/inet.html +## +## Value: Bytes +#exproto.listener.protoname.buffer = 2KB + +## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. +## +## Value: on | off +#exproto.listener.protoname.tune_buffer = off + +## The TCP_NODELAY flag for MQTT connections. Small amounts of data are +## sent immediately if the option is enabled. +## +## Value: true | false +exproto.listener.protoname.nodelay = true + +## The SO_REUSEADDR flag for TCP listener. +## +## Value: true | false +exproto.listener.protoname.reuseaddr = true + + +##-------------------------------------------------------------------- +## TLS/DTLS options + +## TLS versions only to protect from POODLE attack. +## +## See: http://erlang.org/doc/man/ssl.html +## +## Value: String, seperated by ',' +#exproto.listener.protoname.tls_versions = tlsv1.2,tlsv1.1,tlsv1 + +## Path to the file containing the user's private PEM-encoded key. +## +## See: http://erlang.org/doc/man/ssl.html +## +## Value: File +#exproto.listener.protoname.keyfile = {{ platform_etc_dir }}/certs/key.pem + +## Path to a file containing the user certificate. +## +## See: http://erlang.org/doc/man/ssl.html +## +## Value: File +#exproto.listener.protoname.certfile = {{ platform_etc_dir }}/certs/cert.pem + +## Path to the file containing PEM-encoded CA certificates. The CA certificates +## are used during server authentication and when building the client certificate chain. +## +## Value: File +#exproto.listener.protoname.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem + +## The Ephemeral Diffie-Helman key exchange is a very effective way of +## ensuring Forward Secrecy by exchanging a set of keys that never hit +## the wire. Since the DH key is effectively signed by the private key, +## it needs to be at least as strong as the private key. In addition, +## the default DH groups that most of the OpenSSL installations have +## are only a handful (since they are distributed with the OpenSSL +## package that has been built for the operating system it’s running on) +## and hence predictable (not to mention, 1024 bits only). +## In order to escape this situation, first we need to generate a fresh, +## strong DH group, store it in a file and then use the option above, +## to force our SSL application to use the new DH group. Fortunately, +## OpenSSL provides us with a tool to do that. Simply run: +## openssl dhparam -out dh-params.pem 2048 +## +## Value: File +#exproto.listener.protoname.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem + +## A server only does x509-path validation in mode verify_peer, +## as it then sends a certificate request to the client (this +## message is not sent if the verify option is verify_none). +## You can then also want to specify option fail_if_no_peer_cert. +## More information at: http://erlang.org/doc/man/ssl.html +## +## Value: verify_peer | verify_none +#exproto.listener.protoname.verify = verify_peer + +## Used together with {verify, verify_peer} by an SSL server. If set to true, +## the server fails if the client does not have a certificate to send, that is, +## sends an empty certificate. +## +## Value: true | false +#exproto.listener.protoname.fail_if_no_peer_cert = true + +## This is the single most important configuration option of an Erlang SSL +## application. Ciphers (and their ordering) define the way the client and +## server encrypt information over the wire, from the initial Diffie-Helman +## key exchange, the session key encryption ## algorithm and the message +## digest algorithm. Selecting a good cipher suite is critical for the +## application’s data security, confidentiality and performance. +## +## The cipher list above offers: +## +## A good balance between compatibility with older browsers. +## It can get stricter for Machine-To-Machine scenarios. +## Perfect Forward Secrecy. +## No old/insecure encryption and HMAC algorithms +## +## Most of it was copied from Mozilla’s Server Side TLS article +## +## Value: Ciphers +#exproto.listener.protoname.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA + +## Ciphers for TLS PSK. +## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot +## be configured at the same time. +## See 'https://tools.ietf.org/html/rfc4279#section-2'. +#exproto.listener.protoname.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA + +## SSL parameter renegotiation is a feature that allows a client and a server +## to renegotiate the parameters of the SSL connection on the fly. +## RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, +## you drop support for the insecure renegotiation, prone to MitM attacks. +## +## Value: on | off +#exproto.listener.protoname.secure_renegotiate = off + +## A performance optimization setting, it allows clients to reuse +## pre-existing sessions, instead of initializing new ones. +## Read more about it here. +## +## See: http://erlang.org/doc/man/ssl.html +## +## Value: on | off +#exproto.listener.protoname.reuse_sessions = on + +## An important security setting, it forces the cipher to be set based +## on the server-specified order instead of the client-specified order, +## hence enforcing the (usually more properly configured) security +## ordering of the server administrator. +## +## Value: on | off +#exproto.listener.protoname.honor_cipher_order = on